From f46a6e1c0e8df6bb384a64bd75e345a21a52f9c3 Mon Sep 17 00:00:00 2001 From: Tammy Baylis <96076570+tammy-baylis-swi@users.noreply.github.com> Date: Fri, 28 Apr 2023 09:47:51 -0700 Subject: [PATCH 1/2] Request ASGI attributes passed to Sampler (#1762) * Request ASGI attributes passed to Sampler * Update changelog * aiohttp-client test http.url --------- Co-authored-by: Srikanth Chekuri Co-authored-by: Shalev Roda <65566801+shalevr@users.noreply.github.com> --- CHANGELOG.md | 2 ++ .../src/opentelemetry/instrumentation/asgi/__init__.py | 5 +++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cb9d7ae22a..4470050941 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1733](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1733)) - Make Django request span attributes available for `start_span`. ([#1730](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1730)) +- Make ASGI request span attributes available for `start_span`. + ([#1762](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1762)) ### Fixed diff --git a/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py b/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py index d4653ac50b..6fc88d3eeb 100644 --- a/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py @@ -531,15 +531,16 @@ async def __call__(self, scope, receive, send): span_name, additional_attributes = self.default_span_details(scope) + attributes = collect_request_attributes(scope) + attributes.update(additional_attributes) span, token = _start_internal_or_server_span( tracer=self.tracer, span_name=span_name, start_time=None, context_carrier=scope, context_getter=asgi_getter, + attributes=attributes, ) - attributes = collect_request_attributes(scope) - attributes.update(additional_attributes) active_requests_count_attrs = _parse_active_request_count_attrs( attributes ) From 46e4b1da44c534fed8e1002899e9e41e6d668018 Mon Sep 17 00:00:00 2001 From: Naor Malca Date: Sat, 29 Apr 2023 02:09:24 +0300 Subject: [PATCH 2/2] Add support for anonymous tasks (#1407) --- CHANGELOG.md | 3 ++ .../instrumentation/celery/__init__.py | 13 +++-- .../instrumentation/celery/utils.py | 2 + .../tests/test_tasks.py | 50 +++++++++++++++++++ .../tests/test_utils.py | 7 +++ 5 files changed, 72 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4470050941..73fc2b9aa1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1730](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1730)) - Make ASGI request span attributes available for `start_span`. ([#1762](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1762)) +- `opentelemetry-instrumentation-celery` Add support for anonymous tasks. + ([#1407](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1407) + ### Fixed diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index c6a7540ccd..cb265b46f8 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -183,10 +183,17 @@ def _trace_before_publish(self, *args, **kwargs): task = utils.retrieve_task_from_sender(kwargs) task_id = utils.retrieve_task_id_from_message(kwargs) - if task is None or task_id is None: + if task_id is None: return - operation_name = f"{_TASK_APPLY_ASYNC}/{task.name}" + if task is None: + # task is an anonymous task send using send_task or using canvas workflow + # Signatures() to send to a task not in the current processes dependency + # tree + task_name = kwargs.get("sender", "unknown") + else: + task_name = task.name + operation_name = f"{_TASK_APPLY_ASYNC}/{task_name}" span = self._tracer.start_span( operation_name, kind=trace.SpanKind.PRODUCER ) @@ -195,7 +202,7 @@ def _trace_before_publish(self, *args, **kwargs): if span.is_recording(): span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC) span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, task_id) - span.set_attribute(_TASK_NAME_KEY, task.name) + span.set_attribute(_TASK_NAME_KEY, task_name) utils.set_attributes_from_context(span, kwargs) activation = trace.use_span(span, end_on_exit=True) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py index 77abb89af8..6f4f9cbc3a 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py @@ -132,6 +132,8 @@ def attach_span(task, task_id, span, is_publish=False): NOTE: We cannot test for this well yet, because we do not run a celery worker, and cannot run `task.apply_async()` """ + if task is None: + return span_dict = getattr(task, CTX_KEY, None) if span_dict is None: span_dict = {} diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index 12fe79821c..47f79d7e1c 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -36,6 +36,7 @@ def tearDown(self): CeleryInstrumentor().uninstrument() self._worker.stop() self._thread.join() + CeleryInstrumentor().uninstrument() def test_task(self): CeleryInstrumentor().instrument() @@ -97,3 +98,52 @@ def test_uninstrument(self): spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 0) + + +class TestCelerySignatureTask(TestBase): + def setUp(self): + super().setUp() + + def start_app(*args, **kwargs): + # Add an additional task that will not be registered with parent thread + @app.task + def hidden_task(num_a): + return num_a * 2 + + self._worker = app.Worker(app=app, pool="solo", concurrency=1) + return self._worker.start(*args, **kwargs) + + self._thread = threading.Thread(target=start_app) + self._worker = app.Worker(app=app, pool="solo", concurrency=1) + self._thread.daemon = True + self._thread.start() + + def tearDown(self): + super().tearDown() + self._worker.stop() + self._thread.join() + CeleryInstrumentor().uninstrument() + + def test_hidden_task(self): + # no-op since already instrumented + CeleryInstrumentor().instrument() + + res = app.signature("tests.test_tasks.hidden_task", (2,)).apply_async() + while not res.ready(): + time.sleep(0.05) + spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) + self.assertEqual(len(spans), 2) + + consumer, producer = spans + + self.assertEqual(consumer.name, "run/tests.test_tasks.hidden_task") + self.assertEqual(consumer.kind, SpanKind.CONSUMER) + + self.assertEqual( + producer.name, "apply_async/tests.test_tasks.hidden_task" + ) + self.assertEqual(producer.kind, SpanKind.PRODUCER) + + self.assertNotEqual(consumer.parent, producer.context) + self.assertEqual(consumer.parent.span_id, producer.context.span_id) + self.assertEqual(consumer.context.trace_id, producer.context.trace_id) diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py index 8b5352fe3a..55aa3eec1e 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py @@ -185,6 +185,13 @@ def fn_task(): utils.detach_span(fn_task, task_id) self.assertEqual(utils.retrieve_span(fn_task, task_id), (None, None)) + def test_optional_task_span_attach(self): + task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f" + span = trace._Span("name", mock.Mock(spec=trace_api.SpanContext)) + + # assert this is is a no-aop + self.assertIsNone(utils.attach_span(None, task_id, span)) + def test_span_delete_empty(self): # ensure detach_span doesn't raise an exception if span is not present @self.app.task