From 81983204dad020df7dcb52e8289e1133b682496f Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Thu, 23 Aug 2018 22:01:57 +0800 Subject: [PATCH 1/3] Adds b3SingleFormat flag to message instrumentation --- instrumentation/kafka-clients/README.md | 1 + .../brave/kafka/clients/KafkaPropagation.java | 21 ++++++++++++++ .../brave/kafka/clients/KafkaTracing.java | 27 +++++++++++++++++- .../brave/kafka/clients/KafkaTracingTest.java | 28 ++++++++++++++++++- .../kafka/clients/TracingProducerTest.java | 16 +++++++++++ instrumentation/spring-rabbit/README.md | 1 + .../rabbit/SpringRabbitPropagation.java | 21 ++++++++++++++ .../spring/rabbit/SpringRabbitTracing.java | 28 ++++++++++++++++++- .../rabbit/SpringRabbitTracingTest.java | 27 +++++++++++++++++- .../TracingMessagePostProcessorTest.java | 14 ++++++++++ 10 files changed, 180 insertions(+), 4 deletions(-) diff --git a/instrumentation/kafka-clients/README.md b/instrumentation/kafka-clients/README.md index 35ec754d89..7bc7a0d81b 100644 --- a/instrumentation/kafka-clients/README.md +++ b/instrumentation/kafka-clients/README.md @@ -8,6 +8,7 @@ Add decorators for Kafka producer and consumer to enable tracing. First, setup the generic Kafka component like this: ```java kafkaTracing = KafkaTracing.newBuilder(tracing) + .b3SingleFormat(true) // for more efficient propagation .remoteServiceName("my-broker") .build(); ``` diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaPropagation.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaPropagation.java index 95c1402489..2731665e9e 100644 --- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaPropagation.java +++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaPropagation.java @@ -2,14 +2,35 @@ import brave.propagation.Propagation.Getter; import brave.propagation.Propagation.Setter; +import brave.propagation.TraceContext; +import brave.propagation.TraceContext.Injector; import java.nio.charset.Charset; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import static brave.propagation.B3SingleFormat.writeB3SingleFormat; +import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentIdAsBytes; final class KafkaPropagation { static final Charset UTF_8 = Charset.forName("UTF-8"); + static final TraceContext TEST_CONTEXT = TraceContext.newBuilder().traceId(1L).spanId(1L).build(); + static final Headers B3_SINGLE_TEST_HEADERS = + new RecordHeaders().add("b3", writeB3SingleFormat(TEST_CONTEXT).getBytes(UTF_8)); + + static final Injector B3_SINGLE_INJECTOR = new Injector() { + @Override public void inject(TraceContext traceContext, Headers carrier) { + carrier.remove("b3"); + carrier.add("b3", writeB3SingleFormatWithoutParentIdAsBytes(traceContext)); + } + + @Override public String toString() { + return "Headers::add(\"b3\",singleHeaderFormatWithoutParent)"; + } + }; + static final Setter SETTER = (carrier, key, value) -> { carrier.remove(key); carrier.add(key, value.getBytes(UTF_8)); diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java index a10522fef8..a9f274af58 100644 --- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java +++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java @@ -3,6 +3,7 @@ import brave.Span; import brave.SpanCustomizer; import brave.Tracing; +import brave.propagation.B3SingleFormat; import brave.propagation.TraceContext; import brave.propagation.TraceContextOrSamplingFlags; import java.util.List; @@ -11,6 +12,9 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.header.Headers; +import static brave.kafka.clients.KafkaPropagation.B3_SINGLE_TEST_HEADERS; +import static brave.kafka.clients.KafkaPropagation.TEST_CONTEXT; + /** Use this class to decorate your Kafka consumer / producer and enable Tracing. */ public final class KafkaTracing { @@ -25,6 +29,7 @@ public static Builder newBuilder(Tracing tracing) { public static final class Builder { final Tracing tracing; String remoteServiceName = "kafka"; + boolean b3SingleFormat; Builder(Tracing tracing) { if (tracing == null) throw new NullPointerException("tracing == null"); @@ -40,6 +45,17 @@ public Builder remoteServiceName(String remoteServiceName) { return this; } + /** + * When true, only writes a single {@link B3SingleFormat b3 header} for outbound propagation. + * + *

Use this to reduce overhead. Note: normal {@link Tracing#propagation()} is used to parse + * incoming headers. The implementation must be able to read "b3" headers. + */ + public Builder b3SingleFormat(boolean b3SingleFormat) { + this.b3SingleFormat = b3SingleFormat; + return this; + } + public KafkaTracing build() { return new KafkaTracing(this); } @@ -54,7 +70,16 @@ public KafkaTracing build() { KafkaTracing(Builder builder) { // intentionally hidden constructor this.tracing = builder.tracing; this.extractor = tracing.propagation().extractor(KafkaPropagation.GETTER); - this.injector = tracing.propagation().injector(KafkaPropagation.SETTER); + if (builder.b3SingleFormat) { + TraceContext testExtraction = extractor.extract(B3_SINGLE_TEST_HEADERS).context(); + if (!TEST_CONTEXT.equals(testExtraction)) { + throw new IllegalArgumentException( + "KafkaTracing.Builder.b3SingleFormat set, but Tracing.Builder.propagationFactory cannot parse this format!"); + } + this.injector = KafkaPropagation.B3_SINGLE_INJECTOR; + } else { + this.injector = tracing.propagation().injector(KafkaPropagation.SETTER); + } this.propagationKeys = builder.tracing.propagation().keys(); this.remoteServiceName = builder.remoteServiceName; } diff --git a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/KafkaTracingTest.java b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/KafkaTracingTest.java index b59f27fe00..2be45336db 100644 --- a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/KafkaTracingTest.java +++ b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/KafkaTracingTest.java @@ -1,13 +1,20 @@ package brave.kafka.clients; import brave.Span; +import brave.Tracing; import brave.internal.HexCodec; +import brave.propagation.Propagation; import brave.propagation.TraceContext; +import brave.propagation.TraceContextOrSamplingFlags; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Test; +import static brave.kafka.clients.KafkaPropagation.GETTER; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.entry; -import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class KafkaTracingTest extends BaseTracingTest { @@ -92,4 +99,23 @@ public void nextSpan_should_not_clear_other_headers() { kafkaTracing.nextSpan(fakeRecord); assertThat(fakeRecord.headers().headers("foo")).isNotEmpty(); } + + @Test public void failsFastIfPropagationDoesntSupportSingleHeader() { + // Fake propagation because B3 by default does support single header extraction! + Propagation propagation = mock(Propagation.class); + when(propagation.extractor(GETTER)).thenReturn(carrier -> { + assertThat(carrier.lastHeader("b3")).isNotNull(); // sanity check + return TraceContextOrSamplingFlags.EMPTY; // pretend we couldn't parse + }); + + Propagation.Factory propagationFactory = mock(Propagation.Factory.class); + when(propagationFactory.create(Propagation.KeyFactory.STRING)).thenReturn(propagation); + + assertThatThrownBy(() -> KafkaTracing.newBuilder( + Tracing.newBuilder().propagationFactory(propagationFactory).build()) + .b3SingleFormat(true) + .build() + ).hasMessage( + "KafkaTracing.Builder.b3SingleFormat set, but Tracing.Builder.propagationFactory cannot parse this format!"); + } } diff --git a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/TracingProducerTest.java b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/TracingProducerTest.java index 985f245789..9456474454 100644 --- a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/TracingProducerTest.java +++ b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/TracingProducerTest.java @@ -85,6 +85,22 @@ public class TracingProducerTest extends BaseTracingTest { assertThat(headerKeys).containsAll(expectedHeaders); } + @Test public void should_add_b3_single_header_to_message() { + tracingProducer = KafkaTracing.newBuilder(tracing).b3SingleFormat(true).build() + .producer(mockProducer); + + tracingProducer.send(new ProducerRecord<>(TEST_TOPIC, TEST_KEY, TEST_VALUE)); + + List

headers = mockProducer.history().stream() + .flatMap(records -> Arrays.stream(records.headers().toArray())) + .collect(Collectors.toList()); + + assertThat(headers).hasSize(1); + assertThat(headers.get(0).key()).isEqualTo("b3"); + assertThat(new String(headers.get(0).value(), UTF_8)) + .matches("^[0-9a-f]{16}-[0-9a-f]{16}-1$"); + } + @Test public void should_call_wrapped_producer() { tracingProducer.send(new ProducerRecord<>(TEST_TOPIC, TEST_KEY, TEST_VALUE)); diff --git a/instrumentation/spring-rabbit/README.md b/instrumentation/spring-rabbit/README.md index fb2cefe15a..ff46629916 100644 --- a/instrumentation/spring-rabbit/README.md +++ b/instrumentation/spring-rabbit/README.md @@ -16,6 +16,7 @@ public Tracing tracing() { @Bean public SpringRabbitTracing springRabbitTracing(Tracing tracing) { return SpringRabbitTracing.newBuilder(tracing) + .b3SingleFormat(true) // for more efficient propagation .remoteServiceName("my-mq-service") .build(); } diff --git a/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/SpringRabbitPropagation.java b/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/SpringRabbitPropagation.java index 83d4bb924b..e8bb11b3ee 100644 --- a/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/SpringRabbitPropagation.java +++ b/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/SpringRabbitPropagation.java @@ -2,9 +2,30 @@ import brave.propagation.Propagation.Getter; import brave.propagation.Propagation.Setter; +import brave.propagation.TraceContext; +import brave.propagation.TraceContext.Injector; import org.springframework.amqp.core.MessageProperties; +import static brave.propagation.B3SingleFormat.writeB3SingleFormat; +import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentId; + final class SpringRabbitPropagation { + static final TraceContext TEST_CONTEXT = TraceContext.newBuilder().traceId(1L).spanId(1L).build(); + static final MessageProperties B3_SINGLE_TEST_HEADERS = new MessageProperties(); + + static { + B3_SINGLE_TEST_HEADERS.setHeader("b3", writeB3SingleFormat(TEST_CONTEXT)); + } + + static final Injector B3_SINGLE_INJECTOR = new Injector() { + @Override public void inject(TraceContext traceContext, MessageProperties carrier) { + carrier.setHeader("b3", writeB3SingleFormatWithoutParentId(traceContext)); + } + + @Override public String toString() { + return "MessageProperties::setHeader(\"b3\",singleHeaderFormatWithoutParent)"; + } + }; static final Setter SETTER = new Setter() { @Override public void put(MessageProperties carrier, String key, String value) { diff --git a/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/SpringRabbitTracing.java b/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/SpringRabbitTracing.java index 6068f03300..1c2338f51a 100644 --- a/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/SpringRabbitTracing.java +++ b/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/SpringRabbitTracing.java @@ -1,6 +1,8 @@ package brave.spring.rabbit; import brave.Tracing; +import brave.propagation.B3SingleFormat; +import brave.propagation.TraceContext; import brave.propagation.TraceContext.Extractor; import brave.propagation.TraceContext.Injector; import brave.propagation.TraceContextOrSamplingFlags; @@ -17,6 +19,9 @@ import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import static brave.spring.rabbit.SpringRabbitPropagation.B3_SINGLE_TEST_HEADERS; +import static brave.spring.rabbit.SpringRabbitPropagation.TEST_CONTEXT; + /** * Factory for Brave instrumented Spring Rabbit classes. */ @@ -39,6 +44,7 @@ public static Builder newBuilder(Tracing tracing) { public static final class Builder { final Tracing tracing; String remoteServiceName = "rabbitmq"; + boolean b3SingleFormat; Builder(Tracing tracing) { this.tracing = tracing; @@ -53,6 +59,17 @@ public Builder remoteServiceName(String remoteServiceName) { return this; } + /** + * When true, only writes a single {@link B3SingleFormat b3 header} for outbound propagation. + * + *

Use this to reduce overhead. Note: normal {@link Tracing#propagation()} is used to parse + * incoming headers. The implementation must be able to read "b3" headers. + */ + public Builder b3SingleFormat(boolean b3SingleFormat) { + this.b3SingleFormat = b3SingleFormat; + return this; + } + public SpringRabbitTracing build() { return new SpringRabbitTracing(this); } @@ -68,7 +85,16 @@ public SpringRabbitTracing build() { SpringRabbitTracing(Builder builder) { // intentionally hidden constructor this.tracing = builder.tracing; this.extractor = tracing.propagation().extractor(SpringRabbitPropagation.GETTER); - this.injector = tracing.propagation().injector(SpringRabbitPropagation.SETTER); + if (builder.b3SingleFormat) { + TraceContext testExtraction = extractor.extract(B3_SINGLE_TEST_HEADERS).context(); + if (!TEST_CONTEXT.equals(testExtraction)) { + throw new IllegalArgumentException( + "SpringRabbitTracing.Builder.b3SingleFormat set, but Tracing.Builder.propagationFactory cannot parse this format!"); + } + this.injector = SpringRabbitPropagation.B3_SINGLE_INJECTOR; + } else { + this.injector = tracing.propagation().injector(SpringRabbitPropagation.SETTER); + } this.propagationKeys = builder.tracing.propagation().keys(); this.remoteServiceName = builder.remoteServiceName; Field beforePublishPostProcessorsField = null; diff --git a/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/SpringRabbitTracingTest.java b/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/SpringRabbitTracingTest.java index 0dfae87408..13a3bb67ef 100644 --- a/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/SpringRabbitTracingTest.java +++ b/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/SpringRabbitTracingTest.java @@ -1,7 +1,9 @@ package brave.spring.rabbit; import brave.Tracing; +import brave.propagation.Propagation; import brave.propagation.ThreadLocalCurrentTraceContext; +import brave.propagation.TraceContextOrSamplingFlags; import java.util.Collection; import org.junit.After; import org.junit.Test; @@ -11,7 +13,11 @@ import org.springframework.cache.interceptor.CacheInterceptor; import zipkin2.reporter.Reporter; -import static org.assertj.core.api.Java6Assertions.assertThat; +import static brave.spring.rabbit.SpringRabbitPropagation.GETTER; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class SpringRabbitTracingTest { Tracing tracing = Tracing.newBuilder() @@ -75,4 +81,23 @@ public class SpringRabbitTracingTest { assertThat(rabbitTracing.decorateSimpleRabbitListenerContainerFactory(factory).getAdviceChain()) .anyMatch(advice -> advice instanceof TracingRabbitListenerAdvice); } + + @Test public void failsFastIfPropagationDoesntSupportSingleHeader() { + // Fake propagation because B3 by default does support single header extraction! + Propagation propagation = mock(Propagation.class); + when(propagation.extractor(GETTER)).thenReturn(carrier -> { + assertThat(carrier.getHeaders().get("b3")).isNotNull(); // sanity check + return TraceContextOrSamplingFlags.EMPTY; // pretend we couldn't parse + }); + + Propagation.Factory propagationFactory = mock(Propagation.Factory.class); + when(propagationFactory.create(Propagation.KeyFactory.STRING)).thenReturn(propagation); + + assertThatThrownBy(() -> SpringRabbitTracing.newBuilder( + Tracing.newBuilder().propagationFactory(propagationFactory).build()) + .b3SingleFormat(true) + .build() + ).hasMessage( + "SpringRabbitTracing.Builder.b3SingleFormat set, but Tracing.Builder.propagationFactory cannot parse this format!"); + } } diff --git a/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/TracingMessagePostProcessorTest.java b/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/TracingMessagePostProcessorTest.java index cc9aaecbe9..423a0f9c09 100644 --- a/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/TracingMessagePostProcessorTest.java +++ b/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/TracingMessagePostProcessorTest.java @@ -71,6 +71,20 @@ public class TracingMessagePostProcessorTest { assertThat(headerKeys).containsAll(expectedHeaders); } + @Test public void should_add_b3_single_header_to_message() { + TracingMessagePostProcessor tracingMessagePostProcessor = new TracingMessagePostProcessor( + SpringRabbitTracing.newBuilder(tracing).b3SingleFormat(true).build() + ); + + Message message = MessageBuilder.withBody(new byte[0]).build(); + Message postProcessMessage = tracingMessagePostProcessor.postProcessMessage(message); + + assertThat(postProcessMessage.getMessageProperties().getHeaders()) + .containsOnlyKeys("b3"); + assertThat(postProcessMessage.getMessageProperties().getHeaders().get("b3").toString()) + .matches("^[0-9a-f]{16}-[0-9a-f]{16}-1$"); + } + @Test public void should_report_span() { Message message = MessageBuilder.withBody(new byte[0]).build(); tracingMessagePostProcessor.postProcessMessage(message); From 465e5452c39f6a39ef276d1a7b99cf8e2df896a6 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Fri, 24 Aug 2018 19:07:25 +0800 Subject: [PATCH 2/3] Adds kafka and rabbit producer benchmarks --- instrumentation/benchmarks/pom.xml | 28 ++++ .../clients/TracingProducerBenchmarks.java | 126 ++++++++++++++++++ ...TracingMessagePostProcessorBenchmarks.java | 64 +++++++++ instrumentation/spring-rabbit/pom.xml | 1 - pom.xml | 3 + 5 files changed, 221 insertions(+), 1 deletion(-) create mode 100644 instrumentation/benchmarks/src/main/java/brave/kafka/clients/TracingProducerBenchmarks.java create mode 100644 instrumentation/benchmarks/src/main/java/brave/spring/rabbit/TracingMessagePostProcessorBenchmarks.java diff --git a/instrumentation/benchmarks/pom.xml b/instrumentation/benchmarks/pom.xml index ab55dec57d..ff2569d5d2 100644 --- a/instrumentation/benchmarks/pom.xml +++ b/instrumentation/benchmarks/pom.xml @@ -142,6 +142,34 @@ log4j-slf4j-impl + + ${project.groupId} + brave-instrumentation-kafka-clients + ${project.version} + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.slf4j + slf4j-log4j12 + + + + + + ${project.groupId} + brave-instrumentation-spring-rabbit + ${project.version} + + + org.springframework.amqp + spring-rabbit + ${spring-rabbit.version} + + io.undertow undertow-core diff --git a/instrumentation/benchmarks/src/main/java/brave/kafka/clients/TracingProducerBenchmarks.java b/instrumentation/benchmarks/src/main/java/brave/kafka/clients/TracingProducerBenchmarks.java new file mode 100644 index 0000000000..c008f1cc8d --- /dev/null +++ b/instrumentation/benchmarks/src/main/java/brave/kafka/clients/TracingProducerBenchmarks.java @@ -0,0 +1,126 @@ +package brave.kafka.clients; + +import brave.Tracing; +import com.google.common.util.concurrent.Futures; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import zipkin2.reporter.Reporter; + +@Measurement(iterations = 5, time = 1) +@Warmup(iterations = 10, time = 1) +@Fork(3) +@BenchmarkMode(Mode.SampleTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Thread) +public class TracingProducerBenchmarks { + ProducerRecord record = new ProducerRecord<>("topic", "key", "value"); + Producer producer, tracingProducer, tracingB3SingleProducer; + + @Setup(Level.Trial) public void init() { + Tracing tracing = Tracing.newBuilder().spanReporter(Reporter.NOOP).build(); + producer = new FakeProducer(); + tracingProducer = KafkaTracing.create(tracing).producer(producer); + tracingB3SingleProducer = + KafkaTracing.newBuilder(tracing).b3SingleFormat(true).build().producer(producer); + } + + @TearDown(Level.Trial) public void close() { + Tracing.current().close(); + } + + @Benchmark public RecordMetadata send_baseCase() throws Exception { + return producer.send(record).get(); + } + + @Benchmark public RecordMetadata send_traced() throws Exception { + return tracingProducer.send(record).get(); + } + + @Benchmark public RecordMetadata send_traced_b3Single() throws Exception { + return tracingB3SingleProducer.send(record).get(); + } + + // Convenience main entry-point + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder() + .addProfiler("gc") + .include(".*" + TracingProducerBenchmarks.class.getSimpleName()) + .build(); + + new Runner(opt).run(); + } + + static final class FakeProducer implements Producer { + @Override public void initTransactions() { + } + + @Override public void beginTransaction() { + } + + @Override + public void sendOffsetsToTransaction(Map map, String s) { + } + + @Override public void commitTransaction() { + } + + @Override public void abortTransaction() { + } + + @Override public Future send(ProducerRecord record) { + return send(record, null); + } + + @Override + public Future send(ProducerRecord record, Callback callback) { + TopicPartition tp = new TopicPartition(record.topic(), 0); + RecordMetadata rm = new RecordMetadata(tp, -1L, -1L, 1L, 2L, 3, 4); + if (callback != null) callback.onCompletion(rm, null); + return Futures.immediateFuture(rm); + } + + @Override public void flush() { + } + + @Override public List partitionsFor(String s) { + return null; + } + + @Override public Map metrics() { + return null; + } + + @Override public void close() { + } + + @Override public void close(long l, TimeUnit timeUnit) { + } + } +} diff --git a/instrumentation/benchmarks/src/main/java/brave/spring/rabbit/TracingMessagePostProcessorBenchmarks.java b/instrumentation/benchmarks/src/main/java/brave/spring/rabbit/TracingMessagePostProcessorBenchmarks.java new file mode 100644 index 0000000000..f621338699 --- /dev/null +++ b/instrumentation/benchmarks/src/main/java/brave/spring/rabbit/TracingMessagePostProcessorBenchmarks.java @@ -0,0 +1,64 @@ +package brave.spring.rabbit; + +import brave.Tracing; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageBuilder; +import zipkin2.reporter.Reporter; + +@Measurement(iterations = 5, time = 1) +@Warmup(iterations = 10, time = 1) +@Fork(3) +@BenchmarkMode(Mode.SampleTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Thread) +public class TracingMessagePostProcessorBenchmarks { + Message message = MessageBuilder.withBody(new byte[0]).build(); + TracingMessagePostProcessor tracingPostProcessor, tracingB3SinglePostProcessor; + + @Setup(Level.Trial) public void init() { + Tracing tracing = Tracing.newBuilder().spanReporter(Reporter.NOOP).build(); + tracingPostProcessor = new TracingMessagePostProcessor(SpringRabbitTracing.create(tracing)); + tracingB3SinglePostProcessor = new TracingMessagePostProcessor( + SpringRabbitTracing.newBuilder(tracing).b3SingleFormat(true).build() + ); + } + + @TearDown(Level.Trial) public void close() { + Tracing.current().close(); + } + + @Benchmark public Message send_traced() { + return tracingPostProcessor.postProcessMessage(message); + } + + @Benchmark public Message send_traced_b3Single() { + return tracingB3SinglePostProcessor.postProcessMessage(message); + } + + // Convenience main entry-point + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder() + .addProfiler("gc") + .include(".*" + TracingMessagePostProcessorBenchmarks.class.getSimpleName()) + .build(); + + new Runner(opt).run(); + } +} diff --git a/instrumentation/spring-rabbit/pom.xml b/instrumentation/spring-rabbit/pom.xml index c4e9de5a61..0cccd77008 100644 --- a/instrumentation/spring-rabbit/pom.xml +++ b/instrumentation/spring-rabbit/pom.xml @@ -14,7 +14,6 @@ ${project.basedir}/../.. 1.6 java16 - 1.7.9.RELEASE diff --git a/pom.xml b/pom.xml index bee5bb7ed1..52a37f5d21 100755 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,7 @@ 4.3.18.RELEASE 3.2.18.RELEASE + 8.1.22.v20160922 7.6.21.v20160908 @@ -78,6 +79,8 @@ 4.1.2 + 1.7.9.RELEASE + 18.7.0 2.11.0 3.11.0 From bf18da2b9f7b059c43b4775a026e0f51d2c3c421 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Fri, 24 Aug 2018 22:26:33 +0800 Subject: [PATCH 3/3] Improves performance a little and adds some notes for brave 6 --- .../brave/kafka/clients/KafkaTracing.java | 19 ++++++++++++++----- .../brave/kafka/clients/TracingProducer.java | 14 ++++++++------ .../rabbit/TracingMessagePostProcessor.java | 6 ++---- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java index a9f274af58..314bb96ae4 100644 --- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java +++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java @@ -6,10 +6,13 @@ import brave.propagation.B3SingleFormat; import brave.propagation.TraceContext; import brave.propagation.TraceContextOrSamplingFlags; -import java.util.List; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Set; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import static brave.kafka.clients.KafkaPropagation.B3_SINGLE_TEST_HEADERS; @@ -64,7 +67,7 @@ public KafkaTracing build() { final Tracing tracing; final TraceContext.Extractor extractor; final TraceContext.Injector injector; - final List propagationKeys; + final Set propagationKeys; final String remoteServiceName; KafkaTracing(Builder builder) { // intentionally hidden constructor @@ -80,7 +83,7 @@ public KafkaTracing build() { } else { this.injector = tracing.propagation().injector(KafkaPropagation.SETTER); } - this.propagationKeys = builder.tracing.propagation().keys(); + this.propagationKeys = new LinkedHashSet<>(builder.tracing.propagation().keys()); this.remoteServiceName = builder.remoteServiceName; } @@ -123,9 +126,15 @@ TraceContextOrSamplingFlags extractAndClearHeaders(Headers headers) { return extracted; } + // BRAVE6: consider a messaging variant of extraction which clears headers as they are read. + // this could prevent having to go back and clear them later. Another option is to encourage, + // then special-case single header propagation. When there's only 1 propagation key, you don't + // need to do a loop! void clearHeaders(Headers headers) { - for (int i = 0, length = propagationKeys.size(); i < length; i++) { - headers.remove(propagationKeys.get(i)); + // Headers::remove creates and consumes an iterator each time. This does one loop instead. + for (Iterator

i = headers.iterator(); i.hasNext(); ) { + Header next = i.next(); + if (propagationKeys.contains(next.key())) i.remove(); } } diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java index bd027c5038..f0e5ac2652 100644 --- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java +++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java @@ -85,22 +85,24 @@ public Future send(ProducerRecord record, @Nullable Callba // always clear message headers after reading. Span span; if (maybeParent == null) { - span = tracer.nextSpan(extractor.extract(record.headers())); + span = tracer.nextSpan(kafkaTracing.extractAndClearHeaders(record.headers())); } else { + // If we have a span in scope assume headers were cleared before span = tracer.newChild(maybeParent); } - kafkaTracing.clearHeaders(record.headers()); - injector.inject(span.context(), record.headers()); - if (!span.isNoop()) { + span.kind(Span.Kind.PRODUCER).name("send"); + if (remoteServiceName != null) span.remoteServiceName(remoteServiceName); if (record.key() instanceof String && !"".equals(record.key())) { span.tag(KafkaTags.KAFKA_KEY_TAG, record.key().toString()); } - if (remoteServiceName != null) span.remoteServiceName(remoteServiceName); - span.tag(KafkaTags.KAFKA_TOPIC_TAG, record.topic()).name("send").kind(Kind.PRODUCER).start(); + span.tag(KafkaTags.KAFKA_TOPIC_TAG, record.topic()); + span.start(); } + injector.inject(span.context(), record.headers()); + try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) { return delegate.send(record, new TracingCallback(span, callback)); } catch (RuntimeException | Error e) { diff --git a/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/TracingMessagePostProcessor.java b/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/TracingMessagePostProcessor.java index 2c660eb2e5..be8c0ec78c 100644 --- a/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/TracingMessagePostProcessor.java +++ b/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/TracingMessagePostProcessor.java @@ -7,7 +7,6 @@ import brave.propagation.CurrentTraceContext; import brave.propagation.TraceContext; import brave.propagation.TraceContext.Injector; -import brave.propagation.TraceContextOrSamplingFlags; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; @@ -45,11 +44,10 @@ final class TracingMessagePostProcessor implements MessagePostProcessor { // always clear message headers after reading. Span span; if (maybeParent == null) { - TraceContextOrSamplingFlags extracted = springRabbitTracing.extractAndClearHeaders(message); - span = tracer.nextSpan(extracted); + span = tracer.nextSpan(springRabbitTracing.extractAndClearHeaders(message)); } else { + // If we have a span in scope assume headers were cleared before span = tracer.newChild(maybeParent); - springRabbitTracing.clearHeaders(message.getMessageProperties().getHeaders()); } if (!span.isNoop()) {