diff --git a/.github/workflows/test-integrations-data-processing.yml b/.github/workflows/test-integrations-data-processing.yml index 1585adb20e..cb872d3196 100644 --- a/.github/workflows/test-integrations-data-processing.yml +++ b/.github/workflows/test-integrations-data-processing.yml @@ -57,6 +57,10 @@ jobs: run: | set -x # print commands that are executed ./scripts/runtox.sh "py${{ matrix.python-version }}-celery-latest" + - name: Test dramatiq latest + run: | + set -x # print commands that are executed + ./scripts/runtox.sh "py${{ matrix.python-version }}-dramatiq-latest" - name: Test huey latest run: | set -x # print commands that are executed @@ -125,6 +129,10 @@ jobs: run: | set -x # print commands that are executed ./scripts/runtox.sh --exclude-latest "py${{ matrix.python-version }}-celery" + - name: Test dramatiq pinned + run: | + set -x # print commands that are executed + ./scripts/runtox.sh --exclude-latest "py${{ matrix.python-version }}-dramatiq" - name: Test huey pinned run: | set -x # print commands that are executed diff --git a/scripts/split-tox-gh-actions/split-tox-gh-actions.py b/scripts/split-tox-gh-actions/split-tox-gh-actions.py index b9f978d850..002b930b68 100755 --- a/scripts/split-tox-gh-actions/split-tox-gh-actions.py +++ b/scripts/split-tox-gh-actions/split-tox-gh-actions.py @@ -80,6 +80,7 @@ "arq", "beam", "celery", + "dramatiq", "huey", "rq", "spark", diff --git a/sentry_sdk/integrations/dramatiq.py b/sentry_sdk/integrations/dramatiq.py new file mode 100644 index 0000000000..673c3323e8 --- /dev/null +++ b/sentry_sdk/integrations/dramatiq.py @@ -0,0 +1,167 @@ +import json + +import sentry_sdk +from sentry_sdk.integrations import Integration +from sentry_sdk._types import TYPE_CHECKING +from sentry_sdk.integrations._wsgi_common import request_body_within_bounds +from sentry_sdk.utils import ( + AnnotatedValue, + capture_internal_exceptions, + event_from_exception, +) + +from dramatiq.broker import Broker # type: ignore +from dramatiq.message import Message # type: ignore +from dramatiq.middleware import Middleware, default_middleware # type: ignore +from dramatiq.errors import Retry # type: ignore + +if TYPE_CHECKING: + from typing import Any, Callable, Dict, Optional, Union + from sentry_sdk._types import Event, Hint + + +class DramatiqIntegration(Integration): + """ + Dramatiq integration for Sentry + + Please make sure that you call `sentry_sdk.init` *before* initializing + your broker, as it monkey patches `Broker.__init__`. + + This integration was originally developed and maintained + by https://github.com/jacobsvante and later donated to the Sentry + project. + """ + + identifier = "dramatiq" + + @staticmethod + def setup_once(): + # type: () -> None + _patch_dramatiq_broker() + + +def _patch_dramatiq_broker(): + # type: () -> None + original_broker__init__ = Broker.__init__ + + def sentry_patched_broker__init__(self, *args, **kw): + # type: (Broker, *Any, **Any) -> None + integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) + + try: + middleware = kw.pop("middleware") + except KeyError: + # Unfortunately Broker and StubBroker allows middleware to be + # passed in as positional arguments, whilst RabbitmqBroker and + # RedisBroker does not. + if len(args) == 1: + middleware = args[0] + args = [] # type: ignore + else: + middleware = None + + if middleware is None: + middleware = list(m() for m in default_middleware) + else: + middleware = list(middleware) + + if integration is not None: + middleware = [m for m in middleware if not isinstance(m, SentryMiddleware)] + middleware.insert(0, SentryMiddleware()) + + kw["middleware"] = middleware + original_broker__init__(self, *args, **kw) + + Broker.__init__ = sentry_patched_broker__init__ + + +class SentryMiddleware(Middleware): # type: ignore[misc] + """ + A Dramatiq middleware that automatically captures and sends + exceptions to Sentry. + + This is automatically added to every instantiated broker via the + DramatiqIntegration. + """ + + def before_process_message(self, broker, message): + # type: (Broker, Message) -> None + integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) + if integration is None: + return + + message._scope_manager = sentry_sdk.new_scope() + message._scope_manager.__enter__() + + scope = sentry_sdk.get_current_scope() + scope.transaction = message.actor_name + scope.set_extra("dramatiq_message_id", message.message_id) + scope.add_event_processor(_make_message_event_processor(message, integration)) + + def after_process_message(self, broker, message, *, result=None, exception=None): + # type: (Broker, Message, Any, Optional[Any], Optional[Exception]) -> None + integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) + if integration is None: + return + + actor = broker.get_actor(message.actor_name) + throws = message.options.get("throws") or actor.options.get("throws") + + try: + if ( + exception is not None + and not (throws and isinstance(exception, throws)) + and not isinstance(exception, Retry) + ): + event, hint = event_from_exception( + exception, + client_options=sentry_sdk.get_client().options, + mechanism={ + "type": DramatiqIntegration.identifier, + "handled": False, + }, + ) + sentry_sdk.capture_event(event, hint=hint) + finally: + message._scope_manager.__exit__(None, None, None) + + +def _make_message_event_processor(message, integration): + # type: (Message, DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]] + + def inner(event, hint): + # type: (Event, Hint) -> Optional[Event] + with capture_internal_exceptions(): + DramatiqMessageExtractor(message).extract_into_event(event) + + return event + + return inner + + +class DramatiqMessageExtractor(object): + def __init__(self, message): + # type: (Message) -> None + self.message_data = dict(message.asdict()) + + def content_length(self): + # type: () -> int + return len(json.dumps(self.message_data)) + + def extract_into_event(self, event): + # type: (Event) -> None + client = sentry_sdk.get_client() + if not client.is_active(): + return + + contexts = event.setdefault("contexts", {}) + request_info = contexts.setdefault("dramatiq", {}) + request_info["type"] = "dramatiq" + + data = None # type: Optional[Union[AnnotatedValue, Dict[str, Any]]] + if not request_body_within_bounds(client, self.content_length()): + data = AnnotatedValue.removed_because_over_size_limit() + else: + data = self.message_data + + request_info["data"] = data diff --git a/tests/integrations/dramatiq/__init__.py b/tests/integrations/dramatiq/__init__.py new file mode 100644 index 0000000000..70bbf21db4 --- /dev/null +++ b/tests/integrations/dramatiq/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("dramatiq") diff --git a/tests/integrations/dramatiq/test_dramatiq.py b/tests/integrations/dramatiq/test_dramatiq.py new file mode 100644 index 0000000000..d7917cbd00 --- /dev/null +++ b/tests/integrations/dramatiq/test_dramatiq.py @@ -0,0 +1,231 @@ +import pytest +import uuid + +import dramatiq +from dramatiq.brokers.stub import StubBroker + +import sentry_sdk +from sentry_sdk.integrations.dramatiq import DramatiqIntegration + + +@pytest.fixture +def broker(sentry_init): + sentry_init(integrations=[DramatiqIntegration()]) + broker = StubBroker() + broker.emit_after("process_boot") + dramatiq.set_broker(broker) + yield broker + broker.flush_all() + broker.close() + + +@pytest.fixture +def worker(broker): + worker = dramatiq.Worker(broker, worker_timeout=100, worker_threads=1) + worker.start() + yield worker + worker.stop() + + +def test_that_a_single_error_is_captured(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def dummy_actor(x, y): + return x / y + + dummy_actor.send(1, 2) + dummy_actor.send(1, 0) + broker.join(dummy_actor.queue_name) + worker.join() + + (event,) = events + exception = event["exception"]["values"][0] + assert exception["type"] == "ZeroDivisionError" + + +def test_that_actor_name_is_set_as_transaction(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def dummy_actor(x, y): + return x / y + + dummy_actor.send(1, 0) + broker.join(dummy_actor.queue_name) + worker.join() + + (event,) = events + assert event["transaction"] == "dummy_actor" + + +def test_that_dramatiq_message_id_is_set_as_extra(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def dummy_actor(x, y): + sentry_sdk.capture_message("hi") + return x / y + + dummy_actor.send(1, 0) + broker.join(dummy_actor.queue_name) + worker.join() + + event_message, event_error = events + assert "dramatiq_message_id" in event_message["extra"] + assert "dramatiq_message_id" in event_error["extra"] + assert ( + event_message["extra"]["dramatiq_message_id"] + == event_error["extra"]["dramatiq_message_id"] + ) + msg_ids = [e["extra"]["dramatiq_message_id"] for e in events] + assert all(uuid.UUID(msg_id) and isinstance(msg_id, str) for msg_id in msg_ids) + + +def test_that_local_variables_are_captured(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def dummy_actor(x, y): + foo = 42 # noqa + return x / y + + dummy_actor.send(1, 2) + dummy_actor.send(1, 0) + broker.join(dummy_actor.queue_name) + worker.join() + + (event,) = events + exception = event["exception"]["values"][0] + assert exception["stacktrace"]["frames"][-1]["vars"] == { + "x": "1", + "y": "0", + "foo": "42", + } + + +def test_that_messages_are_captured(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def dummy_actor(): + sentry_sdk.capture_message("hi") + + dummy_actor.send() + broker.join(dummy_actor.queue_name) + worker.join() + + (event,) = events + assert event["message"] == "hi" + assert event["level"] == "info" + assert event["transaction"] == "dummy_actor" + + +def test_that_sub_actor_errors_are_captured(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def dummy_actor(x, y): + sub_actor.send(x, y) + + @dramatiq.actor(max_retries=0) + def sub_actor(x, y): + return x / y + + dummy_actor.send(1, 2) + dummy_actor.send(1, 0) + broker.join(dummy_actor.queue_name) + worker.join() + + (event,) = events + assert event["transaction"] == "sub_actor" + + exception = event["exception"]["values"][0] + assert exception["type"] == "ZeroDivisionError" + + +def test_that_multiple_errors_are_captured(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def dummy_actor(x, y): + return x / y + + dummy_actor.send(1, 0) + broker.join(dummy_actor.queue_name) + worker.join() + + dummy_actor.send(1, None) + broker.join(dummy_actor.queue_name) + worker.join() + + event1, event2 = events + + assert event1["transaction"] == "dummy_actor" + exception = event1["exception"]["values"][0] + assert exception["type"] == "ZeroDivisionError" + + assert event2["transaction"] == "dummy_actor" + exception = event2["exception"]["values"][0] + assert exception["type"] == "TypeError" + + +def test_that_message_data_is_added_as_request(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def dummy_actor(x, y): + return x / y + + dummy_actor.send_with_options( + args=( + 1, + 0, + ), + max_retries=0, + ) + broker.join(dummy_actor.queue_name) + worker.join() + + (event,) = events + + assert event["transaction"] == "dummy_actor" + request_data = event["contexts"]["dramatiq"]["data"] + assert request_data["queue_name"] == "default" + assert request_data["actor_name"] == "dummy_actor" + assert request_data["args"] == [1, 0] + assert request_data["kwargs"] == {} + assert request_data["options"]["max_retries"] == 0 + assert uuid.UUID(request_data["message_id"]) + assert isinstance(request_data["message_timestamp"], int) + + +def test_that_expected_exceptions_are_not_captured(broker, worker, capture_events): + events = capture_events() + + class ExpectedException(Exception): + pass + + @dramatiq.actor(max_retries=0, throws=ExpectedException) + def dummy_actor(): + raise ExpectedException + + dummy_actor.send() + broker.join(dummy_actor.queue_name) + worker.join() + + assert events == [] + + +def test_that_retry_exceptions_are_not_captured(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=2) + def dummy_actor(): + raise dramatiq.errors.Retry("Retrying", delay=100) + + dummy_actor.send() + broker.join(dummy_actor.queue_name) + worker.join() + + assert events == [] diff --git a/tox.ini b/tox.ini index 3acf70bb6f..98536d9860 100644 --- a/tox.ini +++ b/tox.ini @@ -108,6 +108,12 @@ envlist = {py3.10,py3.11,py3.12}-django-v{5.0,5.1} {py3.10,py3.11,py3.12}-django-latest + # dramatiq + {py3.6,py3.9}-dramatiq-v{1.13} + {py3.7,py3.10,py3.11}-dramatiq-v{1.15} + {py3.8,py3.11,py3.12}-dramatiq-v{1.17} + {py3.8,py3.11,py3.12}-dramatiq-latest + # Falcon {py3.6,py3.7}-falcon-v{1,1.4,2} {py3.6,py3.11,py3.12}-falcon-v{3} @@ -407,6 +413,12 @@ deps = django-v5.1: Django==5.1rc1 django-latest: Django + # dramatiq + dramatiq-v1.13: dramatiq>=1.13,<1.14 + dramatiq-v1.15: dramatiq>=1.15,<1.16 + dramatiq-v1.17: dramatiq>=1.17,<1.18 + dramatiq-latest: dramatiq + # Falcon falcon-v1.4: falcon~=1.4.0 falcon-v1: falcon~=1.0 @@ -683,6 +695,7 @@ setenv = cohere: TESTPATH=tests/integrations/cohere cloud_resource_context: TESTPATH=tests/integrations/cloud_resource_context django: TESTPATH=tests/integrations/django + dramatiq: TESTPATH=tests/integrations/dramatiq falcon: TESTPATH=tests/integrations/falcon fastapi: TESTPATH=tests/integrations/fastapi flask: TESTPATH=tests/integrations/flask