diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 1e3f304188..2c9ebc1abd 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -14,7 +14,11 @@ # pylint: disable=no-name-in-module -from unittest import TestCase +from unittest.mock import patch + +from opentelemetry.semconv.trace import SpanAttributes, MessagingDestinationKindValues +from opentelemetry.test.test_base import TestBase +from .utils import MockConsumer, MockedMessage from confluent_kafka import Consumer, Producer @@ -29,7 +33,7 @@ ) -class TestConfluentKafka(TestCase): +class TestConfluentKafka(TestBase): def test_instrument_api(self) -> None: instrumentation = ConfluentKafkaInstrumentor() @@ -104,3 +108,160 @@ def test_context_getter(self) -> None: context_setter.set(carrier_list, "key1", "val1") self.assertEqual(context_getter.get(carrier_list, "key1"), ["val1"]) self.assertEqual(["key1"], context_getter.keys(carrier_list)) + + def test_poll(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + mocked_messages = [ + MockedMessage("topic-10", 0, 0, []), + MockedMessage("topic-20", 2, 4, []), + MockedMessage("topic-30", 1, 3, []), + ] + expected_spans= [ + { + "name": "recv", + "attributes": {} + }, + { + "name": "topic-10 process", + "attributes": { + SpanAttributes.MESSAGING_OPERATION: "process", + SpanAttributes.MESSAGING_KAFKA_PARTITION: 0, + SpanAttributes.MESSAGING_SYSTEM: "kafka", + SpanAttributes.MESSAGING_DESTINATION: "topic-10", + SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, + SpanAttributes.MESSAGING_MESSAGE_ID: "topic-10.0.0", + } + }, + { + "name": "recv", + "attributes": {} + }, + { + "name": "topic-20 process", + "attributes": { + SpanAttributes.MESSAGING_OPERATION: "process", + SpanAttributes.MESSAGING_KAFKA_PARTITION: 2, + SpanAttributes.MESSAGING_SYSTEM: "kafka", + SpanAttributes.MESSAGING_DESTINATION: "topic-20", + SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, + SpanAttributes.MESSAGING_MESSAGE_ID: "topic-20.2.4", + } + }, + { + "name": "recv", + "attributes": {} + }, + { + "name": "topic-30 process", + "attributes": { + SpanAttributes.MESSAGING_OPERATION: "process", + SpanAttributes.MESSAGING_KAFKA_PARTITION: 1, + SpanAttributes.MESSAGING_SYSTEM: "kafka", + SpanAttributes.MESSAGING_DESTINATION: "topic-30", + SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, + SpanAttributes.MESSAGING_MESSAGE_ID: "topic-30.1.3", + } + }, + { + "name": "recv", + "attributes": {} + }, + ] + + consumer = MockConsumer( + mocked_messages, + { + "bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + "auto.offset.reset": "earliest", + } + ) + span_list = self.memory_exporter.clear() + consumer = instrumentation.instrument_consumer(consumer) + consumer.poll(1) + consumer.poll(1) + consumer.poll(1) + consumer.poll(1) + + span_list = self.memory_exporter.get_finished_spans() + self._compare_spans(span_list, expected_spans) + + def test_consume(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + mocked_messages = [ + MockedMessage("topic-1", 0, 0, []), + MockedMessage("topic-1", 2, 1, []), + MockedMessage("topic-1", 3, 2, []), + MockedMessage("topic-2", 0, 0, []), + MockedMessage("topic-3", 0, 3, []), + MockedMessage("topic-2", 0, 1, []), + ] + expected_spans= [ + { + "name": "recv", + "attributes": {} + }, + { + "name": "topic-1 process", + "attributes": { + SpanAttributes.MESSAGING_OPERATION: "process", + SpanAttributes.MESSAGING_SYSTEM: "kafka", + SpanAttributes.MESSAGING_DESTINATION: "topic-1", + SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, + } + }, + { + "name": "recv", + "attributes": {} + }, + { + "name": "topic-2 process", + "attributes": { + SpanAttributes.MESSAGING_OPERATION: "process", + SpanAttributes.MESSAGING_SYSTEM: "kafka", + SpanAttributes.MESSAGING_DESTINATION: "topic-2", + SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, + } + }, + { + "name": "recv", + "attributes": {} + }, + { + "name": "topic-3 process", + "attributes": { + SpanAttributes.MESSAGING_OPERATION: "process", + SpanAttributes.MESSAGING_SYSTEM: "kafka", + SpanAttributes.MESSAGING_DESTINATION: "topic-3", + SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, + } + }, + { + "name": "recv", + "attributes": {} + }, + ] + + consumer = MockConsumer( + mocked_messages, + { + "bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + "auto.offset.reset": "earliest", + } + ) + + span_list = self.memory_exporter.clear() + consumer = instrumentation.instrument_consumer(consumer) + consumer.consume(3) + consumer.consume(1) + consumer.consume(2) + consumer.consume(1) + span_list = self.memory_exporter.get_finished_spans() + self._compare_spans(span_list, expected_spans) + + def _compare_spans(self, spans, expected_spans): + for (span, expected_span) in zip(spans, expected_spans): + self.assertEqual(expected_span['name'], span.name) + for attribute_key, expected_attribute_value in expected_span['attributes'].items(): + self.assertEqual(expected_attribute_value, span.attributes[attribute_key]) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py new file mode 100644 index 0000000000..ab2e6b48a9 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py @@ -0,0 +1,39 @@ +from confluent_kafka import Consumer + + +class MockConsumer(Consumer): + + def __init__(self, queue, config): + self._queue = queue + super().__init__(config) + + def consume(self, num_messages=1, *args, **kwargs): + messages = self._queue[:num_messages] + self._queue = self._queue[num_messages:] + return messages + + def poll(self, timeout=None): + if len(self._queue) > 0: + return self._queue.pop(0) + else: + return None + + +class MockedMessage: + def __init__(self, topic: str, partition: int, offset: int, headers): + self._topic = topic + self._partition = partition + self._offset = offset + self._headers = headers + + def topic(self): + return self._topic + + def partition(self): + return self._partition + + def offset(self): + return self._offset + + def headers(self): + return self._headers