From ef7769c86ec362e2b8f92ed5938fb1324f526306 Mon Sep 17 00:00:00 2001 From: Ran Nozik Date: Fri, 28 Jan 2022 18:02:50 +0200 Subject: [PATCH] fix: safe kafka partition extraction (#872) * safe partition extraction * update changelog --- CHANGELOG.md | 3 ++ .../instrumentation/kafka/utils.py | 49 +++++++++++-------- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a3ae3f322..c622d21f94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-sqlite3` Instrumentation now works with `dbapi2.connect` +- `opentelemetry-instrumentation-kafka` Kafka: safe kafka partition extraction + ([#872](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/872)) + ## [1.8.0-0.27b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.8.0-0.27b0) - 2021-12-17 ### Added diff --git a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py index 858e879a42..019369b42e 100644 --- a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py @@ -55,27 +55,36 @@ def extract_send_headers(args, kwargs): @staticmethod def extract_send_partition(instance, args, kwargs): """extract partition `send` method arguments, using the `_partition` method in KafkaProducer class""" - topic = KafkaPropertiesExtractor.extract_send_topic(args) - key = KafkaPropertiesExtractor.extract_send_key(args, kwargs) - value = KafkaPropertiesExtractor.extract_send_value(args, kwargs) - partition = KafkaPropertiesExtractor._extract_argument( - "partition", 4, None, args, kwargs - ) - key_bytes = instance._serialize( - instance.config["key_serializer"], topic, key - ) - value_bytes = instance._serialize( - instance.config["value_serializer"], topic, value - ) - valid_types = (bytes, bytearray, memoryview, type(None)) - if ( - type(key_bytes) not in valid_types - or type(value_bytes) not in valid_types - ): + try: + topic = KafkaPropertiesExtractor.extract_send_topic(args) + key = KafkaPropertiesExtractor.extract_send_key(args, kwargs) + value = KafkaPropertiesExtractor.extract_send_value(args, kwargs) + partition = KafkaPropertiesExtractor._extract_argument( + "partition", 4, None, args, kwargs + ) + key_bytes = instance._serialize( + instance.config["key_serializer"], topic, key + ) + value_bytes = instance._serialize( + instance.config["value_serializer"], topic, value + ) + valid_types = (bytes, bytearray, memoryview, type(None)) + if ( + type(key_bytes) not in valid_types + or type(value_bytes) not in valid_types + ): + return None + + all_partitions = instance._metadata.partitions_for_topic(topic) + if all_partitions is None or len(all_partitions) == 0: + return None + + return instance._partition( + topic, partition, key, value, key_bytes, value_bytes + ) + except Exception as exception: # pylint: disable=W0703 + _LOG.debug("Unable to extract partition: %s", exception) return None - return instance._partition( - topic, partition, key, value, key_bytes, value_bytes - ) ProduceHookT = Optional[Callable[[Span, List, Dict], None]]