diff --git a/.gitignore b/.gitignore index 89da33f..0cc6e8a 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,9 @@ share/python-wheels/ *.egg MANIFEST +#IDE +.idea* + # PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. diff --git a/eventbusk/bus.py b/eventbusk/bus.py index 7b83576..356bb18 100644 --- a/eventbusk/bus.py +++ b/eventbusk/bus.py @@ -6,8 +6,9 @@ import json import logging import time +import uuid from abc import ABC -from dataclasses import asdict, dataclass +from dataclasses import asdict, dataclass, field from functools import wraps from typing import Callable, Type, Union @@ -31,6 +32,14 @@ class MyEvent(Event): foo: int bar: str """ + event_id: uuid.UUID = field(default_factory=uuid.uuid4, init=False) + + +class EventJsonEncoder(json.JSONEncoder): + def default(self, o): + if isinstance(o, uuid.UUID): + return str(o) + return json.JSONEncoder.default(self, o) EventT = Type[Event] @@ -120,8 +129,7 @@ def send( event_fqn = self.to_fqn(event.__class__) # TODO: Ensure unknown event throws a error. topic = self._event_to_topic[event_fqn] - - data = json.dumps(asdict(event)).encode("utf-8") + data = json.dumps(asdict(event), cls=EventJsonEncoder).encode("utf-8") try: self.producer.produce( topic=topic, value=data, flush=flush, on_delivery=on_delivery @@ -131,7 +139,7 @@ def send( if fail_silently: logger.warning( "Error producing event.", - extra={"event": event_fqn, "topic": topic}, + extra={"event": event_fqn, "topic": topic, "event_id": event.event_id}, exc_info=True, ) else: @@ -216,9 +224,25 @@ def wrapper() -> None: msg_value = message.value().decode("utf-8") # type: ignore event_data = json.loads(msg_value) + if "event_id" in event_data: + try: + event_id = uuid.UUID(event_data.pop('event_id')) + except ValueError: + logger.exception( + ( + "Error while converting str -> UUID " + ), + extra={**log_context, **{"data": event_data}}, + exc_info=True, + ) + pass + else: + event_id = None + # TODO: Fix following # Too many arguments for "Event" [call-arg] event = event_type(**event_data) # type: ignore + setattr(event, 'event_id', event_id) try: func(event) diff --git a/pyproject.toml b/pyproject.toml index 22b9348..a1caa18 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "eventbusk" -version = "0.1.0" +version = "0.1.1" description = "Event bus with Kafka" authors = ["Airbase Inc "] diff --git a/tests/test_bus.py b/tests/test_bus.py index f8d0c4d..a682ed5 100644 --- a/tests/test_bus.py +++ b/tests/test_bus.py @@ -4,6 +4,7 @@ from __future__ import annotations import logging +import uuid from dataclasses import dataclass from pytest_mock import MockerFixture @@ -43,8 +44,13 @@ def test_bus_send(mocker: MockerFixture) -> None: # Given events registered to certain topics bus.register_event(topic="first_topic", event_type=Foo) bus.register_event(topic="second_topic", event_type=Bar) + + foo_event_uuid = uuid.uuid4() + bar_event_uuid = uuid.uuid4() foo_event = Foo(first=1) + foo_event.event_id = foo_event_uuid bar_event = Bar(second=1) + bar_event.event_id = bar_event_uuid # When we send events of a different types def on_delivery(error: str, event: Event) -> None: @@ -62,13 +68,19 @@ def on_delivery(error: str, event: Event) -> None: [ mocker.call( topic="first_topic", - value=b'{"first": 1}', + value=bytes( + '{{"event_id": "{foo_event_uuid}", "first": 1}}'.format(foo_event_uuid=str(foo_event_uuid)), + "utf-8" + ), flush=True, on_delivery=on_delivery, ), mocker.call( topic="second_topic", - value=b'{"second": 1}', + value=bytes( + '{{"event_id": "{bar_event_uuid}", "second": 1}}'.format(bar_event_uuid=str(bar_event_uuid)), + "utf-8" + ), flush=True, on_delivery=on_delivery, ),