✉️ Async communications using AWS SNS + SQS for Python services ✨
Show me the code
from razemax.event_manager import EventManager
class NorthKoreaThreatCreatedEvent:
    def __init__(self, id: int, target: str):
        self.id = id
        self.target = target


def trump_subscriber(event: NorthKoreaThreatCreatedEvent):
    print(f"North korea will attack us or {event.target}!")

EventManager.subscribe(trump_subscriber, NorthKoreaThreatCreatedEvent)
EventManager.trigger(NorthKoreaThreatCreatedEvent(0, "Mexico"))
Result:
North korea will attack us or Mexico!
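To make the subscribe/trigger flow above concrete, here is a minimal in-process sketch of the same pattern. It is not razemax's implementation: `SketchEventManager` and `ThreatCreated` are hypothetical names used only for illustration, and the sketch assumes subscribers are looked up by the exact class of the triggered event.

```python
from typing import Callable, Dict, List, Type


class SketchEventManager:
    """Hypothetical stand-in for illustration; not razemax's EventManager."""

    # Maps an event class to the callables subscribed to it.
    _subscribers: Dict[Type, List[Callable]] = {}

    @classmethod
    def subscribe(cls, subscriber: Callable, event_type: Type) -> None:
        cls._subscribers.setdefault(event_type, []).append(subscriber)

    @classmethod
    def trigger(cls, event: object) -> None:
        # Call every subscriber registered for this exact event class.
        for subscriber in cls._subscribers.get(type(event), []):
            subscriber(event)


class ThreatCreated:
    def __init__(self, id: int, target: str):
        self.id = id
        self.target = target


received = []
SketchEventManager.subscribe(lambda event: received.append(event.target), ThreatCreated)
SketchEventManager.trigger(ThreatCreated(0, "Mexico"))
print(received)
```

Keying subscribers by event class keeps publishers unaware of who listens, which is the property the example above relies on.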
The SQS queue must be subscribed to the SNS topic before the consumer runs, and the subscription should filter for only the events of interest. If the SNS topic delivers events that are not included in the mapper, razemax raises exceptions.
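One way to set up such a filtered subscription is an SNS subscription filter policy. The sketch below only builds the arguments for boto3's `sns.subscribe` call so it can run without AWS access. The helper `build_subscribe_kwargs`, the ARNs, and above all the message attribute name `event_name` are assumptions: check which message attribute razemax's publisher actually sets before relying on it.

```python
import json


def build_subscribe_kwargs(topic_arn, queue_arn, event_names, attribute_name="event_name"):
    """Build keyword arguments for boto3's sns.subscribe (hypothetical helper).

    attribute_name is an ASSUMPTION: it must match the message attribute
    that the publisher attaches to each SNS message.
    """
    filter_policy = {attribute_name: sorted(event_names)}
    return {
        "TopicArn": topic_arn,
        "Protocol": "sqs",
        "Endpoint": queue_arn,
        # SNS expects the filter policy as a JSON string.
        "Attributes": {"FilterPolicy": json.dumps(filter_policy)},
    }


# Placeholder ARNs; the event names would normally be the mapper's keys.
kwargs = build_subscribe_kwargs(
    "arn:aws:sns:us-east-1:123456789012:korea-topic",
    "arn:aws:sqs:us-east-1:123456789012:korea-threats-queue",
    ["KPThreatCreated"],
)
print(kwargs["Attributes"]["FilterPolicy"])

# With boto3 installed and credentials configured, the call would be:
# boto3.client("sns").subscribe(**kwargs)
```

Deriving the filter from the mapper's keys keeps the two in sync, so the queue never receives an event the consumer cannot map. The queue's access policy must also allow the topic to send messages to it.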
from razemax.consumers import MessageConsumer
from razemax.drivers import SQSDriver
from razemax.event_manager import EventManager
from razemax.publisher import SNSMessagePublisher
def kp_message_to_event(message):
    # Highly recommended to use Marshmallow to validate
    return NorthKoreaThreatCreatedEvent(message.body['id'], message.body['target_name'])


mapper = {
    'KPThreatCreated': kp_message_to_event
}

aws_settings = {
    'region_name': "",
    'aws_access_key_id': "",
    'aws_secret_access_key': "",
    'endpoint_url': ""
}

queue_driver = SQSDriver.build("korea-threats-queue", aws_settings)
MessageConsumer(mapper, EventManager, queue_driver).process_message()
publisher = SNSMessagePublisher.build(aws_settings, 'korea-topic')
publisher.publish('KPThreatCreated', {'id': 21, 'target_name': 'Portugal'})
Result:
North korea will attack us or Portugal!
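The mapper function above recommends validating the message before building the event. The following is a dependency-free sketch of that idea, standing in for what a Marshmallow schema would do. `InvalidMessageError` and `validated_kp_message_to_event` are hypothetical names, and the message is assumed to expose its payload as a dict in `message.body`, as in the example above.

```python
from types import SimpleNamespace


class InvalidMessageError(ValueError):
    """Hypothetical error type raised when a message body fails validation."""


class NorthKoreaThreatCreatedEvent:
    def __init__(self, id: int, target: str):
        self.id = id
        self.target = target


def validated_kp_message_to_event(message):
    body = message.body
    if not isinstance(body, dict):
        raise InvalidMessageError({"body": "must be an object"})

    errors = {}
    threat_id = body.get("id")
    target_name = body.get("target_name")
    # bool is a subclass of int, so it is rejected explicitly.
    if not isinstance(threat_id, int) or isinstance(threat_id, bool):
        errors["id"] = "must be an integer"
    if not isinstance(target_name, str) or not target_name:
        errors["target_name"] = "must be a non-empty string"
    if errors:
        raise InvalidMessageError(errors)

    return NorthKoreaThreatCreatedEvent(threat_id, target_name)


# SimpleNamespace stands in for the message object handed to the mapper.
event = validated_kp_message_to_event(
    SimpleNamespace(body={"id": 21, "target_name": "Portugal"})
)
print(event.id, event.target)
```

Collecting all field errors before raising gives a single exception that describes every problem in the payload, rather than failing on the first missing key with a bare `KeyError`.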
pip install razemax
To run the unit tests and the integration tests:
make unit-tests
make integration-tests
This project is licensed under the MIT License. See the LICENSE.md file for details.