diff --git a/CHANGELOG.md b/CHANGELOG.md index ead8d5c134..54c194ef46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1407](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1407)) - `opentelemetry-instrumentation-logging` Add `otelTraceSampled` to instrumetation-logging ([#1773](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1773)) - +- `opentelemetry-instrumentation-confluent-kafka` Add instrumentation to `consume` method ### Fixed diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 12cb363219..a24313ca0e 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -113,6 +113,7 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None) from .utils import ( KafkaPropertiesExtractor, _enrich_span, + _get_links_from_records, _get_span_name, _kafka_getter, _kafka_setter, @@ -136,6 +137,10 @@ def __init__(self, config): # This method is deliberately implemented in order to allow wrapt to wrap this function def poll(self, timeout=-1): # pylint: disable=useless-super-delegation return super().poll(timeout) + + # This method is deliberately implemented in order to allow wrapt to wrap this function + def consume(self, *args, **kwargs): # pylint: disable=useless-super-delegation + return super().consume(*args, **kwargs) class ProxiedProducer(Producer): @@ -178,9 +183,11 @@ def commit(self, *args, **kwargs): return self._consumer.commit(*args, **kwargs) def consume( - self, num_messages=1, *args, **kwargs + self, 
*args, **kwargs ): # pylint: disable=keyword-arg-before-vararg - return self._consumer.consume(num_messages, *args, **kwargs) + return ConfluentKafkaInstrumentor.wrap_consume( + self._consumer.consume, self, self._tracer, args, kwargs, + ) def get_watermark_offsets( self, partition, timeout=-1, *args, **kwargs @@ -274,6 +281,11 @@ def _inner_wrap_poll(func, instance, args, kwargs): return ConfluentKafkaInstrumentor.wrap_poll( func, instance, self._tracer, args, kwargs ) + + def _inner_wrap_consume(func, instance, args, kwargs): + return ConfluentKafkaInstrumentor.wrap_consume( + func, instance, self._tracer, args, kwargs + ) wrapt.wrap_function_wrapper( AutoInstrumentedProducer, @@ -286,6 +298,12 @@ def _inner_wrap_poll(func, instance, args, kwargs): "poll", _inner_wrap_poll, ) + + wrapt.wrap_function_wrapper( + AutoInstrumentedConsumer, + "consume", + _inner_wrap_consume, + ) def _uninstrument(self, **kwargs): confluent_kafka.Producer = self._original_kafka_producer @@ -336,13 +354,7 @@ def wrap_poll(func, instance, tracer, args, kwargs): ): record = func(*args, **kwargs) if record: - links = [] - ctx = propagate.extract(record.headers(), getter=_kafka_getter) - if ctx: - for item in ctx.values(): - if hasattr(item, "get_span_context"): - links.append(Link(context=item.get_span_context())) - + links = _get_links_from_records([record]) instance._current_consume_span = tracer.start_span( name=f"{record.topic()} process", links=links, @@ -361,3 +373,35 @@ def wrap_poll(func, instance, tracer, args, kwargs): ) return record + + @staticmethod + def wrap_consume(func, instance, tracer, args, kwargs): + if instance._current_consume_span: + context.detach(instance._current_context_token) + instance._current_context_token = None + instance._current_consume_span.end() + instance._current_consume_span = None + + with tracer.start_as_current_span( + "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER + ): + records = func(*args, **kwargs) + if len(records) > 0: + links = 
_get_links_from_records(records) + instance._current_consume_span = tracer.start_span( + name=f"{records[0].topic()} process", + links=links, + kind=SpanKind.CONSUMER, + ) + + _enrich_span( + instance._current_consume_span, + records[0].topic(), + operation=MessagingOperationValues.PROCESS, + ) + + instance._current_context_token = context.attach( + trace.set_span_in_context(instance._current_consume_span) + ) + + return records diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index 77fce03cd8..6ab0c9a4cd 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -1,6 +1,8 @@ from logging import getLogger from typing import List, Optional +from opentelemetry import propagate +from opentelemetry.trace import SpanKind, Link from opentelemetry.propagators import textmap from opentelemetry.semconv.trace import ( MessagingDestinationKindValues, @@ -82,11 +84,11 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None: def _enrich_span( - span, - topic, - partition: Optional[int] = None, - offset: Optional[int] = None, - operation: Optional[MessagingOperationValues] = None, + span, + topic, + partition: Optional[int] = None, + offset: Optional[int] = None, + operation: Optional[MessagingOperationValues] = None, ): if not span.is_recording(): return @@ -116,6 +118,18 @@ def _enrich_span( ) +def _get_links_from_records(records): + links = [] + for record in records: + ctx = propagate.extract(record.headers(), getter=_kafka_getter) + if ctx: + for item in ctx.values(): + if hasattr(item, "get_span_context"): + 
links.append(Link(context=item.get_span_context())) + + return links + + _kafka_setter = KafkaContextSetter()