geeknam/esser

esser - [E]vent [S]ourcing [Ser]verlessly

  • Serverless + Pay-As-You-Go
  • Aggregates
  • Snapshots
  • Projections

Architectural Design

[Esser Diagram]

Features

  • Command validation
  • Datastore-agnostic read layer
  • Push-based messaging via DynamoDB Streams
  • Built-in snapshotting
  • Publish / subscribe style signalling
  • Generated CloudFormation templates (Infrastructure as Code)

Components

  • Runtime: AWS Lambda (Python)
  • Append-only Event Store: DynamoDB
  • Event Source Triggers: DynamoDB Streams
  • Read / Query Store: PostgreSQL / Elasticsearch (via contrib)

Example Usage

Add first entity

items/aggregate.py

from esser.entities import Entity
from esser.registry import register
from items import commands
# Imported for its side effect: loading the module connects the signal receivers
from items import receivers  # noqa: F401
from items.event_handler import ItemEventHandler


@register
class Item(Entity):

    # Event handler that folds events into the aggregate's state
    event_handler = ItemEventHandler()

    # Commands that can be issued against this entity
    created = commands.CreateItem()
    price_updated = commands.UpdatePrice()

Add commands that can be issued

items/commands.py

from esser.commands import BaseCommand, CreateCommand


class CreateItem(CreateCommand):

    event_name = 'ItemCreated'
    schema = {
        'name': {'type': 'string'},
        'price': {'type': 'float'}
    }


class UpdatePrice(BaseCommand):

    event_name = 'PriceUpdated'
    schema = {
        'price': {'type': 'float', 'diff': True}
    }
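
Each command's `schema` declares the fields a payload may carry and their types. The sketch below illustrates that kind of check in plain Python, without esser. The `validate` function and `TYPE_MAP` are hypothetical names introduced for illustration, the type rules are deliberately simplified, and the `diff` option is left out because its semantics are not described here.

```python
# Hypothetical, simplified validator; not esser's actual validation code.
TYPE_MAP = {'string': str, 'float': float}


def validate(schema, payload):
    """Return a dict of field -> error message; empty when the payload is valid."""
    errors = {}
    for field, value in payload.items():
        rule = schema.get(field)
        if rule is None:
            # The field is not declared in the command schema
            errors[field] = 'unknown field'
        elif not isinstance(value, TYPE_MAP[rule['type']]):
            errors[field] = 'must be of type ' + rule['type']
    return errors


create_item_schema = {
    'name': {'type': 'string'},
    'price': {'type': 'float'},
}

ok = validate(create_item_schema, {'name': 'Book', 'price': 9.99})
bad = validate(create_item_schema, {'name': 'Book', 'price': 'cheap'})
```

Here `ok` is an empty dict, while `bad` reports the `price` field, so a command with a malformed payload could be rejected before any event is written.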

Add event handler to fold event stream

items/event_handler.py

from esser.event_handler import BaseEventHandler

class ItemEventHandler(BaseEventHandler):

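    def on_item_created(self, aggregate, next_event):
        # Delegate to the creation logic inherited from BaseEventHandler
        return self.on_created(aggregate, next_event)

    def on_price_updated(self, aggregate, next_event):
        # Apply the new price from the event payload to the aggregate state
        aggregate['price'] = next_event.event_data['price']
        return aggregate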
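
Folding here means reducing the ordered event stream to the current aggregate state, one event at a time. The sketch below shows the idea in plain Python, without esser. The `ItemFold` class, the `handler_name` helper, the dict-shaped events, and the rule that maps `PriceUpdated` to `on_price_updated` are assumptions inferred from the method names in this example, not esser's actual internals.

```python
import re
from functools import reduce


def handler_name(event_name):
    # Assumed convention: 'PriceUpdated' -> 'on_price_updated'
    snake = re.sub(r'(?<!^)(?=[A-Z])', '_', event_name).lower()
    return 'on_' + snake


class ItemFold:
    """Hypothetical stand-in for an event handler; events are plain dicts."""

    def on_item_created(self, aggregate, event):
        # The first event seeds the aggregate state
        return dict(event['event_data'])

    def on_price_updated(self, aggregate, event):
        # Copy the state, then overwrite the price
        updated = dict(aggregate)
        updated['price'] = event['event_data']['price']
        return updated

    def apply(self, aggregate, event):
        # Dispatch to the method named after the event
        method = getattr(self, handler_name(event['event_name']))
        return method(aggregate, event)


def fold(events, initial=None):
    # Left fold: state_n = apply(state_{n-1}, event_n)
    return reduce(ItemFold().apply, events, initial or {})


events = [
    {'event_name': 'ItemCreated', 'event_data': {'name': 'Book', 'price': 10.0}},
    {'event_name': 'PriceUpdated', 'event_data': {'price': 12.5}},
]
state = fold(events)
```

After the fold, `state` holds the item's name from the first event and the price from the second. Passing a snapshot as `initial` and only the later events would give the same result without replaying the full stream.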

Subscribe to events

items/receivers.py

from esser.signals.decorators import receiver
from esser.signals import event_pre_save, event_received, event_post_save
from esser.handlers import LambdaHandler
from items.commands import UpdatePrice


@receiver(event_pre_save, sender=UpdatePrice)
def presave_price_updated(sender, **kwargs):
    # Do something before saving the event
    pass


@receiver(event_received, sender=LambdaHandler)
def received_command(sender, **kwargs):
    # Runs when the Lambda handler receives a command
    pass


@receiver(event_post_save)
def handle_event_saved(sender, **kwargs):
    # Runs after the event has been saved
    pass
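
The receivers above follow a publish / subscribe pattern: a function is connected to a signal, optionally filtered by sender, and is called when that signal is sent. The sketch below is a minimal plain-Python illustration of that pattern, not esser's implementation. The `Signal` class, its `connect` and `send` methods, and the rule that a receiver without a `sender` fires for every sender are all assumptions made for illustration.

```python
class Signal:
    """Hypothetical minimal signal: keeps (function, sender filter) pairs."""

    def __init__(self):
        self._receivers = []

    def connect(self, func, sender=None):
        self._receivers.append((func, sender))

    def send(self, sender, **kwargs):
        # Call receivers that have no sender filter or whose filter matches
        results = []
        for func, wanted in self._receivers:
            if wanted is None or wanted is sender:
                results.append(func(sender=sender, **kwargs))
        return results


def receiver(signal, sender=None):
    # Decorator that connects the function and returns it unchanged
    def decorator(func):
        signal.connect(func, sender=sender)
        return func
    return decorator


event_post_save = Signal()
calls = []


class UpdatePrice:
    pass


class CreateItem:
    pass


@receiver(event_post_save, sender=UpdatePrice)
def on_price_saved(sender, **kwargs):
    calls.append(('price', kwargs['event']))


@receiver(event_post_save)
def on_any_saved(sender, **kwargs):
    calls.append(('any', kwargs['event']))


event_post_save.send(sender=UpdatePrice, event='e1')
event_post_save.send(sender=CreateItem, event='e2')
```

In this sketch the filtered receiver runs only for `UpdatePrice`, while the unfiltered one runs for both sends.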