Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kafka] BUG - stop creating spans on empty reads #3363

Merged
merged 4 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ This component adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.h

### Fixed

- Stop creating `receive` consumer spans for consume attempts that returned no message.

## [1.5.0](https://github.com/open-telemetry/opentelemetry-dotnet-instrumentation/releases/tag/v1.5.0)

- [Core components](https://github.com/open-telemetry/opentelemetry-dotnet/blob/main/VERSIONING.md#core-components):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@ internal static CallTargetReturn<TResponse> OnMethodEnd<TTarget, TResponse>(TTar
consumeResult = response == null ? null : response.DuckAs<IConsumeResult>();
}

var activity = KafkaInstrumentation.StartConsumerActivity(consumeResult, (DateTimeOffset)state.StartTime!, instance!);
if (exception is not null)
if (consumeResult is not null)
{
activity.SetException(exception);
}
var activity = KafkaInstrumentation.StartConsumerActivity(consumeResult, (DateTimeOffset)state.StartTime!, instance!);
if (exception is not null)
Copy link
Contributor

@RassK RassK Apr 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may hide some transient network errors (since consumeResult has to exist) ... not sure if good or bad

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd leave it as is for the quick fix.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't that guaranteed by the check on line 40?

{
activity.SetException(exception);
}

activity?.Stop();
activity?.Stop();
}

return new CallTargetReturn<TResponse>(response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,19 @@ internal static class KafkaInstrumentation
{
private static ActivitySource Source { get; } = new("OpenTelemetry.AutoInstrumentation.Kafka");

public static Activity? StartConsumerActivity(IConsumeResult? consumeResult, DateTimeOffset startTime, object consumer)
public static Activity? StartConsumerActivity(IConsumeResult consumeResult, DateTimeOffset startTime, object consumer)
{
PropagationContext? propagatedContext = null;
if (consumeResult is not null)
{
propagatedContext = Propagators.DefaultTextMapPropagator.Extract(default, consumeResult, MessageHeaderValueGetter);
}
PropagationContext? propagatedContext = Propagators.DefaultTextMapPropagator.Extract(default, consumeResult, MessageHeaderValueGetter);

string? spanName = null;
if (!string.IsNullOrEmpty(consumeResult?.Topic))
if (!string.IsNullOrEmpty(consumeResult.Topic))
{
spanName = $"{consumeResult?.Topic} {MessagingAttributes.Values.ReceiveOperationName}";
spanName = $"{consumeResult.Topic} {MessagingAttributes.Values.ReceiveOperationName}";
}

spanName ??= MessagingAttributes.Values.ReceiveOperationName;

var activityLinks = propagatedContext is not null && propagatedContext.Value.ActivityContext.IsValid()
var activityLinks = propagatedContext.Value.ActivityContext.IsValid()
? new[] { new ActivityLink(propagatedContext.Value.ActivityContext) }
: Array.Empty<ActivityLink>();

Expand All @@ -56,15 +52,12 @@ Activity.Current is null ?
SetCommonAttributes(
activity,
MessagingAttributes.Values.ReceiveOperationName,
consumeResult?.Topic,
consumeResult?.Partition,
consumeResult?.Message?.Key,
consumeResult.Topic,
consumeResult.Partition,
consumeResult.Message?.Key,
consumer.DuckCast<INamedClient>());

if (consumeResult is not null)
{
activity.SetTag(MessagingAttributes.Keys.Kafka.PartitionOffset, consumeResult.Offset.Value);
}
activity.SetTag(MessagingAttributes.Keys.Kafka.PartitionOffset, consumeResult.Offset.Value);

if (ConsumerCache.TryGet(consumer, out var groupId))
{
Expand Down
25 changes: 5 additions & 20 deletions test/IntegrationTests/KafkaTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,13 @@ public void SubmitsTraces(string packageVersion)
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateProduceExceptionSpan(span, topicName), "Failed Produce attempt without delivery handler set.");
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateResultProcessingProduceExceptionSpan(span, topicName), "Failed ProduceAsync attempt.");

if (packageVersion != string.Empty && Version.Parse(packageVersion) == new Version(1, 4, 0))
{
// For 1.4.0 null is returned when attempting to read from non-existent topic,
// and no exception is thrown.
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateEmptyReadConsumerSpan(span, topicName), "Successful Consume attempt that returned no message.");
}
else
if (packageVersion == string.Empty || Version.Parse(packageVersion) != new Version(1, 4, 0))
{
// Failed consume attempt.
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumeExceptionSpan(span, topicName), "Failed Consume attempt.");
collector.Expect(
KafkaInstrumentationScopeName,
span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumeExceptionSpan(span, topicName),
"Failed Consume attempt.");
}

// Successful produce attempts after topic was created with admin client.
Expand All @@ -81,9 +78,6 @@ public void SubmitsTraces(string packageVersion)
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 1), "Second successful Consume attempt.");
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 2), "Third successful Consume attempt.");

// Consume attempt that returns no message.
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateEmptyReadConsumerSpan(span, topicName), "Additional successful attempt after all the produced messages were already read.");

collector.ExpectCollected(collection => ValidatePropagation(collection, topicName));

EnableBytecodeInstrumentation();
Expand All @@ -97,15 +91,6 @@ public void SubmitsTraces(string packageVersion)
collector.AssertExpectations();
}

private static bool ValidateEmptyReadConsumerSpan(Span span, string topicName)
{
var consumerGroupId = span.Attributes.Single(kv => kv.Key == KafkaConsumerGroupAttributeName).Value.StringValue;
return span.Name == MessagingReceiveOperationAttributeValue &&
span.Links.Count == 0 &&
ValidateBasicSpanAttributes(span.Attributes, KafkaConsumerClientIdAttributeValue, MessagingReceiveOperationAttributeValue) &&
consumerGroupId == GetConsumerGroupIdAttributeValue(topicName);
}

private static string GetConsumerGroupIdAttributeValue(string topicName)
{
return $"test-consumer-group-{topicName}";
Expand Down
Loading