From 00588220eb274581a75d7d56bb5500ee1f5518d1 Mon Sep 17 00:00:00 2001 From: Malcolm Rebughini <9681621+malcolmrebughini@users.noreply.github.com> Date: Mon, 28 Oct 2024 12:21:30 +0100 Subject: [PATCH 1/4] detach only if the ctx token is not None --- .../src/opentelemetry/instrumentation/celery/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index 39b3bffe60..908f158507 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -213,7 +213,10 @@ def _trace_postrun(self, *args, **kwargs): self.update_task_duration_time(task_id) labels = {"task": task.name, "worker": task.request.hostname} self._record_histograms(task_id, labels) - context_api.detach(token) + # if the process sending the task is not instrumented + # there's no incoming context and no token to detach + if token is not None: + context_api.detach(token) def _trace_before_publish(self, *args, **kwargs): task = utils.retrieve_task_from_sender(kwargs) From bda4ca10fdc701f9864b8ae0c722b9b30a81f932 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Mon, 28 Oct 2024 12:19:54 +0100 Subject: [PATCH 2/4] Add test Without the fix we have this in stdout: [2024-10-28 12:14:18,868: ERROR/MainProcess] Failed to detach context Traceback (most recent call last): File "/home/rm/src/opentelemetry-python-contrib/.tox/py310-test-instrumentation-celery-1/lib/python3.10/site-packages/opentelemetry/context/__init__.py", line 152, in detach _RUNTIME_CONTEXT.detach(token) File "/home/rm/src/opentelemetry-python-contrib/.tox/py310-test-instrumentation-celery-1/lib/python3.10/site-packages/opentelemetry/context/contextvars_context.py", line 50, in detach self._current_context.reset(token) # type: ignore TypeError: expected an instance of Token, got None --- .../tests/test_tasks.py | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index 0dc668b112..72117ec68b 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -15,8 +15,11 @@ import threading import time +from wrapt import wrap_function_wrapper + from opentelemetry import baggage, context -from opentelemetry.instrumentation.celery import CeleryInstrumentor +from opentelemetry.instrumentation.celery import CeleryInstrumentor, utils +from opentelemetry.instrumentation.utils import unwrap from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.test.test_base import TestBase from opentelemetry.trace import SpanKind, StatusCode @@ -185,6 +188,40 @@ def test_baggage(self): self.assertEqual(task.result, {"key": "value"}) + def _retrieve_context_wrapper_none_token( + self, wrapped, instance, args, kwargs + ): + ctx = wrapped(*args, **kwargs) + if ctx is None: + return ctx + span, activation, _ = ctx + return span, activation, None + + def test_task_not_instrumented_does_not_raise(self): + wrap_function_wrapper( + utils, + "retrieve_context", + self._retrieve_context_wrapper_none_token, + ) + + CeleryInstrumentor().instrument() + + result = task_add.delay(1, 2) + + timeout = time.time() + 60 * 1 # 1 minutes from now + while not result.ready(): + if time.time() > timeout: + break + time.sleep(0.05) + + spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) + self.assertEqual(len(spans), 2) + + # TODO: assert we don't have "TypeError: expected an instance of Token, got None" in logs + self.assertTrue(result) + + unwrap(utils, "retrieve_context") + class TestCelerySignatureTask(TestBase): def setUp(self): From 73befbad6fc2722ef9d56b79375eb49aa03bb22b Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Mon, 28 Oct 2024 12:33:05 +0100 Subject: [PATCH 3/4] Add changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 867f30afbb..ed4671d559 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2901])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2901) - `opentelemetry-instrumentation-system-metrics` Update metric units to conform to UCUM conventions. ([#2922](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2922)) +- `opentelemetry-instrumentation-celery` Don't detach context without a None token + ([#2927](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2927)) ### Breaking changes From c07fabe7afb63b5f8ad3a25010ec58839d1ddf94 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Mon, 28 Oct 2024 13:47:51 +0100 Subject: [PATCH 4/4] please pylint --- .../tests/test_tasks.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index 72117ec68b..c68b1bc758 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -188,20 +188,20 @@ def test_baggage(self): self.assertEqual(task.result, {"key": "value"}) - def _retrieve_context_wrapper_none_token( - self, wrapped, instance, args, kwargs - ): - ctx = wrapped(*args, **kwargs) - if ctx is None: - return ctx - span, activation, _ = ctx - return span, activation, None - def test_task_not_instrumented_does_not_raise(self): + def _retrieve_context_wrapper_none_token( + wrapped, instance, args, kwargs + ): + ctx = wrapped(*args, **kwargs) + if ctx is None: + return ctx + span, activation, _ = ctx + return span, activation, None + wrap_function_wrapper( utils, "retrieve_context", - self._retrieve_context_wrapper_none_token, + _retrieve_context_wrapper_none_token, ) CeleryInstrumentor().instrument()