diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java index 727442b8261..2ef11751691 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java @@ -1,6 +1,11 @@ package datadog.trace.instrumentation.kafka_clients; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_RECORDS_COUNT; +import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.KAFKA_POLL; import static net.bytebuddy.matcher.ElementMatchers.isConstructor; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPublic; @@ -13,6 +18,8 @@ import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.api.Config; import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -130,18 +137,43 @@ public static void muzzleCheck(ConsumerRecord record) { * KafkaConsumer class. */ public static class RecordsAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope onEnter() { + boolean dataStreamsEnabled; + if (activeSpan() != null) { + dataStreamsEnabled = activeSpan().traceConfig().isDataStreamsEnabled(); + } else { + dataStreamsEnabled = Config.get().isDataStreamsEnabled(); + } + if (dataStreamsEnabled) { + final AgentSpan span = startSpan(KAFKA_POLL); + return activateSpan(span); + } + return null; + } + @Advice.OnMethodExit(suppress = Throwable.class) public static void captureGroup( - @Advice.This KafkaConsumer consumer, @Advice.Return ConsumerRecords records) { - if (records == null) { - return; + @Advice.Enter final AgentScope scope, + @Advice.This KafkaConsumer consumer, + @Advice.Return ConsumerRecords records) { + int recordsCount = 0; + if (records != null) { + KafkaConsumerInfo kafkaConsumerInfo = + InstrumentationContext.get(KafkaConsumer.class, KafkaConsumerInfo.class).get(consumer); + if (kafkaConsumerInfo != null) { + InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class) + .put(records, kafkaConsumerInfo); + } + recordsCount = records.count(); } - KafkaConsumerInfo kafkaConsumerInfo = - InstrumentationContext.get(KafkaConsumer.class, KafkaConsumerInfo.class).get(consumer); - if (kafkaConsumerInfo != null) { - InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class) - .put(records, kafkaConsumerInfo); + if (scope == null) { + return; } + AgentSpan span = scope.span(); + span.setTag(KAFKA_RECORDS_COUNT, recordsCount); + span.finish(); + scope.close(); } } } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java index 4cea931d064..b1ca0958116 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java @@ -29,6 +29,8 @@ public class KafkaDecorator extends MessagingClientDecorator { public static final CharSequence KAFKA_CONSUME = UTF8BytesString.create( SpanNaming.instance().namingSchema().messaging().inboundOperation(KAFKA)); + + public static final CharSequence KAFKA_POLL = UTF8BytesString.create("kafka.poll"); public static final CharSequence KAFKA_PRODUCE = UTF8BytesString.create( SpanNaming.instance().namingSchema().messaging().outboundOperation(KAFKA)); diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index a5c0f20474f..f17c4ad8fb5 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -1,3 +1,5 @@ +import datadog.trace.common.writer.ListWriter + import static datadog.trace.agent.test.utils.TraceUtils.basicSpan import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope @@ -53,6 +55,46 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { public static final LinkedHashMap PRODUCER_PATHWAY_EDGE_TAGS + // filter out Kafka poll, since the function is called in a loop, giving inconsistent results + final ListWriter.Filter dropKafkaPoll = new ListWriter.Filter() { + @Override + boolean accept(List trace) { + return !(trace.size() == 1 && + trace.get(0).getResourceName().toString().equals("kafka.poll")) + } + } + + final ListWriter.Filter dropEmptyKafkaPoll = new ListWriter.Filter() { + @Override + boolean accept(List trace) { + return !(trace.size() == 1 && + trace.get(0).getResourceName().toString().equals("kafka.poll") && + trace.get(0).getTag(InstrumentationTags.KAFKA_RECORDS_COUNT).equals(0)) + } + } + + // TraceID, start times & names changed based on the configuration, so overriding the sort to give consistent test results + private static class SortKafkaTraces implements Comparator> { + @Override + int compare(List o1, List o2) { + return rootSpanTrace(o1) - rootSpanTrace(o2) + } + + int rootSpanTrace(List trace) { + assert !trace.isEmpty() + def rootSpan = trace.get(0).localRootSpan + switch (rootSpan.operationName.toString()) { + case "parent": + return 3 + case "kafka.poll": + return 2 + default: + return 1 + } + } + } + + static { PRODUCER_PATHWAY_EDGE_TAGS = new LinkedHashMap<>(3) PRODUCER_PATHWAY_EDGE_TAGS.put("direction", "out") @@ -60,6 +102,10 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { PRODUCER_PATHWAY_EDGE_TAGS.put("type", "kafka") } + def setup() { + TEST_WRITER.setFilter(dropKafkaPoll) + } + @Override int version() { 0 @@ -124,9 +170,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } } - if (isDataStreamsEnabled()) { - } - cleanup: producer.close() } @@ -137,6 +180,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { if (isDataStreamsEnabled()) { senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000) } + TEST_WRITER.setFilter(dropEmptyKafkaPoll) KafkaProducer producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer()) String clusterId = "" if (isDataStreamsEnabled()) { @@ -203,28 +247,37 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { received.value() == greeting received.key() == null - assertTraces(2, SORT_TRACES_BY_ID) { - trace(3) { - basicSpan(it, "parent") - basicSpan(it, "producer callback", span(0)) - producerSpan(it, senderProps, span(0), false) - } + int nTraces = isDataStreamsEnabled() ? 3 : 2 + int produceTraceIdx = nTraces - 1 + TEST_WRITER.waitForTraces(nTraces) + def traces = (Arrays.asList(TEST_WRITER.toArray()) as List>) + Collections.sort(traces, new SortKafkaTraces()) + assertTraces(nTraces, new SortKafkaTraces()) { if (hasQueueSpan()) { trace(2) { - consumerSpan(it, consumerProperties, trace(1)[1]) - queueSpan(it, trace(0)[2]) + consumerSpan(it, consumerProperties, span(1)) + queueSpan(it, trace(produceTraceIdx)[2]) } } else { trace(1) { - consumerSpan(it, consumerProperties, trace(0)[2]) + consumerSpan(it, consumerProperties, trace(produceTraceIdx)[2]) } } + if (isDataStreamsEnabled()) { + trace(1, { + pollSpan(it) + }) + } + trace(3) { + basicSpan(it, "parent") + basicSpan(it, "producer callback", span(0)) + producerSpan(it, senderProps, span(0), false) + } } - def headers = received.headers() headers.iterator().hasNext() - new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${TEST_WRITER[0][2].traceId}" - new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${TEST_WRITER[0][2].spanId}" + new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${traces[produceTraceIdx][2].traceId}" + new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${traces[produceTraceIdx][2].spanId}" if (isDataStreamsEnabled()) { StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 } @@ -1069,6 +1122,27 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } } + def pollSpan( + TraceAssert trace, + int recordCount = 1, + DDSpan parentSpan = null, + Range offset = 0..0, + boolean tombstone = false, + boolean distributedRootSpan = !hasQueueSpan() + ) { + trace.span { + serviceName Config.get().getServiceName() + operationName "kafka.poll" + resourceName "kafka.poll" + errored false + measured false + tags { + "$InstrumentationTags.KAFKA_RECORDS_COUNT" recordCount + defaultTags(true) + } + } + } + def waitForKafkaMetadataUpdate(KafkaTemplate kafkaTemplate) { kafkaTemplate.flush() Producer wrappedProducer = kafkaTemplate.getTheProducer() diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InstrumentationTags.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InstrumentationTags.java index 5311eff6476..002cf479fdf 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InstrumentationTags.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InstrumentationTags.java @@ -14,6 +14,7 @@ public class InstrumentationTags { public static final String PROCESSOR_NAME = "processor.name"; public static final String RECORD_QUEUE_TIME_MS = "record.queue_time_ms"; public static final String RECORD_END_TO_END_DURATION_MS = "record.e2e_duration_ms"; + public static final String KAFKA_RECORDS_COUNT = "kafka.records_count"; public static final String TOMBSTONE = "tombstone"; public static final String AWS_AGENT = "aws.agent"; public static final String AWS_SERVICE = "aws.service";