From d00292013cd7bc3b7e7b42aea5336cd18683683e Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Thu, 8 Aug 2024 15:07:08 +0200 Subject: [PATCH] Dramatiq integration from @jacobsvante (#3397) This is the code from [sentry-dramatiq](https://github.com/jacobsvante/sentry-dramatiq). As described in this GitHub issue (https://github.com/getsentry/sentry-python/issues/3387) @jacobsvante, the original maintainer of this integration, is not doing any Python anymore and wants to donate his integration to Sentry so we can take care of it. This PR adds the current version of the `DramatiqIntegration` to our repo. (The original integrations has been ported to the new SDK 2.x API) Fixes #3387 --------- Co-authored-by: Ivana Kellyer --- .../test-integrations-data-processing.yml | 8 + .../split-tox-gh-actions.py | 1 + sentry_sdk/integrations/dramatiq.py | 167 +++++++++++++ tests/integrations/dramatiq/__init__.py | 3 + tests/integrations/dramatiq/test_dramatiq.py | 231 ++++++++++++++++++ tox.ini | 13 + 6 files changed, 423 insertions(+) create mode 100644 sentry_sdk/integrations/dramatiq.py create mode 100644 tests/integrations/dramatiq/__init__.py create mode 100644 tests/integrations/dramatiq/test_dramatiq.py diff --git a/.github/workflows/test-integrations-data-processing.yml b/.github/workflows/test-integrations-data-processing.yml index 1585adb20e..cb872d3196 100644 --- a/.github/workflows/test-integrations-data-processing.yml +++ b/.github/workflows/test-integrations-data-processing.yml @@ -57,6 +57,10 @@ jobs: run: | set -x # print commands that are executed ./scripts/runtox.sh "py${{ matrix.python-version }}-celery-latest" + - name: Test dramatiq latest + run: | + set -x # print commands that are executed + ./scripts/runtox.sh "py${{ matrix.python-version }}-dramatiq-latest" - name: Test huey latest run: | set -x # print commands that are executed @@ -125,6 +129,10 @@ jobs: run: | set -x # print commands that are executed ./scripts/runtox.sh --exclude-latest "py${{ matrix.python-version }}-celery" + - name: Test dramatiq pinned + run: | + set -x # print commands that are executed + ./scripts/runtox.sh --exclude-latest "py${{ matrix.python-version }}-dramatiq" - name: Test huey pinned run: | set -x # print commands that are executed diff --git a/scripts/split-tox-gh-actions/split-tox-gh-actions.py b/scripts/split-tox-gh-actions/split-tox-gh-actions.py index b9f978d850..002b930b68 100755 --- a/scripts/split-tox-gh-actions/split-tox-gh-actions.py +++ b/scripts/split-tox-gh-actions/split-tox-gh-actions.py @@ -80,6 +80,7 @@ "arq", "beam", "celery", + "dramatiq", "huey", "rq", "spark", diff --git a/sentry_sdk/integrations/dramatiq.py b/sentry_sdk/integrations/dramatiq.py new file mode 100644 index 0000000000..673c3323e8 --- /dev/null +++ b/sentry_sdk/integrations/dramatiq.py @@ -0,0 +1,167 @@ +import json + +import sentry_sdk +from sentry_sdk.integrations import Integration +from sentry_sdk._types import TYPE_CHECKING +from sentry_sdk.integrations._wsgi_common import request_body_within_bounds +from sentry_sdk.utils import ( + AnnotatedValue, + capture_internal_exceptions, + event_from_exception, +) + +from dramatiq.broker import Broker # type: ignore +from dramatiq.message import Message # type: ignore +from dramatiq.middleware import Middleware, default_middleware # type: ignore +from dramatiq.errors import Retry # type: ignore + +if TYPE_CHECKING: + from typing import Any, Callable, Dict, Optional, Union + from sentry_sdk._types import Event, Hint + + +class DramatiqIntegration(Integration): + """ + Dramatiq integration for Sentry + + Please make sure that you call `sentry_sdk.init` *before* initializing + your broker, as it monkey patches `Broker.__init__`. + + This integration was originally developed and maintained + by https://github.com/jacobsvante and later donated to the Sentry + project. + """ + + identifier = "dramatiq" + + @staticmethod + def setup_once(): + # type: () -> None + _patch_dramatiq_broker() + + +def _patch_dramatiq_broker(): + # type: () -> None + original_broker__init__ = Broker.__init__ + + def sentry_patched_broker__init__(self, *args, **kw): + # type: (Broker, *Any, **Any) -> None + integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) + + try: + middleware = kw.pop("middleware") + except KeyError: + # Unfortunately Broker and StubBroker allows middleware to be + # passed in as positional arguments, whilst RabbitmqBroker and + # RedisBroker does not. + if len(args) == 1: + middleware = args[0] + args = [] # type: ignore + else: + middleware = None + + if middleware is None: + middleware = list(m() for m in default_middleware) + else: + middleware = list(middleware) + + if integration is not None: + middleware = [m for m in middleware if not isinstance(m, SentryMiddleware)] + middleware.insert(0, SentryMiddleware()) + + kw["middleware"] = middleware + original_broker__init__(self, *args, **kw) + + Broker.__init__ = sentry_patched_broker__init__ + + +class SentryMiddleware(Middleware): # type: ignore[misc] + """ + A Dramatiq middleware that automatically captures and sends + exceptions to Sentry. + + This is automatically added to every instantiated broker via the + DramatiqIntegration. + """ + + def before_process_message(self, broker, message): + # type: (Broker, Message) -> None + integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) + if integration is None: + return + + message._scope_manager = sentry_sdk.new_scope() + message._scope_manager.__enter__() + + scope = sentry_sdk.get_current_scope() + scope.transaction = message.actor_name + scope.set_extra("dramatiq_message_id", message.message_id) + scope.add_event_processor(_make_message_event_processor(message, integration)) + + def after_process_message(self, broker, message, *, result=None, exception=None): + # type: (Broker, Message, Any, Optional[Any], Optional[Exception]) -> None + integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) + if integration is None: + return + + actor = broker.get_actor(message.actor_name) + throws = message.options.get("throws") or actor.options.get("throws") + + try: + if ( + exception is not None + and not (throws and isinstance(exception, throws)) + and not isinstance(exception, Retry) + ): + event, hint = event_from_exception( + exception, + client_options=sentry_sdk.get_client().options, + mechanism={ + "type": DramatiqIntegration.identifier, + "handled": False, + }, + ) + sentry_sdk.capture_event(event, hint=hint) + finally: + message._scope_manager.__exit__(None, None, None) + + +def _make_message_event_processor(message, integration): + # type: (Message, DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]] + + def inner(event, hint): + # type: (Event, Hint) -> Optional[Event] + with capture_internal_exceptions(): + DramatiqMessageExtractor(message).extract_into_event(event) + + return event + + return inner + + +class DramatiqMessageExtractor(object): + def __init__(self, message): + # type: (Message) -> None + self.message_data = dict(message.asdict()) + + def content_length(self): + # type: () -> int + return len(json.dumps(self.message_data)) + + def extract_into_event(self, event): + # type: (Event) -> None + client = sentry_sdk.get_client() + if not client.is_active(): + return + + contexts = event.setdefault("contexts", {}) + request_info = contexts.setdefault("dramatiq", {}) + request_info["type"] = "dramatiq" + + data = None # type: Optional[Union[AnnotatedValue, Dict[str, Any]]] + if not request_body_within_bounds(client, self.content_length()): + data = AnnotatedValue.removed_because_over_size_limit() + else: + data = self.message_data + + request_info["data"] = data diff --git a/tests/integrations/dramatiq/__init__.py b/tests/integrations/dramatiq/__init__.py new file mode 100644 index 0000000000..70bbf21db4 --- /dev/null +++ b/tests/integrations/dramatiq/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("dramatiq") diff --git a/tests/integrations/dramatiq/test_dramatiq.py b/tests/integrations/dramatiq/test_dramatiq.py new file mode 100644 index 0000000000..d7917cbd00 --- /dev/null +++ b/tests/integrations/dramatiq/test_dramatiq.py @@ -0,0 +1,231 @@ +import pytest +import uuid + +import dramatiq +from dramatiq.brokers.stub import StubBroker + +import sentry_sdk +from sentry_sdk.integrations.dramatiq import DramatiqIntegration + + +@pytest.fixture +def broker(sentry_init): + sentry_init(integrations=[DramatiqIntegration()]) + broker = StubBroker() + broker.emit_after("process_boot") + dramatiq.set_broker(broker) + yield broker + broker.flush_all() + broker.close() + + +@pytest.fixture +def worker(broker): + worker = dramatiq.Worker(broker, worker_timeout=100, worker_threads=1) + worker.start() + yield worker + worker.stop() + + +def test_that_a_single_error_is_captured(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def dummy_actor(x, y): + return x / y + + dummy_actor.send(1, 2) + dummy_actor.send(1, 0) + broker.join(dummy_actor.queue_name) + worker.join() + + (event,) = events + exception = event["exception"]["values"][0] + assert exception["type"] == "ZeroDivisionError" + + +def test_that_actor_name_is_set_as_transaction(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def dummy_actor(x, y): + return x / y + + dummy_actor.send(1, 0) + broker.join(dummy_actor.queue_name) + worker.join() + + (event,) = events + assert event["transaction"] == "dummy_actor" + + +def test_that_dramatiq_message_id_is_set_as_extra(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def dummy_actor(x, y): + sentry_sdk.capture_message("hi") + return x / y + + dummy_actor.send(1, 0) + broker.join(dummy_actor.queue_name) + worker.join() + + event_message, event_error = events + assert "dramatiq_message_id" in event_message["extra"] + assert "dramatiq_message_id" in event_error["extra"] + assert ( + event_message["extra"]["dramatiq_message_id"] + == event_error["extra"]["dramatiq_message_id"] + ) + msg_ids = [e["extra"]["dramatiq_message_id"] for e in events] + assert all(uuid.UUID(msg_id) and isinstance(msg_id, str) for msg_id in msg_ids) + + +def test_that_local_variables_are_captured(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def dummy_actor(x, y): + foo = 42 # noqa + return x / y + + dummy_actor.send(1, 2) + dummy_actor.send(1, 0) + broker.join(dummy_actor.queue_name) + worker.join() + + (event,) = events + exception = event["exception"]["values"][0] + assert exception["stacktrace"]["frames"][-1]["vars"] == { + "x": "1", + "y": "0", + "foo": "42", + } + + +def test_that_messages_are_captured(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def dummy_actor(): + sentry_sdk.capture_message("hi") + + dummy_actor.send() + broker.join(dummy_actor.queue_name) + worker.join() + + (event,) = events + assert event["message"] == "hi" + assert event["level"] == "info" + assert event["transaction"] == "dummy_actor" + + +def test_that_sub_actor_errors_are_captured(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def dummy_actor(x, y): + sub_actor.send(x, y) + + @dramatiq.actor(max_retries=0) + def sub_actor(x, y): + return x / y + + dummy_actor.send(1, 2) + dummy_actor.send(1, 0) + broker.join(dummy_actor.queue_name) + worker.join() + + (event,) = events + assert event["transaction"] == "sub_actor" + + exception = event["exception"]["values"][0] + assert exception["type"] == "ZeroDivisionError" + + +def test_that_multiple_errors_are_captured(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def dummy_actor(x, y): + return x / y + + dummy_actor.send(1, 0) + broker.join(dummy_actor.queue_name) + worker.join() + + dummy_actor.send(1, None) + broker.join(dummy_actor.queue_name) + worker.join() + + event1, event2 = events + + assert event1["transaction"] == "dummy_actor" + exception = event1["exception"]["values"][0] + assert exception["type"] == "ZeroDivisionError" + + assert event2["transaction"] == "dummy_actor" + exception = event2["exception"]["values"][0] + assert exception["type"] == "TypeError" + + +def test_that_message_data_is_added_as_request(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def dummy_actor(x, y): + return x / y + + dummy_actor.send_with_options( + args=( + 1, + 0, + ), + max_retries=0, + ) + broker.join(dummy_actor.queue_name) + worker.join() + + (event,) = events + + assert event["transaction"] == "dummy_actor" + request_data = event["contexts"]["dramatiq"]["data"] + assert request_data["queue_name"] == "default" + assert request_data["actor_name"] == "dummy_actor" + assert request_data["args"] == [1, 0] + assert request_data["kwargs"] == {} + assert request_data["options"]["max_retries"] == 0 + assert uuid.UUID(request_data["message_id"]) + assert isinstance(request_data["message_timestamp"], int) + + +def test_that_expected_exceptions_are_not_captured(broker, worker, capture_events): + events = capture_events() + + class ExpectedException(Exception): + pass + + @dramatiq.actor(max_retries=0, throws=ExpectedException) + def dummy_actor(): + raise ExpectedException + + dummy_actor.send() + broker.join(dummy_actor.queue_name) + worker.join() + + assert events == [] + + +def test_that_retry_exceptions_are_not_captured(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=2) + def dummy_actor(): + raise dramatiq.errors.Retry("Retrying", delay=100) + + dummy_actor.send() + broker.join(dummy_actor.queue_name) + worker.join() + + assert events == [] diff --git a/tox.ini b/tox.ini index 3acf70bb6f..98536d9860 100644 --- a/tox.ini +++ b/tox.ini @@ -108,6 +108,12 @@ envlist = {py3.10,py3.11,py3.12}-django-v{5.0,5.1} {py3.10,py3.11,py3.12}-django-latest + # dramatiq + {py3.6,py3.9}-dramatiq-v{1.13} + {py3.7,py3.10,py3.11}-dramatiq-v{1.15} + {py3.8,py3.11,py3.12}-dramatiq-v{1.17} + {py3.8,py3.11,py3.12}-dramatiq-latest + # Falcon {py3.6,py3.7}-falcon-v{1,1.4,2} {py3.6,py3.11,py3.12}-falcon-v{3} @@ -407,6 +413,12 @@ deps = django-v5.1: Django==5.1rc1 django-latest: Django + # dramatiq + dramatiq-v1.13: dramatiq>=1.13,<1.14 + dramatiq-v1.15: dramatiq>=1.15,<1.16 + dramatiq-v1.17: dramatiq>=1.17,<1.18 + dramatiq-latest: dramatiq + # Falcon falcon-v1.4: falcon~=1.4.0 falcon-v1: falcon~=1.0 @@ -683,6 +695,7 @@ setenv = cohere: TESTPATH=tests/integrations/cohere cloud_resource_context: TESTPATH=tests/integrations/cloud_resource_context django: TESTPATH=tests/integrations/django + dramatiq: TESTPATH=tests/integrations/dramatiq falcon: TESTPATH=tests/integrations/falcon fastapi: TESTPATH=tests/integrations/fastapi flask: TESTPATH=tests/integrations/flask