From ad69c0dda5c75479d211ff6c34fa381e421d423e Mon Sep 17 00:00:00 2001 From: Mateusz Lach Date: Wed, 10 Apr 2024 16:52:03 +0200 Subject: [PATCH 1/2] stop creating spans on empty reads --- .../ConsumerConsumeSyncIntegration.cs | 13 ++++++---- .../Kafka/KafkaInstrumentation.cs | 25 +++++++------------ test/IntegrationTests/KafkaTests.cs | 25 ++++--------------- 3 files changed, 22 insertions(+), 41 deletions(-) diff --git a/src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/Integrations/ConsumerConsumeSyncIntegration.cs b/src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/Integrations/ConsumerConsumeSyncIntegration.cs index fec9847042..a20b39dace 100644 --- a/src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/Integrations/ConsumerConsumeSyncIntegration.cs +++ b/src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/Integrations/ConsumerConsumeSyncIntegration.cs @@ -46,13 +46,16 @@ internal static CallTargetReturn OnMethodEnd(TTar consumeResult = response == null ? null : response.DuckAs(); } - var activity = KafkaInstrumentation.StartConsumerActivity(consumeResult, (DateTimeOffset)state.StartTime!, instance!); - if (exception is not null) + if (consumeResult is not null) { - activity.SetException(exception); - } + var activity = KafkaInstrumentation.StartConsumerActivity(consumeResult, (DateTimeOffset)state.StartTime!, instance!); + if (exception is not null) + { + activity.SetException(exception); + } - activity?.Stop(); + activity?.Stop(); + } return new CallTargetReturn(response); } diff --git a/src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/KafkaInstrumentation.cs b/src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/KafkaInstrumentation.cs index 5215ca71b5..96440f1e49 100644 --- a/src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/KafkaInstrumentation.cs +++ b/src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/KafkaInstrumentation.cs @@ -14,23 +14,19 @@ internal static class KafkaInstrumentation { private static ActivitySource Source { get; } = new("OpenTelemetry.AutoInstrumentation.Kafka"); - public static Activity? StartConsumerActivity(IConsumeResult? consumeResult, DateTimeOffset startTime, object consumer) + public static Activity? StartConsumerActivity(IConsumeResult consumeResult, DateTimeOffset startTime, object consumer) { - PropagationContext? propagatedContext = null; - if (consumeResult is not null) - { - propagatedContext = Propagators.DefaultTextMapPropagator.Extract(default, consumeResult, MessageHeaderValueGetter); - } + PropagationContext? propagatedContext = Propagators.DefaultTextMapPropagator.Extract(default, consumeResult, MessageHeaderValueGetter); string? spanName = null; - if (!string.IsNullOrEmpty(consumeResult?.Topic)) + if (!string.IsNullOrEmpty(consumeResult.Topic)) { - spanName = $"{consumeResult?.Topic} {MessagingAttributes.Values.ReceiveOperationName}"; + spanName = $"{consumeResult.Topic} {MessagingAttributes.Values.ReceiveOperationName}"; } spanName ??= MessagingAttributes.Values.ReceiveOperationName; - var activityLinks = propagatedContext is not null && propagatedContext.Value.ActivityContext.IsValid() + var activityLinks = propagatedContext.Value.ActivityContext.IsValid() ? new[] { new ActivityLink(propagatedContext.Value.ActivityContext) } : Array.Empty(); @@ -56,15 +52,12 @@ Activity.Current is null ? SetCommonAttributes( activity, MessagingAttributes.Values.ReceiveOperationName, - consumeResult?.Topic, - consumeResult?.Partition, - consumeResult?.Message?.Key, + consumeResult.Topic, + consumeResult.Partition, + consumeResult.Message?.Key, consumer.DuckCast()); - if (consumeResult is not null) - { - activity.SetTag(MessagingAttributes.Keys.Kafka.PartitionOffset, consumeResult.Offset.Value); - } + activity.SetTag(MessagingAttributes.Keys.Kafka.PartitionOffset, consumeResult.Offset.Value); if (ConsumerCache.TryGet(consumer, out var groupId)) { diff --git a/test/IntegrationTests/KafkaTests.cs b/test/IntegrationTests/KafkaTests.cs index 12f4095837..2b48172787 100644 --- a/test/IntegrationTests/KafkaTests.cs +++ b/test/IntegrationTests/KafkaTests.cs @@ -56,16 +56,13 @@ public void SubmitsTraces(string packageVersion) collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateProduceExceptionSpan(span, topicName), "Failed Produce attempt without delivery handler set."); collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateResultProcessingProduceExceptionSpan(span, topicName), "Failed ProduceAsync attempt."); - if (packageVersion != string.Empty && Version.Parse(packageVersion) == new Version(1, 4, 0)) - { - // For 1.4.0 null is returned when attempting to read from non-existent topic, - // and no exception is thrown. - collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateEmptyReadConsumerSpan(span, topicName), "Successful Consume attempt that returned no message."); - } - else + if (packageVersion == string.Empty || Version.Parse(packageVersion) != new Version(1, 4, 0)) { // Failed consume attempt. - collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumeExceptionSpan(span, topicName), "Failed Consume attempt."); + collector.Expect( + KafkaInstrumentationScopeName, + span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumeExceptionSpan(span, topicName), + "Failed Consume attempt."); } // Successful produce attempts after topic was created with admin client. @@ -81,9 +78,6 @@ public void SubmitsTraces(string packageVersion) collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 1), "Second successful Consume attempt."); collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 2), "Third successful Consume attempt."); - // Consume attempt that returns no message. - collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateEmptyReadConsumerSpan(span, topicName), "Additional successful attempt after all the produced messages were already read."); - collector.ExpectCollected(collection => ValidatePropagation(collection, topicName)); EnableBytecodeInstrumentation(); @@ -97,15 +91,6 @@ public void SubmitsTraces(string packageVersion) collector.AssertExpectations(); } - private static bool ValidateEmptyReadConsumerSpan(Span span, string topicName) - { - var consumerGroupId = span.Attributes.Single(kv => kv.Key == KafkaConsumerGroupAttributeName).Value.StringValue; - return span.Name == MessagingReceiveOperationAttributeValue && - span.Links.Count == 0 && - ValidateBasicSpanAttributes(span.Attributes, KafkaConsumerClientIdAttributeValue, MessagingReceiveOperationAttributeValue) && - consumerGroupId == GetConsumerGroupIdAttributeValue(topicName); - } - private static string GetConsumerGroupIdAttributeValue(string topicName) { return $"test-consumer-group-{topicName}"; From 6a4d2de67e0b3b3c47e2cf994019f02a1e129661 Mon Sep 17 00:00:00 2001 From: Mateusz Lach Date: Wed, 10 Apr 2024 18:37:22 +0200 Subject: [PATCH 2/2] changelog update --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8a270095d..423769ccb6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ This component adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.h ### Fixed +- Stop creating `receive` consumer spans for consume attempts that returned no message. + ## [1.5.0](https://github.com/open-telemetry/opentelemetry-dotnet-instrumentation/releases/tag/v1.5.0) - [Core components](https://github.com/open-telemetry/opentelemetry-dotnet/blob/main/VERSIONING.md#core-components):