Skip to content

Commit

Permalink
Merge pull request #1448 from shields478/manual_kafka_header_disable
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerbenson committed May 12, 2020
2 parents c20caa9 + 0c8a9a8 commit 38417de
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.Map;
Expand Down Expand Up @@ -76,7 +77,12 @@ public static AgentScope onEnter(
// Do not inject headers for batch versions below 2
// This is how similar check is being done in Kafka client itself:
// https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412
if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2) {
// Also, do not inject headers if specified by JVM option or environment variable
// This can help in mixed client environments where clients < 0.11 that do not support
// headers attempt to read messages that were produced by clients > 0.11 and the magic
// value of the broker(s) is >= 2
if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2
&& Config.get().isKafkaClientPropagationEnabled()) {
try {
propagate().inject(span, record.headers(), SETTER);
} catch (final IllegalStateException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.Config
import datadog.trace.bootstrap.instrumentation.api.Tags
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
Expand All @@ -15,10 +16,13 @@ import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.rule.KafkaEmbedded
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils
import spock.lang.Unroll

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

import static datadog.trace.agent.test.utils.ConfigUtils.withConfigOverride

class KafkaClientTest extends AgentTestRunner {
static final SHARED_TOPIC = "shared.topic"

Expand Down Expand Up @@ -204,4 +208,72 @@ class KafkaClientTest extends AgentTestRunner {
}
@Unroll
def "test kafka client header propagation manual config"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
// set up the Kafka consumer properties
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
// set the topic that needs to be consumed
def containerProperties
try {
// Different class names for test and latestDepTest.
containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC)
} catch (ClassNotFoundException | NoClassDefFoundError e) {
containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
}
// create a Kafka MessageListenerContainer
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
// create a thread safe queue to store the received message
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
// setup a Kafka message listener
container.setupMessageListener(new MessageListener<String, String>() {
@Override
void onMessage(ConsumerRecord<String, String> record) {
TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
records.add(record)
}
})
// start the container and underlying message listener
container.start()
// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
when:
String message = "Testing without headers"
withConfigOverride(Config.KAFKA_CLIENT_PROPAGATION_ENABLED, value) {
kafkaTemplate.send(SHARED_TOPIC, message)
}
then:
// check that the message was received
def received = records.poll(5, TimeUnit.SECONDS)
received.headers().iterator().hasNext() == expected
cleanup:
producerFactory.stop()
container?.stop()
where:
value | expected
"false" | false
"true" | true
String.valueOf(Config.DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED) | true
}
}
14 changes: 14 additions & 0 deletions dd-trace-api/src/main/java/datadog/trace/api/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ public class Config {
public static final String PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE =
"profiling.exception.histogram.max-collection-size";

public static final String KAFKA_CLIENT_PROPAGATION_ENABLED = "kafka.client.propagation.enabled";

public static final String RUNTIME_ID_TAG = "runtime-id";
public static final String SERVICE = "service";
public static final String SERVICE_TAG = SERVICE;
Expand Down Expand Up @@ -205,6 +207,8 @@ public class Config {
public static final int DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_TOP_ITEMS = 50;
public static final int DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE = 10000;

public static final boolean DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED = true;

private static final String SPLIT_BY_SPACE_OR_COMMA_REGEX = "[,\\s]+";

private static final boolean DEFAULT_TRACE_REPORT_HOSTNAME = false;
Expand Down Expand Up @@ -330,6 +334,8 @@ private String profilingProxyPasswordMasker() {
@Getter private final int profilingExceptionHistogramTopItems;
@Getter private final int profilingExceptionHistogramMaxCollectionSize;

@Getter private final boolean kafkaClientPropagationEnabled;

// Values from an optionally provided properties file
private static Properties propertiesFromConfigFile;

Expand Down Expand Up @@ -545,6 +551,10 @@ private String profilingProxyPasswordMasker() {
PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE,
DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE);

kafkaClientPropagationEnabled =
getBooleanSettingFromEnvironment(
KAFKA_CLIENT_PROPAGATION_ENABLED, DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED);

// Setting this last because we have a few places where this can come from
apiKey = tmpApiKey;

Expand Down Expand Up @@ -734,6 +744,10 @@ private Config(final Properties properties, final Config parent) {
PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE,
parent.profilingExceptionHistogramMaxCollectionSize);

kafkaClientPropagationEnabled =
getPropertyBooleanValue(
properties, KAFKA_CLIENT_PROPAGATION_ENABLED, parent.kafkaClientPropagationEnabled);

log.debug("New instance: {}", this);
}

Expand Down

0 comments on commit 38417de

Please sign in to comment.