Skip to content

Commit

Permalink
Merge pull request #143 from openzipkin/kafka-deux
Browse files Browse the repository at this point in the history
Refactors such that Kafka bundles spans and shares more code
  • Loading branch information
kristofa committed Mar 5, 2016
2 parents 69b2ef1 + 74cc5af commit af557af
Show file tree
Hide file tree
Showing 11 changed files with 396 additions and 383 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.github.kristofa.brave;

import com.github.kristofa.brave.internal.Nullable;
import com.twitter.zipkin.gen.Span;
import com.twitter.zipkin.gen.SpanCodec;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
 * Implement {@link #sendSpans} to transport an encoded list of spans to Zipkin.
 */
public abstract class AbstractSpanCollector implements SpanCollector, Flushable, Closeable {

  private final SpanCodec codec;
  private final SpanCollectorMetricsHandler metrics;
  // Bounded so a slow or unavailable transport cannot exhaust memory: spans
  // that don't fit are dropped and counted via the metrics handler.
  private final BlockingQueue<Span> pending = new LinkedBlockingQueue<Span>(1000);
  @Nullable // for testing
  private final Flusher flusher;

  /**
   * @param codec encodes buffered spans into the transport representation
   * @param metrics gets notified when spans are accepted or dropped
   * @param flushInterval in seconds. 0 implies spans are {@link #flush() flushed} externally.
   */
  public AbstractSpanCollector(SpanCodec codec, SpanCollectorMetricsHandler metrics,
      int flushInterval) {
    this.codec = codec;
    this.metrics = metrics;
    this.flusher = flushInterval > 0 ? new Flusher(this, flushInterval) : null;
  }

  /**
   * Queues the span for collection, or drops it if the queue is full.
   *
   * @param span Span, should not be <code>null</code>.
   */
  @Override
  public void collect(Span span) {
    metrics.incrementAcceptedSpans(1);
    // offer, not put: collect() must never block the calling thread
    if (!pending.offer(span)) {
      metrics.incrementDroppedSpans(1);
    }
  }

  /**
   * Calling this will flush any pending spans to the transport on the current thread.
   *
   * <p>Spans that fail to encode or send are dropped and reported to the metrics handler;
   * they are never re-queued.
   */
  @Override
  public void flush() {
    if (pending.isEmpty()) return;
    List<Span> drained = new ArrayList<Span>(pending.size());
    pending.drainTo(drained);
    if (drained.isEmpty()) return; // another thread may have drained concurrently

    // encode the spans for transport
    int spanCount = drained.size();
    byte[] encoded;
    try {
      encoded = codec.writeSpans(drained);
    } catch (RuntimeException e) {
      metrics.incrementDroppedSpans(spanCount);
      return;
    }

    // transport the spans
    try {
      sendSpans(encoded);
    } catch (IOException e) {
      metrics.incrementDroppedSpans(spanCount);
    }
  }

  /** Calls flush on a fixed interval */
  static final class Flusher implements Runnable {
    final Flushable flushable;
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    Flusher(Flushable flushable, int flushInterval) {
      this.flushable = flushable;
      this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
    }

    @Override
    public void run() {
      try {
        flushable.flush();
      } catch (IOException | RuntimeException ignored) {
        // scheduleWithFixedDelay cancels the periodic task if run() throws,
        // which would silently stop all future flushes. Swallow and retry on
        // the next interval instead; drops are already counted in flush().
      }
    }
  }

  /**
   * Sends an encoded list of spans over the current transport.
   *
   * @throws IOException when thrown, drop metrics will increment accordingly
   */
  protected abstract void sendSpans(byte[] encoded) throws IOException;

  /** Not supported: span bundling has no per-span default annotations. */
  @Override
  public void addDefaultAnnotation(String key, String value) {
    throw new UnsupportedOperationException();
  }

  /**
   * Requests a cease of delivery. There will be at most one in-flight send after this call.
   */
  @Override
  public void close() {
    if (flusher != null) flusher.scheduler.shutdown();
    // Intentionally drop (not flush) outstanding spans so close() can't block
    // on a slow transport; callers wanting delivery should flush() first.
    int dropped = pending.drainTo(new LinkedList<Span>());
    metrics.incrementDroppedSpans(dropped);
  }
}
8 changes: 7 additions & 1 deletion brave-spancollector-http/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
# brave-spancollector-http #

SpanCollector that is used to submit spans to Zipkins Http endpoint `/spans`.
SpanCollector that encodes spans into a JSON list, POSTed to `/api/v1/spans`.

## Configuration ##

By default...

* Spans are flushed to a POST request every second. Configure with `HttpSpanCollector.Config.flushInterval`.
* The POST body is not compressed. Configure with `HttpSpanCollector.Config.compressionEnabled`.
10 changes: 0 additions & 10 deletions brave-spancollector-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,6 @@
<artifactId>auto-value</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</dependency>
<!-- org.apache.thrift.ProcessFunction v0.9 uses SLF4J at runtime -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,21 @@
package com.github.kristofa.brave.http;

import com.github.kristofa.brave.AbstractSpanCollector;
import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
import com.github.kristofa.brave.SpanCollector;
import com.github.kristofa.brave.SpanCollectorMetricsHandler;
import com.github.kristofa.brave.internal.Nullable;
import com.google.auto.value.AutoValue;
import com.twitter.zipkin.gen.Span;
import com.twitter.zipkin.gen.SpanCodec;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.zip.GZIPOutputStream;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
* SpanCollector which submits spans to Zipkin, using its {@code POST /spans} endpoint.
*/
public final class HttpSpanCollector implements SpanCollector, Flushable, Closeable {
public final class HttpSpanCollector extends AbstractSpanCollector {

@AutoValue
public static abstract class Config {
Expand Down Expand Up @@ -72,104 +59,38 @@ public interface Builder {

private final String url;
private final Config config;
private final SpanCollectorMetricsHandler metrics;
private final BlockingQueue<Span> pending = new LinkedBlockingQueue<>(1000);
@Nullable // for testing
private final Flusher flusher;

/**
* Create a new instance with default configuration.
*
* @param baseUrl URL of the zipkin query server instance. Like: http://localhost:9411/
* @param metrics Gets notified when spans are accepted or dropped. If you are not interested in
* these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
* these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
*/
public static HttpSpanCollector create(String baseUrl, SpanCollectorMetricsHandler metrics) {
return new HttpSpanCollector(baseUrl, Config.builder().build(), metrics);
}

/**
* @param baseUrl URL of the zipkin query server instance. Like: http://localhost:9411/
* @param config controls flush interval and timeouts
* @param config includes flush interval and timeouts
* @param metrics Gets notified when spans are accepted or dropped. If you are not interested in
* these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
* these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
*/
public static HttpSpanCollector create(String baseUrl, Config config,
SpanCollectorMetricsHandler metrics) {
return new HttpSpanCollector(baseUrl, config, metrics);
}

// Visible for testing. Ex when tests need to explicitly control flushing, set interval to 0.
HttpSpanCollector(String baseUrl, Config config,
SpanCollectorMetricsHandler metrics) {
HttpSpanCollector(String baseUrl, Config config, SpanCollectorMetricsHandler metrics) {
super(SpanCodec.JSON, metrics, config.flushInterval());
this.url = baseUrl + (baseUrl.endsWith("/") ? "" : "/") + "api/v1/spans";
this.metrics = metrics;
this.config = config;
this.flusher = config.flushInterval() > 0 ? new Flusher(this, config.flushInterval()) : null;
}

/**
* Queues the span for collection, or drops it if the queue is full.
*
* @param span Span, should not be <code>null</code>.
*/
@Override
public void collect(Span span) {
metrics.incrementAcceptedSpans(1);
if (!pending.offer(span)) {
metrics.incrementDroppedSpans(1);
}
}

/**
* Calling this will flush any pending spans to the http transport on the current thread.
*/
@Override
public void flush() {
if (pending.isEmpty()) return;
List<Span> drained = new ArrayList<>(pending.size());
pending.drainTo(drained);
if (drained.isEmpty()) return;

// json-encode the spans for transport
int spanCount = drained.size();
byte[] json;
try {
json = SpanCodec.JSON.writeSpans(drained);
} catch (RuntimeException e) {
metrics.incrementDroppedSpans(spanCount);
return;
}

// Send the json to the zipkin endpoint
try {
postSpans(json);
} catch (IOException e) {
metrics.incrementDroppedSpans(spanCount);
return;
}
}

/** Calls flush on a fixed interval */
static final class Flusher implements Runnable {
final Flushable flushable;
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

Flusher(Flushable flushable, int flushInterval) {
this.flushable = flushable;
this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
}

@Override
public void run() {
try {
flushable.flush();
} catch (IOException ignored) {
}
}
}

void postSpans(byte[] json) throws IOException {
protected void sendSpans(byte[] json) throws IOException {
// intentionally not closing the connection, so as to use keep-alives
HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
connection.setConnectTimeout(config.connectTimeout());
Expand Down Expand Up @@ -199,21 +120,4 @@ void postSpans(byte[] json) throws IOException {
throw e;
}
}

@Override
public void addDefaultAnnotation(String key, String value) {
throw new UnsupportedOperationException();
}

/**
* Requests a cease of delivery. There will be at most one in-flight request processing after this
* call returns.
*/
@Override
public void close() {
if (flusher != null) flusher.scheduler.shutdown();
// throw any outstanding spans on the floor
int dropped = pending.drainTo(new LinkedList<>());
metrics.incrementDroppedSpans(dropped);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.twitter.zipkin.gen.Span;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import zipkin.junit.HttpFailure;
Expand All @@ -23,6 +24,11 @@ public class HttpSpanCollectorTest {
HttpSpanCollector.Config config = HttpSpanCollector.Config.builder().flushInterval(0).build();
HttpSpanCollector collector = new HttpSpanCollector(zipkin.httpUrl(), config, metrics);

@After
public void closeCollector(){
collector.close();
}

@Test
public void collectDoesntDoIO() throws Exception {
collector.collect(span(1L, "foo"));
Expand Down
13 changes: 11 additions & 2 deletions brave-spancollector-kafka/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
# brave-spancollector-kafka #

SpanCollector that is used to submit spans to Kafka.
SpanCollector that encodes spans into a Thrift list, sent to the Kafka topic `zipkin`.

Spans are sent to a topic named `zipkin` and contain no key or partition only a value which is a TBinaryProtocol encoded Span.
Kafka messages contain no key or partition, only a value which is a TBinaryProtocol encoded list of spans.

**Important:** if you use zipkin-collector-service (or zipkin-receiver-kafka), it must be v1.35 or later.

## Configuration ##

By default...

* Spans are flushed to a Kafka message every second. Configure with `KafkaSpanCollector.Config.flushInterval`.

## Monitoring ##

Expand Down
11 changes: 5 additions & 6 deletions brave-spancollector-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
<artifactId>brave-core</artifactId>
<version>3.4.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand All @@ -52,11 +57,5 @@
<version>1.7</version>
<scope>test</scope>
</dependency>
<!-- org.apache.thrift.ProcessFunction v0.9 uses SLF4J at runtime -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Loading

0 comments on commit af557af

Please sign in to comment.