diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
index f057ac2ab28..bad5a030bc6 100644
--- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
+++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
@@ -13,6 +13,7 @@ import com.google.auto.service.AutoService;
 import datadog.trace.agent.tooling.Instrumenter;
+import datadog.trace.api.Config;
 import datadog.trace.bootstrap.instrumentation.api.AgentScope;
 import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
 import java.util.Map;
@@ -76,7 +77,12 @@ public static AgentScope onEnter(
       // Do not inject headers for batch versions below 2
       // This is how similar check is being done in Kafka client itself:
       // https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412
-      if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2) {
+      // Also, do not inject headers if propagation is disabled via JVM option or environment
+      // variable. This helps in mixed-client environments where clients < 0.11, which do not
+      // support headers, attempt to read messages produced by clients >= 0.11 while the
+      // magic value of the broker(s) is >= 2.
+      if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2
+          && Config.get().isKafkaClientPropagationEnabled()) {
         try {
           propagate().inject(span, record.headers(), SETTER);
         } catch (final IllegalStateException e) {
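
The new guard consults `Config.get().isKafkaClientPropagationEnabled()` on every send, so the kill switch needs no code change to use. Assuming the key follows dd-trace-java's usual mapping of config names to a `dd.`-prefixed system property and an upper-cased, underscore-separated environment variable (the mapping itself is not shown in this diff), propagation would be switched off with either of:

    -Ddd.kafka.client.propagation.enabled=false
    DD_KAFKA_CLIENT_PROPAGATION_ENABLED=false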
"false", embeddedKafka) + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties + try { + // Different class names for test and latestDepTest. + containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC) + } catch (ClassNotFoundException | NoClassDefFoundError e) { + containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC) + } + + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + + // setup a Kafka message listener + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records.add(record) + } + }) + + // start the container and underlying message listener + container.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()) + + when: + String message = "Testing without headers" + withConfigOverride(Config.KAFKA_CLIENT_PROPAGATION_ENABLED, value) { + kafkaTemplate.send(SHARED_TOPIC, message) + } + + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + + received.headers().iterator().hasNext() == expected + + cleanup: + producerFactory.stop() + container?.stop() + + where: + value | expected + "false" | false + "true" | true + String.valueOf(Config.DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED) | true + + } + } + diff --git a/dd-trace-api/src/main/java/datadog/trace/api/Config.java b/dd-trace-api/src/main/java/datadog/trace/api/Config.java index 55d61bc7585..0d8c9bb799f 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/Config.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/Config.java @@ -148,6 +148,8 @@ public class Config { public static final String PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE = "profiling.exception.histogram.max-collection-size"; + public static final String KAFKA_CLIENT_PROPAGATION_ENABLED = "kafka.client.propagation.enabled"; + public static final String RUNTIME_ID_TAG = "runtime-id"; public static final String SERVICE = "service"; public static final String SERVICE_TAG = SERVICE; @@ -205,6 +207,8 @@ public class Config { public static final int DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_TOP_ITEMS = 50; public static final int DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE = 10000; + public static final boolean DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED = true; + private static final String SPLIT_BY_SPACE_OR_COMMA_REGEX = "[,\\s]+"; private static final boolean DEFAULT_TRACE_REPORT_HOSTNAME = false; @@ -330,6 +334,8 @@ private String profilingProxyPasswordMasker() { @Getter private final int profilingExceptionHistogramTopItems; @Getter private final int profilingExceptionHistogramMaxCollectionSize; + @Getter private final boolean kafkaClientPropagationEnabled; + // Values from an optionally provided properties file private static Properties propertiesFromConfigFile; @@ -545,6 +551,10 @@ private String profilingProxyPasswordMasker() { PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE, 
diff --git a/dd-trace-api/src/main/java/datadog/trace/api/Config.java b/dd-trace-api/src/main/java/datadog/trace/api/Config.java
index 55d61bc7585..0d8c9bb799f 100644
--- a/dd-trace-api/src/main/java/datadog/trace/api/Config.java
+++ b/dd-trace-api/src/main/java/datadog/trace/api/Config.java
@@ -148,6 +148,8 @@ public class Config {
   public static final String PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE =
       "profiling.exception.histogram.max-collection-size";
 
+  public static final String KAFKA_CLIENT_PROPAGATION_ENABLED = "kafka.client.propagation.enabled";
+
   public static final String RUNTIME_ID_TAG = "runtime-id";
   public static final String SERVICE = "service";
   public static final String SERVICE_TAG = SERVICE;
@@ -205,6 +207,8 @@ public class Config {
   public static final int DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_TOP_ITEMS = 50;
   public static final int DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE = 10000;
 
+  public static final boolean DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED = true;
+
   private static final String SPLIT_BY_SPACE_OR_COMMA_REGEX = "[,\\s]+";
 
   private static final boolean DEFAULT_TRACE_REPORT_HOSTNAME = false;
@@ -330,6 +334,8 @@ private String profilingProxyPasswordMasker() {
   @Getter private final int profilingExceptionHistogramTopItems;
   @Getter private final int profilingExceptionHistogramMaxCollectionSize;
 
+  @Getter private final boolean kafkaClientPropagationEnabled;
+
   // Values from an optionally provided properties file
   private static Properties propertiesFromConfigFile;
 
@@ -545,6 +551,10 @@ private String profilingProxyPasswordMasker() {
             PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE,
             DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE);
 
+    kafkaClientPropagationEnabled =
+        getBooleanSettingFromEnvironment(
+            KAFKA_CLIENT_PROPAGATION_ENABLED, DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED);
+
     // Setting this last because we have a few places where this can come from
     apiKey = tmpApiKey;
 
@@ -734,6 +744,10 @@ private Config(final Properties properties, final Config parent) {
         PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE,
         parent.profilingExceptionHistogramMaxCollectionSize);
 
+    kafkaClientPropagationEnabled =
+        getPropertyBooleanValue(
+            properties, KAFKA_CLIENT_PROPAGATION_ENABLED, parent.kafkaClientPropagationEnabled);
+
     log.debug("New instance: {}", this);
   }
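
The second constructor hunk wires the flag through the `Properties`-based override path, which is what the test's `withConfigOverride` exercises. A minimal sketch of that path, assuming `Config.get(Properties)` is its public entry point (only the constructor itself is visible in this diff):

    import datadog.trace.api.Config;
    import java.util.Properties;

    public class KillSwitchExample {
      public static void main(final String[] args) {
        final Properties overrides = new Properties();
        // Unprefixed key, matching getPropertyBooleanValue in the constructor above.
        overrides.setProperty(Config.KAFKA_CLIENT_PROPAGATION_ENABLED, "false");
        final Config config = Config.get(overrides); // assumed entry point
        System.out.println(config.isKafkaClientPropagationEnabled()); // prints: false
      }
    }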