Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds b3SingleFormat flag to message instrumentation #772

Merged
merged 3 commits into from
Aug 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions instrumentation/benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,34 @@
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-kafka-clients</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-spring-rabbit</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${spring-rabbit.version}</version>
</dependency>

<dependency>
<groupId>io.undertow</groupId>
<artifactId>undertow-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package brave.kafka.clients;

import brave.Tracing;
import com.google.common.util.concurrent.Futures;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import zipkin2.reporter.Reporter;

@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 10, time = 1)
@Fork(3)
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Thread)
public class TracingProducerBenchmarks {
  /**
   * Measures the per-send overhead of Kafka producer tracing, comparing an untraced baseline
   * against multi-header (default) and single-header ("b3") propagation.
   *
   * <p>The producer under test is an in-memory stub, so the numbers isolate instrumentation cost
   * from any broker I/O.
   */
  // One record reused for every invocation; the stub never consumes or mutates it.
  ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
  Producer<String, String> producer, tracingProducer, tracingB3SingleProducer;

  @Setup(Level.Trial) public void init() {
    // NOOP reporter: we want to time header injection, not span export.
    Tracing tracing = Tracing.newBuilder().spanReporter(Reporter.NOOP).build();
    producer = new FakeProducer();
    tracingProducer = KafkaTracing.create(tracing).producer(producer);
    tracingB3SingleProducer =
        KafkaTracing.newBuilder(tracing).b3SingleFormat(true).build().producer(producer);
  }

  @TearDown(Level.Trial) public void close() {
    Tracing.current().close();
  }

  @Benchmark public RecordMetadata send_baseCase() throws Exception {
    return producer.send(record).get();
  }

  @Benchmark public RecordMetadata send_traced() throws Exception {
    return tracingProducer.send(record).get();
  }

  @Benchmark public RecordMetadata send_traced_b3Single() throws Exception {
    return tracingB3SingleProducer.send(record).get();
  }

  // Convenience main entry-point
  public static void main(String[] args) throws RunnerException {
    Options options = new OptionsBuilder()
        .include(".*" + TracingProducerBenchmarks.class.getSimpleName())
        .addProfiler("gc")
        .build();

    new Runner(options).run();
  }

  /** Synchronous in-memory stand-in for a real producer: every send completes immediately. */
  static final class FakeProducer implements Producer<String, String> {
    @Override
    public Future<RecordMetadata> send(ProducerRecord<String, String> record, Callback callback) {
      // Fabricated metadata for partition 0; offsets/sizes are dummy values.
      RecordMetadata metadata =
          new RecordMetadata(new TopicPartition(record.topic(), 0), -1L, -1L, 1L, 2L, 3, 4);
      if (callback != null) callback.onCompletion(metadata, null);
      return Futures.immediateFuture(metadata);
    }

    @Override public Future<RecordMetadata> send(ProducerRecord<String, String> record) {
      return send(record, null);
    }

    // Everything below is a no-op or returns nothing useful: the benchmark never calls these.
    @Override public void initTransactions() {
    }

    @Override public void beginTransaction() {
    }

    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String s) {
    }

    @Override public void commitTransaction() {
    }

    @Override public void abortTransaction() {
    }

    @Override public void flush() {
    }

    @Override public List<PartitionInfo> partitionsFor(String s) {
      return null;
    }

    @Override public Map<MetricName, ? extends Metric> metrics() {
      return null;
    }

    @Override public void close() {
    }

    @Override public void close(long l, TimeUnit timeUnit) {
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package brave.spring.rabbit;

import brave.Tracing;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import zipkin2.reporter.Reporter;

@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 10, time = 1)
@Fork(3)
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Thread)
public class TracingMessagePostProcessorBenchmarks {
  /**
   * Measures the overhead {@code TracingMessagePostProcessor} adds per message, comparing the
   * default propagation against single-header ("b3") propagation.
   */
  // Empty-body message reused for every invocation.
  Message message = MessageBuilder.withBody(new byte[0]).build();
  TracingMessagePostProcessor tracingPostProcessor, tracingB3SinglePostProcessor;

  @Setup(Level.Trial) public void init() {
    // NOOP reporter: we want to time header injection, not span export.
    Tracing tracing = Tracing.newBuilder().spanReporter(Reporter.NOOP).build();
    tracingPostProcessor = new TracingMessagePostProcessor(SpringRabbitTracing.create(tracing));
    SpringRabbitTracing b3SingleTracing =
        SpringRabbitTracing.newBuilder(tracing).b3SingleFormat(true).build();
    tracingB3SinglePostProcessor = new TracingMessagePostProcessor(b3SingleTracing);
  }

  @TearDown(Level.Trial) public void close() {
    Tracing.current().close();
  }

  @Benchmark public Message send_traced() {
    return tracingPostProcessor.postProcessMessage(message);
  }

  @Benchmark public Message send_traced_b3Single() {
    return tracingB3SinglePostProcessor.postProcessMessage(message);
  }

  // Convenience main entry-point
  public static void main(String[] args) throws RunnerException {
    Options options = new OptionsBuilder()
        .include(".*" + TracingMessagePostProcessorBenchmarks.class.getSimpleName())
        .addProfiler("gc")
        .build();

    new Runner(options).run();
  }
}
1 change: 1 addition & 0 deletions instrumentation/kafka-clients/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Add decorators for Kafka producer and consumer to enable tracing.
First, setup the generic Kafka component like this:
```java
kafkaTracing = KafkaTracing.newBuilder(tracing)
.b3SingleFormat(true) // for more efficient propagation
.remoteServiceName("my-broker")
.build();
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,35 @@

import brave.propagation.Propagation.Getter;
import brave.propagation.Propagation.Setter;
import brave.propagation.TraceContext;
import brave.propagation.TraceContext.Injector;
import java.nio.charset.Charset;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

import static brave.propagation.B3SingleFormat.writeB3SingleFormat;
import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentIdAsBytes;

final class KafkaPropagation {

static final Charset UTF_8 = Charset.forName("UTF-8");

static final TraceContext TEST_CONTEXT = TraceContext.newBuilder().traceId(1L).spanId(1L).build();
static final Headers B3_SINGLE_TEST_HEADERS =
new RecordHeaders().add("b3", writeB3SingleFormat(TEST_CONTEXT).getBytes(UTF_8));

static final Injector<Headers> B3_SINGLE_INJECTOR = new Injector<Headers>() {
@Override public void inject(TraceContext traceContext, Headers carrier) {
carrier.remove("b3");
carrier.add("b3", writeB3SingleFormatWithoutParentIdAsBytes(traceContext));
}

@Override public String toString() {
return "Headers::add(\"b3\",singleHeaderFormatWithoutParent)";
}
};

static final Setter<Headers, String> SETTER = (carrier, key, value) -> {
carrier.remove(key);
carrier.add(key, value.getBytes(UTF_8));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,21 @@
import brave.Span;
import brave.SpanCustomizer;
import brave.Tracing;
import brave.propagation.B3SingleFormat;
import brave.propagation.TraceContext;
import brave.propagation.TraceContextOrSamplingFlags;
import java.util.List;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import static brave.kafka.clients.KafkaPropagation.B3_SINGLE_TEST_HEADERS;
import static brave.kafka.clients.KafkaPropagation.TEST_CONTEXT;

/** Use this class to decorate your Kafka consumer / producer and enable Tracing. */
public final class KafkaTracing {

Expand All @@ -25,6 +32,7 @@ public static Builder newBuilder(Tracing tracing) {
public static final class Builder {
final Tracing tracing;
String remoteServiceName = "kafka";
boolean b3SingleFormat;

Builder(Tracing tracing) {
if (tracing == null) throw new NullPointerException("tracing == null");
Expand All @@ -40,6 +48,17 @@ public Builder remoteServiceName(String remoteServiceName) {
return this;
}

/**
* When true, only writes a single {@link B3SingleFormat b3 header} for outbound propagation.
*
* <p>Use this to reduce overhead. Note: normal {@link Tracing#propagation()} is used to parse
* incoming headers. The implementation must be able to read "b3" headers.
*/
public Builder b3SingleFormat(boolean b3SingleFormat) {
this.b3SingleFormat = b3SingleFormat;
return this;
}

public KafkaTracing build() {
return new KafkaTracing(this);
}
Expand All @@ -48,14 +67,23 @@ public KafkaTracing build() {
final Tracing tracing;
final TraceContext.Extractor<Headers> extractor;
final TraceContext.Injector<Headers> injector;
final List<String> propagationKeys;
final Set<String> propagationKeys;
final String remoteServiceName;

KafkaTracing(Builder builder) { // intentionally hidden constructor
this.tracing = builder.tracing;
this.extractor = tracing.propagation().extractor(KafkaPropagation.GETTER);
this.injector = tracing.propagation().injector(KafkaPropagation.SETTER);
this.propagationKeys = builder.tracing.propagation().keys();
if (builder.b3SingleFormat) {
TraceContext testExtraction = extractor.extract(B3_SINGLE_TEST_HEADERS).context();
if (!TEST_CONTEXT.equals(testExtraction)) {
throw new IllegalArgumentException(
"KafkaTracing.Builder.b3SingleFormat set, but Tracing.Builder.propagationFactory cannot parse this format!");
}
this.injector = KafkaPropagation.B3_SINGLE_INJECTOR;
} else {
this.injector = tracing.propagation().injector(KafkaPropagation.SETTER);
}
this.propagationKeys = new LinkedHashSet<>(builder.tracing.propagation().keys());
this.remoteServiceName = builder.remoteServiceName;
}

Expand Down Expand Up @@ -98,9 +126,15 @@ TraceContextOrSamplingFlags extractAndClearHeaders(Headers headers) {
return extracted;
}

// BRAVE6: consider a messaging variant of extraction which clears headers as they are read.
// this could prevent having to go back and clear them later. Another option is to encourage,
// then special-case single header propagation. When there's only 1 propagation key, you don't
// need to do a loop!
void clearHeaders(Headers headers) {
for (int i = 0, length = propagationKeys.size(); i < length; i++) {
headers.remove(propagationKeys.get(i));
// Headers::remove creates and consumes an iterator each time. This does one loop instead.
for (Iterator<Header> i = headers.iterator(); i.hasNext(); ) {
Header next = i.next();
if (propagationKeys.contains(next.key())) i.remove();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,24 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, @Nullable Callba
// always clear message headers after reading.
Span span;
if (maybeParent == null) {
span = tracer.nextSpan(extractor.extract(record.headers()));
span = tracer.nextSpan(kafkaTracing.extractAndClearHeaders(record.headers()));
} else {
// If we have a span in scope assume headers were cleared before
span = tracer.newChild(maybeParent);
}

kafkaTracing.clearHeaders(record.headers());
injector.inject(span.context(), record.headers());

if (!span.isNoop()) {
span.kind(Span.Kind.PRODUCER).name("send");
if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
if (record.key() instanceof String && !"".equals(record.key())) {
span.tag(KafkaTags.KAFKA_KEY_TAG, record.key().toString());
}
if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
span.tag(KafkaTags.KAFKA_TOPIC_TAG, record.topic()).name("send").kind(Kind.PRODUCER).start();
span.tag(KafkaTags.KAFKA_TOPIC_TAG, record.topic());
span.start();
}

injector.inject(span.context(), record.headers());

try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
return delegate.send(record, new TracingCallback(span, callback));
} catch (RuntimeException | Error e) {
Expand Down
Loading