diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index de99bd351e..37ac316c2d 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -137,7 +137,7 @@ def __init__(self, config): # This method is deliberately implemented in order to allow wrapt to wrap this function def poll(self, timeout=-1): # pylint: disable=useless-super-delegation return super().poll(timeout) - + # This method is deliberately implemented in order to allow wrapt to wrap this function def consume(self, *args, **kwargs): # pylint: disable=useless-super-delegation return super().consume(*args, **kwargs) @@ -182,9 +182,7 @@ def committed(self, partitions, timeout=-1): def commit(self, *args, **kwargs): return self._consumer.commit(*args, **kwargs) - def consume( - self, *args, **kwargs - ): + def consume(self, *args, **kwargs): return ConfluentKafkaInstrumentor.wrap_consume( self._consumer.consume, self, self._tracer, args, kwargs, ) @@ -281,7 +279,7 @@ def _inner_wrap_poll(func, instance, args, kwargs): return ConfluentKafkaInstrumentor.wrap_poll( func, instance, self._tracer, args, kwargs ) - + def _inner_wrap_consume(func, instance, args, kwargs): return ConfluentKafkaInstrumentor.wrap_consume( func, instance, self._tracer, args, kwargs @@ -298,7 +296,7 @@ def _inner_wrap_consume(func, instance, args, kwargs): "poll", _inner_wrap_poll, ) - + wrapt.wrap_function_wrapper( AutoInstrumentedConsumer, "consume", @@ -373,7 +371,7 @@ def wrap_poll(func, instance, tracer, args, kwargs): ) return record - + @staticmethod def wrap_consume(func, instance, tracer, args, kwargs): if instance._current_consume_span: @@ -399,7 +397,7 @@ def wrap_consume(func, instance, tracer, args, kwargs): records[0].topic(), operation=MessagingOperationValues.PROCESS, ) - + instance._current_context_token = context.attach( trace.set_span_in_context(instance._current_consume_span) ) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py index ab2e6b48a9..ffcbb66d61 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py @@ -7,7 +7,7 @@ def __init__(self, queue, config): self._queue = queue super().__init__(config) - def consume(self, num_messages=1, *args, **kwargs): + def consume(self, num_messages=1, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg messages = self._queue[:num_messages] self._queue = self._queue[num_messages:] return messages @@ -15,8 +15,7 @@ def consume(self, num_messages=1, *args, **kwargs): def poll(self, timeout=None): if len(self._queue) > 0: return self._queue.pop(0) - else: - return None + return None class MockedMessage: