From 1677ee2b815b4df88e2aec421186855391addbb5 Mon Sep 17 00:00:00 2001 From: dor Date: Tue, 31 May 2022 11:22:23 +0300 Subject: [PATCH 1/6] add kafka instrumentation --- .../confluent_kafka/__init__.py | 315 ++++++++++++++++++ .../confluent_kafka/package.py | 16 + .../instrumentation/confluent_kafka/utils.py | 111 ++++++ .../confluent_kafka/version.py | 15 + .../tests/__init__.py | 0 .../tests/test_instrumentation.py | 53 +++ 6 files changed, 510 insertions(+) create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/package.py create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/version.py create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/__init__.py create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py new file mode 100644 index 0000000000..531c393c07 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -0,0 +1,315 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Instrument `confluent-kafka-python` to report instrumentation-confluent-kafka produced and consumed messages + +Usage +----- + +..code:: python + + from opentelemetry.instrumentation.confluentkafka import ConfluentKafkaInstrumentor + from confluent_kafka import Producer, Consumer + + # Instrument kafka + ConfluentKafkaInstrumentor().instrument() + + # report a span of type producer with the default settings + conf1 = {'bootstrap.servers': "localhost:9092"} + producer = Producer(conf1) + producer.produce('my-topic',b'raw_bytes') + + conf2 = {'bootstrap.servers': "localhost:9092", + 'group.id': "foo", + 'auto.offset.reset': 'smallest'} + # report a span of type consumer with the default settings + consumer = Consumer(conf2) + def basic_consume_loop(consumer, topics): + try: + consumer.subscribe(topics) + running = True + while running: + msg = consumer.poll(timeout=1.0) + if msg is None: continue + + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + # End of partition event + sys.stderr.write('%% %s [%d] reached end at offset %d\n' % + (msg.topic(), msg.partition(), msg.offset())) + elif msg.error(): + raise KafkaException(msg.error()) + else: + msg_process(msg) + finally: + # Close down consumer to commit final offsets. + consumer.close() + + basic_consume_loop(consumer, "my-topic") + + +The `_instrument` method accepts the following keyword args: +tracer_provider (TracerProvider) - an optional tracer provider +instrument_producer (Callable) - a function with extra user-defined logic to be performed before sending the message + this function signature is: + def instrument_producer(producer: Producer, tracer_provider=None) +instrument_consumer (Callable) - a function with extra user-defined logic to be performed after consuming a message + this function signature is: + def instrument_consumer(consumer: Consumer, tracer_provider=None) +for example: +.. 
+.. code:: python
+
+    from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
+    from confluent_kafka import Producer, Consumer
+
+    inst = ConfluentKafkaInstrumentor()
+
+    p = Producer({'bootstrap.servers': 'localhost:29092'})
+    c = Consumer({
+        'bootstrap.servers': 'localhost:29092',
+        'group.id': 'mygroup',
+        'auto.offset.reset': 'earliest'
+    })
+
+    # instrument confluent kafka with produce and consume hooks
+    p = inst.instrument_producer(p, tracer_provider)
+    c = inst.instrument_consumer(c, tracer_provider=tracer_provider)
+
+
+    # Using kafka as normal now will automatically generate spans,
+    # including user custom attributes added from the hooks
+    p.produce('my-topic', b'raw_bytes')
+    msg = c.poll()
+
+
+API
+___
+"""
+from typing import Collection
+
+import confluent_kafka
+import wrapt
+from confluent_kafka import Producer, Consumer
+from opentelemetry import trace, propagate, context
+from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
+from opentelemetry.semconv.trace import MessagingOperationValues
+from opentelemetry.trace import Tracer, Link, SpanKind
+from opentelemetry.instrumentation.utils import unwrap
+
+from kafka_instrumentation.package import _instruments
+from kafka_instrumentation.utils import KafkaPropertiesExtractor, _get_span_name, \
+    _kafka_setter, _enrich_span, _kafka_getter
+from kafka_instrumentation.version import __version__
+
+
+class AutoInstrumentedProducer(Producer):
+    def __init__(self, config):
+        super().__init__(config)
+
+    def produce(self, topic, value=None, *args, **kwargs):
+        super().produce(topic, value, *args, **kwargs)
+
+
+class AutoInstrumentedConsumer(Consumer):
+    def __init__(self, config):
+        super().__init__(config)
+        self._current_consume_span = None
+
+    def poll(self, timeout=-1):
+        return super().poll(timeout)
+
+
+class ProxiedProducer(Producer):
+
+    def __init__(self, producer: Producer, tracer: Tracer):
+        self._producer = producer
+        self._tracer = tracer
+
+    def flush(self, timeout=-1):
+        self._producer.flush(timeout)
+
+    def poll(self, timeout=-1):
+        self._producer.poll(timeout)
+
+    def produce(self, topic, value=None, *args, **kwargs):
+        new_kwargs = kwargs.copy()
+        new_kwargs['topic'] = topic
+        new_kwargs['value'] = value
+
+        return ConfluentKafkaInstrumentor.wrap_produce(self._producer.produce, self, self._tracer, args, new_kwargs)
+
+    def original_producer(self):
+        return self._producer
+
+
+class ProxiedConsumer(Consumer):
+
+    def __init__(self, consumer: Consumer, tracer: Tracer):
+        self._consumer = consumer
+        self._tracer = tracer
+        self._current_consume_span = None
+        self._current_context_token = None
+
+    def committed(self, partitions, timeout=-1):
+        return self._consumer.committed(partitions, timeout)
+
+    def consume(self, num_messages=1, *args, **kwargs):
+        return self._consumer.consume(num_messages, *args, **kwargs)
+
+    def get_watermark_offsets(self, partition, timeout=-1, *args, **kwargs):
+        return self._consumer.get_watermark_offsets(partition, timeout, *args, **kwargs)
+
+    def offsets_for_times(self, partitions, timeout=-1):
+        return self._consumer.offsets_for_times(partitions, timeout)
+
+    def poll(self, timeout=-1):
+        return ConfluentKafkaInstrumentor.wrap_poll(self._consumer.poll, self, self._tracer, [timeout], {})
+
+    def subscribe(self, topics, on_assign=lambda *args: None, *args, **kwargs):
+        self._consumer.subscribe(topics, on_assign, *args, **kwargs)
+
+    def original_consumer(self):
+        return
self._consumer + + +class ConfluentKafkaInstrumentor(BaseInstrumentor): + """An instrumentor for confluent kafka module + See `BaseInstrumentor` + """ + + @staticmethod + def instrument_producer(producer: Producer, tracer_provider=None) -> ProxiedProducer: + tracer = trace.get_tracer( + __name__, __version__, tracer_provider=tracer_provider + ) + + manual_producer = ProxiedProducer(producer, tracer) + + return manual_producer + + @staticmethod + def instrument_consumer(consumer: Consumer, tracer_provider=None) -> ProxiedConsumer: + tracer = trace.get_tracer( + __name__, __version__, tracer_provider=tracer_provider + ) + + manual_consumer = ProxiedConsumer(consumer, tracer) + + return manual_consumer + + @staticmethod + def uninstrument_producer(producer) -> Producer: + if isinstance(producer, ProxiedProducer): + return producer.original_producer() + + @staticmethod + def uninstrument_consumer(consumer) -> Consumer: + if isinstance(consumer, ProxiedConsumer): + return consumer.original_consumer() + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments + + def _instrument(self, **kwargs): + self._original_kafka_producer = confluent_kafka.Producer + self._original_kafka_consumer = confluent_kafka.Consumer + + confluent_kafka.Producer = AutoInstrumentedProducer + confluent_kafka.Consumer = AutoInstrumentedConsumer + + tracer_provider = kwargs.get("tracer_provider") + tracer = trace.get_tracer( + __name__, __version__, tracer_provider=tracer_provider + ) + + self._tracer = tracer + + def _inner_wrap_produce(func, instance, args, kwargs): + return ConfluentKafkaInstrumentor.wrap_produce(func, instance, self._tracer, args, kwargs) + + def _inner_wrap_poll(func, instance, args, kwargs): + return ConfluentKafkaInstrumentor.wrap_poll(func, instance, self._tracer, args, kwargs) + + wrapt.wrap_function_wrapper("kafka_instrumentation", + "AutoInstrumentedProducer.produce", _inner_wrap_produce) + + wrapt.wrap_function_wrapper("kafka_instrumentation", + "AutoInstrumentedConsumer.poll", _inner_wrap_poll) + + def _uninstrument(self, **kwargs): + confluent_kafka.Producer = self._original_kafka_producer + confluent_kafka.Consumer = self._original_kafka_consumer + + unwrap(AutoInstrumentedProducer, "produce") + unwrap(AutoInstrumentedConsumer, "poll") + + @staticmethod + def wrap_produce(func, instance, tracer, args, kwargs): + topic = kwargs.get("topic") + if not topic: + topic = args[0] + + span_name = _get_span_name("send", topic) + with tracer.start_as_current_span(name=span_name, kind=trace.SpanKind.PRODUCER) as span: + headers = KafkaPropertiesExtractor.extract_produce_headers(args, kwargs) + if headers is None: + headers = [] + kwargs["headers"] = headers + + topic = KafkaPropertiesExtractor.extract_produce_topic(args) + bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers(instance) + _enrich_span(span, topic, bootstrap_servers, operation=MessagingOperationValues.RECEIVE) # Replace + propagate.inject( + headers, + setter=_kafka_setter, + ) + return func(*args, **kwargs) + + @staticmethod + def wrap_poll(func, instance, tracer, args, kwargs): + if instance._current_consume_span: + context.detach(instance._current_context_token) + instance._current_context_token = None + instance._current_consume_span.end() + instance._current_consume_span = None + + with tracer.start_as_current_span("recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER) as span: + record = func(*args, **kwargs) + if record: + links = [] + ctx = propagate.extract(record.headers(), 
getter=_kafka_getter) + if ctx: + for item in ctx.values(): + if hasattr(item, "get_span_context"): + links.append(Link(context=item.get_span_context())) + + instance._current_consume_span = tracer.start_span( + name=f"{record.topic()} process", links=links, kind=SpanKind.CONSUMER + ) + + bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers(instance) + _enrich_span( + instance._current_consume_span, + record.topic(), + bootstrap_servers, + record.partition(), + record.offset(), + operation=MessagingOperationValues.PROCESS, + + ) + instance._current_context_token = context.attach( + trace.set_span_in_context(instance._current_consume_span)) + + return record \ No newline at end of file diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/package.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/package.py new file mode 100644 index 0000000000..b75cdf6fff --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/package.py @@ -0,0 +1,16 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +_instruments = ("confluent-kafka ~= 1.8.2",) \ No newline at end of file diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py new file mode 100644 index 0000000000..3e8c302fb5 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -0,0 +1,111 @@ +import json +import trace +from logging import getLogger +from typing import List, Optional + +from opentelemetry import propagate +from opentelemetry.propagators import textmap +from opentelemetry.semconv.trace import ( + SpanAttributes, + MessagingOperationValues, + MessagingDestinationKindValues, +) +from opentelemetry.trace import Link, SpanKind + +_LOG = getLogger(__name__) + + +class KafkaPropertiesExtractor: + @staticmethod + def extract_bootstrap_servers(instance): + return instance.config.get("bootstrap_servers") + + @staticmethod + def _extract_argument(key, position, default_value, args, kwargs): + if len(args) > position: + return args[position] + return kwargs.get(key, default_value) + + @staticmethod + def extract_produce_topic(args): + """extract topic from `produce` method arguments in Producer class""" + if len(args) > 0: + return args[0] + return "unknown" + + @staticmethod + def extract_produce_headers(args, kwargs): + """extract headers from `produce` method arguments in Producer class""" + return KafkaPropertiesExtractor._extract_argument( + "headers", 6, None, args, kwargs + ) + + +class KafkaContextGetter(textmap.Getter): + def get(self, carrier: textmap.CarrierT, key: str) -> 
Optional[List[str]]: + if carrier is None: + return None + for item_key, value in carrier: + if item_key == key: + if value is not None: + return [value.decode()] + return None + + def keys(self, carrier: textmap.CarrierT) -> List[str]: + if carrier is None: + return [] + return [key for (key, value) in carrier] + + +class KafkaContextSetter(textmap.Setter): + def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None: + if carrier is None or key is None: + return + + if value: + value = value.encode() + carrier.append((key, value)) + + +_kafka_getter = KafkaContextGetter() +def _enrich_span( + span, + topic, + bootstrap_servers: List[str], + partition: Optional[int] = None, + offset: Optional[int] = None, + operation: Optional[MessagingOperationValues] = None, + +): + + if not span.is_recording(): + return + + span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka") + span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic) + span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition) + + span.set_attribute( + SpanAttributes.MESSAGING_URL, json.dumps(bootstrap_servers) + ) + span.set_attribute( + SpanAttributes.MESSAGING_DESTINATION_KIND, + MessagingDestinationKindValues.QUEUE.value, + ) + + if operation: + span.set_attribute(SpanAttributes.MESSAGING_OPERATION, operation.value) + else: + span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) + + # https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic + # A message within Kafka is uniquely defined by its topic name, topic partition and offset. + if partition and offset and topic: + span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, f"{topic}.{partition}.{offset}") + + +_kafka_setter = KafkaContextSetter() + + +def _get_span_name(operation: str, topic: str): + return f"{topic} {operation}" diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/version.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/version.py new file mode 100644 index 0000000000..713d32087d --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +__version__ = "0.27b0" \ No newline at end of file diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py new file mode 100644 index 0000000000..593a5ada1b --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -0,0 +1,53 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest import TestCase + + +from confluent_kafka import Producer, Consumer + +from kafka_instrumentation import ConfluentKafkaInstrumentor, ProxiedProducer, ProxiedConsumer + + +class TestConfluentKafka(TestCase): + def test_instrument_api(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + + p = Producer({'bootstrap.servers': 'localhost:29092'}) + p = instrumentation.instrument_producer(p) + + self.assertEqual(p.__class__, ProxiedProducer) + + p = instrumentation.uninstrument_producer(p) + self.assertEqual(p.__class__, Producer) + + p = Producer({'bootstrap.servers': 'localhost:29092'}) + p = instrumentation.instrument_producer(p) + + self.assertEqual(p.__class__, ProxiedProducer) + + p = instrumentation.uninstrument_producer(p) + self.assertEqual(p.__class__, Producer) + + c = Consumer({ + 'bootstrap.servers': 'localhost:29092', + 'group.id': 'mygroup', + 'auto.offset.reset': 'earliest' + }) + + c = instrumentation.instrument_consumer(c) + self.assertEqual(c.__class__, ProxiedConsumer) + + c = instrumentation.uninstrument_consumer(c) + self.assertEqual(c.__class__, Consumer) + From 7e8778da229bd57a42fff6ab2e7665b74a9e6096 Mon Sep 17 00:00:00 2001 From: dor Date: Tue, 31 May 2022 15:06:52 +0300 Subject: [PATCH 2/6] add confluent kafka instrumentation --- .github/component_owners.yml | 4 + CHANGELOG.md | 5 + instrumentation/README.md | 1 + .../README.rst | 23 +++ .../setup.cfg | 57 +++++++ .../setup.py | 99 +++++++++++++ .../confluent_kafka/__init__.py | 139 ++++++++++++------ .../confluent_kafka/package.py | 2 +- .../instrumentation/confluent_kafka/utils.py | 24 ++- .../confluent_kafka/version.py | 2 +- .../tests/test_instrumentation.py | 51 ++++--- .../setup.cfg | 1 + .../instrumentation/bootstrap_gen.py | 4 + 13 files changed, 326 insertions(+), 86 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/README.rst create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/setup.cfg create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/setup.py diff --git a/.github/component_owners.yml b/.github/component_owners.yml index 8292e2f527..0072086294 100644 --- a/.github/component_owners.yml +++ b/.github/component_owners.yml @@ -11,6 +11,10 @@ components: - 
oxeye-nikolay
     - nikosokolik
 
+  instrumentation/opentelemetry-instrumentation-confluent-kafka:
+    - oxeye-dorkolog
+    - dorkolog
+
   propagator/opentelemetry-propagator-aws-xray:
     - NathanielRN
 
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c4b9d1705e..a34df46f0e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - cleanup type hints for textmap `Getter` and `Setter` classes
   ([1106](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1106))
 
+### Added
+- Added `opentelemetry-instrumentation-confluent-kafka`
+  ([#1111](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1111))
+
+
 ## [1.12.0rc1-0.31b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0rc1-0.31b0) - 2022-05-17
 
diff --git a/instrumentation/README.md b/instrumentation/README.md
index e4347f03ac..934b1d6e3d 100644
--- a/instrumentation/README.md
+++ b/instrumentation/README.md
@@ -10,6 +10,7 @@
 | [opentelemetry-instrumentation-boto3sqs](./opentelemetry-instrumentation-boto3sqs) | boto3 ~= 1.0 |
 | [opentelemetry-instrumentation-botocore](./opentelemetry-instrumentation-botocore) | botocore ~= 1.0 |
 | [opentelemetry-instrumentation-celery](./opentelemetry-instrumentation-celery) | celery >= 4.0, < 6.0 |
+| [opentelemetry-instrumentation-confluent-kafka](./opentelemetry-instrumentation-confluent-kafka) | confluent-kafka ~= 1.8.2 |
 | [opentelemetry-instrumentation-dbapi](./opentelemetry-instrumentation-dbapi) | dbapi |
 | [opentelemetry-instrumentation-django](./opentelemetry-instrumentation-django) | django >= 1.10 |
 | [opentelemetry-instrumentation-elasticsearch](./opentelemetry-instrumentation-elasticsearch) | elasticsearch >= 2.0 |
diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/README.rst b/instrumentation/opentelemetry-instrumentation-confluent-kafka/README.rst
new file mode 100644
index 0000000000..dfd9d0283f
--- /dev/null
+++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/README.rst
@@ -0,0 +1,23 @@
+OpenTelemetry confluent-kafka Instrumentation
+===========================
+
+|pypi|
+
+.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-confluent-kafka.svg
+   :target: https://pypi.org/project/opentelemetry-instrumentation-confluent-kafka/
+
+This library allows tracing requests made by the confluent-kafka library.
+
+Installation
+------------
+
+::
+
+    pip install opentelemetry-instrumentation-confluent-kafka
+
+
+References
+----------
+
+* `OpenTelemetry confluent-kafka/ Tracing <https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/confluent_kafka/confluent_kafka.html>`_
+* `OpenTelemetry Project <https://opentelemetry.io/>`_
diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/setup.cfg b/instrumentation/opentelemetry-instrumentation-confluent-kafka/setup.cfg
new file mode 100644
index 0000000000..302127afa6
--- /dev/null
+++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/setup.cfg
@@ -0,0 +1,57 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# +[metadata] +name = opentelemetry-instrumentation-confluent-kafka +description = OpenTelemetry Confluent Kafka instrumentation +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +url = https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation/opentelemetry-instrumentation-confluent-kafka +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 4 - Beta + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + Programming Language :: Python :: 3.9 + Programming Language :: Python :: 3.10 + +[options] +python_requires = >=3.6 +package_dir= + =src +packages=find_namespace: + +install_requires = + opentelemetry-api ~= 1.3 + wrapt >= 1.0.0, < 2.0.0 + +[options.extras_require] +test = + # add any test dependencies here + confluent-kafka ~= 1.8.2 + +[options.packages.find] +where = src + +[options.entry_points] +opentelemetry_instrumentor = + confluent_kafka = opentelemetry.instrumentation.confluent_kafka:ConfluentKafkaInstrumentor diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/setup.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/setup.py new file mode 100644 index 0000000000..03f616369b --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/setup.py @@ -0,0 +1,99 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# DO NOT EDIT. THIS FILE WAS AUTOGENERATED FROM templates/instrumentation_setup.py.txt. +# RUN `python scripts/generate_setup.py` TO REGENERATE. + + +import distutils.cmd +import json +import os +from configparser import ConfigParser + +import setuptools + +config = ConfigParser() +config.read("setup.cfg") + +# We provide extras_require parameter to setuptools.setup later which +# overwrites the extras_require section from setup.cfg. To support extras_require +# section in setup.cfg, we load it here and merge it with the extras_require param. 
+extras_require = {} +if "options.extras_require" in config: + for key, value in config["options.extras_require"].items(): + extras_require[key] = [v for v in value.split("\n") if v.strip()] + +BASE_DIR = os.path.dirname(__file__) +PACKAGE_INFO = {} + +VERSION_FILENAME = os.path.join( + BASE_DIR, + "src", + "opentelemetry", + "instrumentation", + "confluent_kafka", + "version.py", +) +with open(VERSION_FILENAME, encoding="utf-8") as f: + exec(f.read(), PACKAGE_INFO) + +PACKAGE_FILENAME = os.path.join( + BASE_DIR, + "src", + "opentelemetry", + "instrumentation", + "confluent_kafka", + "package.py", +) +with open(PACKAGE_FILENAME, encoding="utf-8") as f: + exec(f.read(), PACKAGE_INFO) + +# Mark any instruments/runtime dependencies as test dependencies as well. +extras_require["instruments"] = PACKAGE_INFO["_instruments"] +test_deps = extras_require.get("test", []) +for dep in extras_require["instruments"]: + test_deps.append(dep) + +extras_require["test"] = test_deps + + +class JSONMetadataCommand(distutils.cmd.Command): + + description = ( + "print out package metadata as JSON. This is used by OpenTelemetry dev scripts to ", + "auto-generate code in other places", + ) + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + pass + + def run(self): + metadata = { + "name": config["metadata"]["name"], + "version": PACKAGE_INFO["__version__"], + "instruments": PACKAGE_INFO["_instruments"], + } + print(json.dumps(metadata)) + + +setuptools.setup( + cmdclass={"meta": JSONMetadataCommand}, + version=PACKAGE_INFO["__version__"], + extras_require=extras_require, +) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 531c393c07..8aa6ab2658 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -101,24 +101,30 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None) import confluent_kafka import wrapt -from confluent_kafka import Producer, Consumer -from opentelemetry import trace, propagate, context +from confluent_kafka import Consumer, Producer + +from opentelemetry import context, propagate, trace +from opentelemetry.instrumentation.confluent_kafka.package import _instruments +from opentelemetry.instrumentation.confluent_kafka.utils import ( + KafkaPropertiesExtractor, + _enrich_span, + _get_span_name, + _kafka_getter, + _kafka_setter, +) +from opentelemetry.instrumentation.confluent_kafka.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor -from opentelemetry.semconv.trace import MessagingOperationValues -from opentelemetry.trace import Tracer, Link, SpanKind from opentelemetry.instrumentation.utils import unwrap - -from kafka_instrumentation.package import _instruments -from kafka_instrumentation.utils import KafkaPropertiesExtractor, _get_span_name, \ - _kafka_setter, _enrich_span, _kafka_getter -from kafka_instrumentation.version import __version__ +from opentelemetry.semconv.trace import MessagingOperationValues +from opentelemetry.trace import Link, SpanKind, Tracer class AutoInstrumentedProducer(Producer): - def __init__(self, config): - super().__init__(config) - def 
produce(self, topic, value=None, *args, **kwargs): + # This method is deliberately implemented in order to allow wrapt to wrap this function + def produce( + self, topic, value=None, *args, **kwargs + ): # pylint: disable=keyword-arg-before-vararg,useless-super-delegation super().produce(topic, value, *args, **kwargs) @@ -127,12 +133,12 @@ def __init__(self, config): super().__init__(config) self._current_consume_span = None - def poll(self, timeout=-1): + # This method is deliberately implemented in order to allow wrapt to wrap this function + def poll(self, timeout=-1): # pylint: disable=useless-super-delegation return super().poll(timeout) class ProxiedProducer(Producer): - def __init__(self, producer: Producer, tracer: Tracer): self._producer = producer self._tracer = tracer @@ -143,19 +149,22 @@ def flush(self, timeout=-1): def poll(self, timeout=-1): self._producer.poll(timeout) - def produce(self, topic, value=None, *args, **kwargs): + def produce( + self, topic, value=None, *args, **kwargs + ): # pylint: disable=keyword-arg-before-vararg new_kwargs = kwargs.copy() - new_kwargs['topic'] = topic - new_kwargs['value'] = value + new_kwargs["topic"] = topic + new_kwargs["value"] = value - return ConfluentKafkaInstrumentor.wrap_produce(self._producer.produce, self, self._tracer, args, new_kwargs) + return ConfluentKafkaInstrumentor.wrap_produce( + self._producer.produce, self, self._tracer, args, new_kwargs + ) def original_producer(self): return self._producer class ProxiedConsumer(Consumer): - def __init__(self, consumer: Consumer, tracer: Tracer): self._consumer = consumer self._tracer = tracer @@ -165,19 +174,29 @@ def __init__(self, consumer: Consumer, tracer: Tracer): def committed(self, partitions, timeout=-1): return self._consumer.committed(partitions, timeout) - def consume(self, num_messages=1, *args, **kwargs): + def consume( + self, num_messages=1, *args, **kwargs + ): # pylint: disable=keyword-arg-before-vararg return self._consumer.consume(num_messages, *args, **kwargs) - def get_watermark_offsets(self, partition, timeout=-1, *args, **kwargs): - return self._consumer.get_watermark_offsets(partition, timeout, *args, **kwargs) + def get_watermark_offsets( + self, partition, timeout=-1, *args, **kwargs + ): # pylint: disable=keyword-arg-before-vararg + return self._consumer.get_watermark_offsets( + partition, timeout, *args, **kwargs + ) def offsets_for_times(self, partitions, timeout=-1): return self._consumer.offsets_for_times(partitions, timeout) def poll(self, timeout=-1): - return ConfluentKafkaInstrumentor.wrap_poll(self._consumer.poll, self, self._tracer, [timeout], {}) + return ConfluentKafkaInstrumentor.wrap_poll( + self._consumer.poll, self, self._tracer, [timeout], {} + ) - def subscribe(self, topics, on_assign=lambda *args: None, *args, **kwargs): + def subscribe( + self, topics, on_assign=lambda *args: None, *args, **kwargs + ): # pylint: disable=keyword-arg-before-vararg self._consumer.subscribe(topics, on_assign, *args, **kwargs) def original_consumer(self): @@ -189,8 +208,11 @@ class ConfluentKafkaInstrumentor(BaseInstrumentor): See `BaseInstrumentor` """ + # pylint: disable=attribute-defined-outside-init @staticmethod - def instrument_producer(producer: Producer, tracer_provider=None) -> ProxiedProducer: + def instrument_producer( + producer: Producer, tracer_provider=None + ) -> ProxiedProducer: tracer = trace.get_tracer( __name__, __version__, tracer_provider=tracer_provider ) @@ -200,7 +222,9 @@ def instrument_producer(producer: Producer, 
tracer_provider=None) -> ProxiedProd return manual_producer @staticmethod - def instrument_consumer(consumer: Consumer, tracer_provider=None) -> ProxiedConsumer: + def instrument_consumer( + consumer: Consumer, tracer_provider=None + ) -> ProxiedConsumer: tracer = trace.get_tracer( __name__, __version__, tracer_provider=tracer_provider ) @@ -210,14 +234,16 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None) -> ProxiedCons return manual_consumer @staticmethod - def uninstrument_producer(producer) -> Producer: + def uninstrument_producer(producer: Producer) -> Producer: if isinstance(producer, ProxiedProducer): return producer.original_producer() + return producer @staticmethod - def uninstrument_consumer(consumer) -> Consumer: + def uninstrument_consumer(consumer: Consumer) -> Consumer: if isinstance(consumer, ProxiedConsumer): return consumer.original_consumer() + return consumer def instrumentation_dependencies(self) -> Collection[str]: return _instruments @@ -237,16 +263,26 @@ def _instrument(self, **kwargs): self._tracer = tracer def _inner_wrap_produce(func, instance, args, kwargs): - return ConfluentKafkaInstrumentor.wrap_produce(func, instance, self._tracer, args, kwargs) + return ConfluentKafkaInstrumentor.wrap_produce( + func, instance, self._tracer, args, kwargs + ) def _inner_wrap_poll(func, instance, args, kwargs): - return ConfluentKafkaInstrumentor.wrap_poll(func, instance, self._tracer, args, kwargs) + return ConfluentKafkaInstrumentor.wrap_poll( + func, instance, self._tracer, args, kwargs + ) - wrapt.wrap_function_wrapper("kafka_instrumentation", - "AutoInstrumentedProducer.produce", _inner_wrap_produce) + wrapt.wrap_function_wrapper( + AutoInstrumentedProducer, + "produce", + _inner_wrap_produce, + ) - wrapt.wrap_function_wrapper("kafka_instrumentation", - "AutoInstrumentedConsumer.poll", _inner_wrap_poll) + wrapt.wrap_function_wrapper( + AutoInstrumentedConsumer, + "poll", + _inner_wrap_poll, + ) def _uninstrument(self, **kwargs): confluent_kafka.Producer = self._original_kafka_producer @@ -261,22 +297,29 @@ def wrap_produce(func, instance, tracer, args, kwargs): if not topic: topic = args[0] - span_name = _get_span_name("send", topic) - with tracer.start_as_current_span(name=span_name, kind=trace.SpanKind.PRODUCER) as span: - headers = KafkaPropertiesExtractor.extract_produce_headers(args, kwargs) + span_name = _get_span_name("send", topic) + with tracer.start_as_current_span( + name=span_name, kind=trace.SpanKind.PRODUCER + ) as span: + headers = KafkaPropertiesExtractor.extract_produce_headers( + args, kwargs + ) if headers is None: headers = [] kwargs["headers"] = headers topic = KafkaPropertiesExtractor.extract_produce_topic(args) - bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers(instance) - _enrich_span(span, topic, bootstrap_servers, operation=MessagingOperationValues.RECEIVE) # Replace + _enrich_span( + span, + topic, + operation=MessagingOperationValues.RECEIVE, + ) # Replace propagate.inject( headers, setter=_kafka_setter, ) return func(*args, **kwargs) - + @staticmethod def wrap_poll(func, instance, tracer, args, kwargs): if instance._current_consume_span: @@ -285,7 +328,9 @@ def wrap_poll(func, instance, tracer, args, kwargs): instance._current_consume_span.end() instance._current_consume_span = None - with tracer.start_as_current_span("recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER) as span: + with tracer.start_as_current_span( + "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER + ): record = func(*args, 
**kwargs) if record: links = [] @@ -296,20 +341,20 @@ def wrap_poll(func, instance, tracer, args, kwargs): links.append(Link(context=item.get_span_context())) instance._current_consume_span = tracer.start_span( - name=f"{record.topic()} process", links=links, kind=SpanKind.CONSUMER + name=f"{record.topic()} process", + links=links, + kind=SpanKind.CONSUMER, ) - bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers(instance) _enrich_span( instance._current_consume_span, record.topic(), - bootstrap_servers, record.partition(), record.offset(), operation=MessagingOperationValues.PROCESS, - ) instance._current_context_token = context.attach( - trace.set_span_in_context(instance._current_consume_span)) + trace.set_span_in_context(instance._current_consume_span) + ) - return record \ No newline at end of file + return record diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/package.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/package.py index b75cdf6fff..dbe3ac484b 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/package.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/package.py @@ -13,4 +13,4 @@ # limitations under the License. -_instruments = ("confluent-kafka ~= 1.8.2",) \ No newline at end of file +_instruments = ("confluent-kafka ~= 1.8.2",) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index 3e8c302fb5..4907031a75 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -1,16 +1,12 @@ -import json -import trace from logging import getLogger from typing import List, Optional -from opentelemetry import propagate from opentelemetry.propagators import textmap from opentelemetry.semconv.trace import ( - SpanAttributes, - MessagingOperationValues, MessagingDestinationKindValues, + MessagingOperationValues, + SpanAttributes, ) -from opentelemetry.trace import Link, SpanKind _LOG = getLogger(__name__) @@ -68,14 +64,14 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None: _kafka_getter = KafkaContextGetter() + + def _enrich_span( span, topic, - bootstrap_servers: List[str], partition: Optional[int] = None, offset: Optional[int] = None, operation: Optional[MessagingOperationValues] = None, - ): if not span.is_recording(): @@ -83,11 +79,10 @@ def _enrich_span( span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka") span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic) - span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition) - span.set_attribute( - SpanAttributes.MESSAGING_URL, json.dumps(bootstrap_servers) - ) + if partition: + span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition) + span.set_attribute( SpanAttributes.MESSAGING_DESTINATION_KIND, MessagingDestinationKindValues.QUEUE.value, @@ -101,7 +96,10 @@ def _enrich_span( # 
https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic # A message within Kafka is uniquely defined by its topic name, topic partition and offset. if partition and offset and topic: - span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, f"{topic}.{partition}.{offset}") + span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, + f"{topic}.{partition}.{offset}", + ) _kafka_setter = KafkaContextSetter() diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/version.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/version.py index 713d32087d..d8dc1e1ed7 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/version.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.27b0" \ No newline at end of file +__version__ = "0.31b0" diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 593a5ada1b..8adadbb824 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -13,41 +13,44 @@ # limitations under the License. from unittest import TestCase - -from confluent_kafka import Producer, Consumer - -from kafka_instrumentation import ConfluentKafkaInstrumentor, ProxiedProducer, ProxiedConsumer +from confluent_kafka import Consumer, Producer +from opentelemetry.instrumentation.confluent_kafka import ( + ConfluentKafkaInstrumentor, + ProxiedConsumer, + ProxiedProducer, +) class TestConfluentKafka(TestCase): def test_instrument_api(self) -> None: instrumentation = ConfluentKafkaInstrumentor() - p = Producer({'bootstrap.servers': 'localhost:29092'}) - p = instrumentation.instrument_producer(p) - - self.assertEqual(p.__class__, ProxiedProducer) + producer = Producer({"bootstrap.servers": "localhost:29092"}) + producer = instrumentation.instrument_producer(producer) - p = instrumentation.uninstrument_producer(p) - self.assertEqual(p.__class__, Producer) + self.assertEqual(producer.__class__, ProxiedProducer) - p = Producer({'bootstrap.servers': 'localhost:29092'}) - p = instrumentation.instrument_producer(p) + producer = instrumentation.uninstrument_producer(producer) + self.assertEqual(producer.__class__, Producer) - self.assertEqual(p.__class__, ProxiedProducer) + producer = Producer({"bootstrap.servers": "localhost:29092"}) + producer = instrumentation.instrument_producer(producer) - p = instrumentation.uninstrument_producer(p) - self.assertEqual(p.__class__, Producer) + self.assertEqual(producer.__class__, ProxiedProducer) - c = Consumer({ - 'bootstrap.servers': 'localhost:29092', - 'group.id': 'mygroup', - 'auto.offset.reset': 'earliest' - }) + producer = instrumentation.uninstrument_producer(producer) + self.assertEqual(producer.__class__, Producer) - c = instrumentation.instrument_consumer(c) - self.assertEqual(c.__class__, ProxiedConsumer) + consumer = Consumer( + { + "bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + 
"auto.offset.reset": "earliest", + } + ) - c = instrumentation.uninstrument_consumer(c) - self.assertEqual(c.__class__, Consumer) + consumer = instrumentation.instrument_consumer(consumer) + self.assertEqual(consumer.__class__, ProxiedConsumer) + consumer = instrumentation.uninstrument_consumer(consumer) + self.assertEqual(consumer.__class__, Consumer) diff --git a/opentelemetry-contrib-instrumentations/setup.cfg b/opentelemetry-contrib-instrumentations/setup.cfg index 7a13641330..017fb56302 100644 --- a/opentelemetry-contrib-instrumentations/setup.cfg +++ b/opentelemetry-contrib-instrumentations/setup.cfg @@ -37,6 +37,7 @@ install_requires = opentelemetry-instrumentation-boto3sqs==0.31b0 opentelemetry-instrumentation-botocore==0.31b0 opentelemetry-instrumentation-celery==0.31b0 + opentelemetry-instrumentation-confluent-kafka==0.31b0 opentelemetry-instrumentation-dbapi==0.31b0 opentelemetry-instrumentation-django==0.31b0 opentelemetry-instrumentation-elasticsearch==0.31b0 diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py index e3062cf6b3..36a78d6e8d 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py @@ -48,6 +48,10 @@ "library": "celery >= 4.0, < 6.0", "instrumentation": "opentelemetry-instrumentation-celery==0.31b0", }, + "confluent-kafka": { + "library": "confluent-kafka ~= 1.8.2", + "instrumentation": "opentelemetry-instrumentation-confluent-kafka==0.31b0", + }, "django": { "library": "django >= 1.10", "instrumentation": "opentelemetry-instrumentation-django==0.31b0", From a0f6eeaf2c6a61277cb82342d4774015b3541841 Mon Sep 17 00:00:00 2001 From: dor Date: Tue, 31 May 2022 15:34:45 +0300 Subject: [PATCH 3/6] fix tests --- .../tests/test_instrumentation.py | 1 + 1 file changed, 1 insertion(+) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 8adadbb824..af87c4bcc6 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -14,6 +14,7 @@ from unittest import TestCase from confluent_kafka import Consumer, Producer + from opentelemetry.instrumentation.confluent_kafka import ( ConfluentKafkaInstrumentor, ProxiedConsumer, From fcf8c507247868b6f7511d43049e4a38139d3546 Mon Sep 17 00:00:00 2001 From: dor Date: Tue, 31 May 2022 15:57:27 +0300 Subject: [PATCH 4/6] change documentation --- .../opentelemetry/instrumentation/confluent_kafka/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 8aa6ab2658..031d2f42ff 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -47,8 +47,7 @@ def basic_consume_loop(consumer, topics): if msg.error(): if msg.error().code() == 
KafkaError._PARTITION_EOF:
                         # End of partition event
-                        sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
-                            (msg.topic(), msg.partition(), msg.offset()))
+                        sys.stderr.write(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}\n")
                     elif msg.error():
                         raise KafkaException(msg.error())
                 else:

From ee185fe338cf76dad3f33f5ca86d2b8a3eac9e12 Mon Sep 17 00:00:00 2001
From: dor
Date: Tue, 31 May 2022 16:28:15 +0300
Subject: [PATCH 5/6] lint fix
---
 .../instrumentation/confluent_kafka/__init__.py | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py
index 031d2f42ff..ed4ee930b3 100644
--- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py
+++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py
@@ -103,19 +103,20 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
 from confluent_kafka import Consumer, Producer
 
 from opentelemetry import context, propagate, trace
-from opentelemetry.instrumentation.confluent_kafka.package import _instruments
-from opentelemetry.instrumentation.confluent_kafka.utils import (
+from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
+from opentelemetry.instrumentation.utils import unwrap
+from opentelemetry.semconv.trace import MessagingOperationValues
+from opentelemetry.trace import Link, SpanKind, Tracer
+
+from .package import _instruments
+from .utils import (
     KafkaPropertiesExtractor,
     _enrich_span,
     _get_span_name,
     _kafka_getter,
     _kafka_setter,
 )
-from opentelemetry.instrumentation.confluent_kafka.version import __version__
-from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
-from opentelemetry.instrumentation.utils import unwrap
-from opentelemetry.semconv.trace import MessagingOperationValues
-from opentelemetry.trace import Link, SpanKind, Tracer
+from .version import __version__
 
 
 class AutoInstrumentedProducer(Producer):

From 442db3af25b714bb4187b5552913e2f919371071 Mon Sep 17 00:00:00 2001
From: dor
Date: Wed, 1 Jun 2022 10:33:44 +0300
Subject: [PATCH 6/6] fix lint
---
 .../opentelemetry-instrumentation-confluent-kafka/README.rst  | 2 +-
 .../tests/test_instrumentation.py | 3 +++
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/README.rst b/instrumentation/opentelemetry-instrumentation-confluent-kafka/README.rst
index dfd9d0283f..163c2a4393 100644
--- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/README.rst
+++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/README.rst
@@ -1,5 +1,5 @@
 OpenTelemetry confluent-kafka Instrumentation
-===========================
+=============================================
 
 |pypi|
 
diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py
index af87c4bcc6..e9462d7898 100644
--- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py
+++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py
@@ -11,6 +11,9 @@
 # WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +# pylint: disable=no-name-in-module + from unittest import TestCase from confluent_kafka import Consumer, Producer
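
A minimal sketch of the auto-instrumentation flow the series ends up with: `instrument()` swaps the classes exported by `confluent_kafka` for the instrumented subclasses, so code constructing producers and consumers afterwards is traced transparently. The broker address and topic name are placeholders, and a reachable broker is assumed (otherwise `flush()` may block until the delivery timeout):

.. code:: python

    import confluent_kafka

    from opentelemetry.instrumentation.confluent_kafka import (
        AutoInstrumentedConsumer,
        AutoInstrumentedProducer,
        ConfluentKafkaInstrumentor,
    )

    instrumentor = ConfluentKafkaInstrumentor()
    instrumentor.instrument()

    # instrument() replaces the classes exported by confluent_kafka, so plain
    # construction now yields the instrumented subclasses.
    assert confluent_kafka.Producer is AutoInstrumentedProducer
    assert confluent_kafka.Consumer is AutoInstrumentedConsumer

    producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9092"})
    producer.produce("my-topic", b"raw_bytes")  # wrapped by wrap_produce -> "my-topic send" span
    producer.flush()

    instrumentor.uninstrument()  # restores the original confluent_kafka classes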
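For the manual path, the proxied consumer can be paired with the SDK's in-memory exporter to observe the span lifecycle that `wrap_poll` implements. A sketch, assuming a broker at `localhost:29092` and a populated `my-topic` (both placeholders):

.. code:: python

    from confluent_kafka import Consumer

    from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import SimpleSpanProcessor
    from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

    provider = TracerProvider()
    exporter = InMemorySpanExporter()
    provider.add_span_processor(SimpleSpanProcessor(exporter))

    consumer = ConfluentKafkaInstrumentor.instrument_consumer(
        Consumer(
            {
                "bootstrap.servers": "localhost:29092",
                "group.id": "demo-group",
                "auto.offset.reset": "earliest",
            }
        ),
        tracer_provider=provider,
    )
    consumer.subscribe(["my-topic"])

    # wrap_poll ends the previous message's "<topic> process" span on the next
    # poll(), so each per-message span reaches the exporter one iteration late,
    # alongside the short-lived "recv" span created for the poll() call itself.
    for _ in range(3):
        consumer.poll(timeout=1.0)

    print([span.name for span in exporter.get_finished_spans()])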
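The header-based context propagation that links the producer's `send` span to the consumer's `process` span can be exercised without a broker, since the getter and setter in `utils.py` operate on plain lists of `(key, bytes)` tuples, the shape confluent-kafka uses for message headers. A self-contained sketch using the module's internal `_kafka_setter`/`_kafka_getter`:

.. code:: python

    from opentelemetry import propagate, trace
    from opentelemetry.instrumentation.confluent_kafka.utils import (
        _kafka_getter,
        _kafka_setter,
    )
    from opentelemetry.sdk.trace import TracerProvider

    trace.set_tracer_provider(TracerProvider())
    tracer = trace.get_tracer("propagation-demo")

    # Producer side: wrap_produce injects the active span's context into the
    # message headers before delegating to Producer.produce().
    headers = []
    with tracer.start_as_current_span("my-topic send"):
        propagate.inject(headers, setter=_kafka_setter)

    print(headers)  # e.g. [('traceparent', b'00-...')]

    # Consumer side: wrap_poll extracts the context back out of record.headers()
    # and turns it into Links on the "<topic> process" span.
    ctx = propagate.extract(headers, getter=_kafka_getter)
    print(ctx)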