Skip to content

Commit

Permalink
Refactors such that Kafka and Http collectors share more code
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian Cole committed Mar 2, 2016
1 parent d20b703 commit 3a504cd
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 104 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.github.kristofa.brave;

import com.github.kristofa.brave.internal.Nullable;
import com.twitter.zipkin.gen.Span;
import com.twitter.zipkin.gen.SpanCodec;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
* Implemented {@link #sendSpans} to transport a encoded list of spans to Zipkin.
*/
public abstract class AbstractSpanCollector implements SpanCollector, Flushable, Closeable {

private final SpanCodec codec;
private final SpanCollectorMetricsHandler metrics;
private final BlockingQueue<Span> pending = new LinkedBlockingQueue<Span>(1000);
@Nullable // for testing
private final Flusher flusher;

/**
* @param flushInterval in seconds. 0 implies spans are {@link #flush() flushed externally.
*/
public AbstractSpanCollector(SpanCodec codec, SpanCollectorMetricsHandler metrics,
int flushInterval) {
this.codec = codec;
this.metrics = metrics;
this.flusher = flushInterval > 0 ? new Flusher(this, flushInterval) : null;
}

/**
* Queues the span for collection, or drops it if the queue is full.
*
* @param span Span, should not be <code>null</code>.
*/
@Override
public void collect(Span span) {
metrics.incrementAcceptedSpans(1);
if (!pending.offer(span)) {
metrics.incrementDroppedSpans(1);
}
}

/**
* Calling this will flush any pending spans to the http transport on the current thread.
*/
@Override
public void flush() {
if (pending.isEmpty()) return;
List<Span> drained = new ArrayList<Span>(pending.size());
pending.drainTo(drained);
if (drained.isEmpty()) return;

// encode the spans for transport
int spanCount = drained.size();
byte[] encoded;
try {
encoded = codec.writeSpans(drained);
} catch (RuntimeException e) {
metrics.incrementDroppedSpans(spanCount);
return;
}

// transport the spans
try {
sendSpans(encoded);
} catch (IOException e) {
metrics.incrementDroppedSpans(spanCount);
return;
}
}

/** Calls flush on a fixed interval */
static final class Flusher implements Runnable {
final Flushable flushable;
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

Flusher(Flushable flushable, int flushInterval) {
this.flushable = flushable;
this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
}

@Override
public void run() {
try {
flushable.flush();
} catch (IOException ignored) {
}
}
}

/**
* Sends a encoded list of spans over the current transport.
*
* @throws IOException when thrown, drop metrics will increment accordingly
*/
protected abstract void sendSpans(byte[] encoded) throws IOException;

@Override
public void addDefaultAnnotation(String key, String value) {
throw new UnsupportedOperationException();
}

/**
* Requests a cease of delivery. There will be at most one in-flight send after this call.
*/
@Override
public void close() {
if (flusher != null) flusher.scheduler.shutdown();
// throw any outstanding spans on the floor
int dropped = pending.drainTo(new LinkedList<Span>());
metrics.incrementDroppedSpans(dropped);
}
}
Original file line number Diff line number Diff line change
@@ -1,30 +1,19 @@
package com.github.kristofa.brave.http;

import com.github.kristofa.brave.*;
import com.github.kristofa.brave.internal.Nullable;
import com.github.kristofa.brave.AbstractSpanCollector;
import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
import com.github.kristofa.brave.SpanCollectorMetricsHandler;
import com.google.auto.value.AutoValue;
import com.twitter.zipkin.gen.SpanCodec;
import com.twitter.zipkin.gen.Span;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
* SpanCollector which submits spans to Zipkin, using its {@code POST /spans} endpoint.
*/
public final class HttpSpanCollector implements SpanCollector, Flushable, Closeable {
public final class HttpSpanCollector extends AbstractSpanCollector {

@AutoValue
public static abstract class Config {
Expand Down Expand Up @@ -58,104 +47,38 @@ public interface Builder {

private final String url;
private final Config config;
private final SpanCollectorMetricsHandler metrics;
private final BlockingQueue<Span> pending = new LinkedBlockingQueue<>(1000);
@Nullable // for testing
private final Flusher flusher;

/**
* Create a new instance with default configuration.
*
* @param baseUrl URL of the zipkin query server instance. Like: http://localhost:9411/
* @param metrics Gets notified when spans are accepted or dropped. If you are not interested in
* these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
* these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
*/
public static HttpSpanCollector create(String baseUrl, SpanCollectorMetricsHandler metrics) {
return new HttpSpanCollector(baseUrl, Config.builder().build(), metrics);
}

/**
* @param baseUrl URL of the zipkin query server instance. Like: http://localhost:9411/
* @param config controls flush interval and timeouts
* @param config includes flush interval and timeouts
* @param metrics Gets notified when spans are accepted or dropped. If you are not interested in
* these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
* these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
*/
public static HttpSpanCollector create(String baseUrl, Config config,
SpanCollectorMetricsHandler metrics) {
return new HttpSpanCollector(baseUrl, config, metrics);
}

// Visible for testing. Ex when tests need to explicitly control flushing, set interval to 0.
HttpSpanCollector(String baseUrl, Config config,
SpanCollectorMetricsHandler metrics) {
HttpSpanCollector(String baseUrl, Config config, SpanCollectorMetricsHandler metrics) {
super(SpanCodec.JSON, metrics, config.flushInterval());
this.url = baseUrl + (baseUrl.endsWith("/") ? "" : "/") + "api/v1/spans";
this.metrics = metrics;
this.config = config;
this.flusher = config.flushInterval() > 0 ? new Flusher(this, config.flushInterval()) : null;
}

/**
* Queues the span for collection, or drops it if the queue is full.
*
* @param span Span, should not be <code>null</code>.
*/
@Override
public void collect(Span span) {
metrics.incrementAcceptedSpans(1);
if (!pending.offer(span)) {
metrics.incrementDroppedSpans(1);
}
}

/**
* Calling this will flush any pending spans to the http transport on the current thread.
*/
@Override
public void flush() {
if (pending.isEmpty()) return;
List<Span> drained = new ArrayList<>(pending.size());
pending.drainTo(drained);
if (drained.isEmpty()) return;

// json-encode the spans for transport
int spanCount = drained.size();
byte[] json;
try {
json = SpanCodec.JSON.writeSpans(drained);
} catch (RuntimeException e) {
metrics.incrementDroppedSpans(spanCount);
return;
}

// Send the json to the zipkin endpoint
try {
postSpans(json);
} catch (IOException e) {
metrics.incrementDroppedSpans(spanCount);
return;
}
}

/** Calls flush on a fixed interval */
static final class Flusher implements Runnable {
final Flushable flushable;
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

Flusher(Flushable flushable, int flushInterval) {
this.flushable = flushable;
this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
}

@Override
public void run() {
try {
flushable.flush();
} catch (IOException ignored) {
}
}
}

void postSpans(byte[] json) throws IOException {
protected void sendSpans(byte[] json) throws IOException {
// intentionally not closing the connection, so as to use keep-alives
HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
connection.setConnectTimeout(config.connectTimeout());
Expand All @@ -177,21 +100,4 @@ void postSpans(byte[] json) throws IOException {
throw e;
}
}

@Override
public void addDefaultAnnotation(String key, String value) {
throw new UnsupportedOperationException();
}

/**
* Requests a cease of delivery. There will be at most one in-flight request processing after this
* call returns.
*/
@Override
public void close() {
if (flusher != null) flusher.scheduler.shutdown();
// throw any outstanding spans on the floor
int dropped = pending.drainTo(new LinkedList<>());
metrics.incrementDroppedSpans(dropped);
}
}
5 changes: 5 additions & 0 deletions brave-spancollector-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
<artifactId>brave-core</artifactId>
<version>3.4.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand Down
Loading

0 comments on commit 3a504cd

Please sign in to comment.