diff --git a/brave-core/src/main/java/com/github/kristofa/brave/AbstractSpanCollector.java b/brave-core/src/main/java/com/github/kristofa/brave/AbstractSpanCollector.java
new file mode 100755
index 0000000000..6fab4db46a
--- /dev/null
+++ b/brave-core/src/main/java/com/github/kristofa/brave/AbstractSpanCollector.java
@@ -0,0 +1,123 @@
+package com.github.kristofa.brave;
+
+import com.github.kristofa.brave.internal.Nullable;
+import com.twitter.zipkin.gen.Span;
+import com.twitter.zipkin.gen.SpanCodec;
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * Implement {@link #sendSpans} to transport an encoded list of spans to Zipkin.
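+ *
+ * <p>For illustration only, here is a minimal sketch of a subclass. The class name and the
+ * transport are hypothetical, not part of Brave:
+ *
+ * <pre>{@code
+ * class ConsoleSpanCollector extends AbstractSpanCollector {
+ *   ConsoleSpanCollector(SpanCollectorMetricsHandler metrics) {
+ *     super(SpanCodec.JSON, metrics, 1); // flush queued spans every second
+ *   }
+ *
+ *   protected void sendSpans(byte[] encoded) throws IOException {
+ *     System.out.write(encoded); // stand-in for a real transport write
+ *   }
+ * }
+ * }</pre>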
+ */
+public abstract class AbstractSpanCollector implements SpanCollector, Flushable, Closeable {
+
+  private final SpanCodec codec;
+  private final SpanCollectorMetricsHandler metrics;
+  private final BlockingQueue<Span> pending = new LinkedBlockingQueue<>(1000);
+  @Nullable // for testing
+  private final Flusher flusher;
+
+  /**
+   * @param flushInterval in seconds. 0 implies spans are {@link #flush() flushed} externally.
+   */
+  public AbstractSpanCollector(SpanCodec codec, SpanCollectorMetricsHandler metrics,
+      int flushInterval) {
+    this.codec = codec;
+    this.metrics = metrics;
+    this.flusher = flushInterval > 0 ? new Flusher(this, flushInterval) : null;
+  }
+
+  /**
+   * Queues the span for collection, or drops it if the queue is full.
+   *
+   * @param span Span, should not be null.
+   */
+  @Override
+  public void collect(Span span) {
+    metrics.incrementAcceptedSpans(1);
+    if (!pending.offer(span)) {
+      metrics.incrementDroppedSpans(1);
+    }
+  }
+
+  /**
+   * Calling this will flush any pending spans to the transport on the current thread.
+   */
+  @Override
+  public void flush() {
+    if (pending.isEmpty()) return;
+    List<Span> drained = new ArrayList<>(pending.size());
+    pending.drainTo(drained);
+    if (drained.isEmpty()) return;
+
+    // encode the spans for transport
+    int spanCount = drained.size();
+    byte[] encoded;
+    try {
+      encoded = codec.writeSpans(drained);
+    } catch (RuntimeException e) {
+      metrics.incrementDroppedSpans(spanCount);
+      return;
+    }
+
+    // transport the spans
+    try {
+      sendSpans(encoded);
+    } catch (IOException e) {
+      metrics.incrementDroppedSpans(spanCount);
+      return;
+    }
+  }
+
+  /** Calls flush on a fixed interval */
+  static final class Flusher implements Runnable {
+    final Flushable flushable;
+    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
+    Flusher(Flushable flushable, int flushInterval) {
+      this.flushable = flushable;
+      this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
+    }
+
+    @Override
+    public void run() {
+      try {
+        flushable.flush();
+      } catch (IOException ignored) {
+      }
+    }
+  }
+
+  /**
+   * Sends an encoded list of spans over the current transport.
+   *
+   * @throws IOException when thrown, drop metrics will increment accordingly
+   */
+  protected abstract void sendSpans(byte[] encoded) throws IOException;
+
+  @Override
+  public void addDefaultAnnotation(String key, String value) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Requests a cease of delivery. There will be at most one in-flight send after this call.
+   */
+  @Override
+  public void close() {
+    if (flusher != null) flusher.scheduler.shutdown();
+    // throw any outstanding spans on the floor
+    int dropped = pending.drainTo(new LinkedList<>());
+    metrics.incrementDroppedSpans(dropped);
+  }
+}
diff --git a/brave-spancollector-http/README.md b/brave-spancollector-http/README.md
index 190cea69f1..d18aad6a4d 100644
--- a/brave-spancollector-http/README.md
+++ b/brave-spancollector-http/README.md
@@ -1,4 +1,10 @@
 # brave-spancollector-http #
 
-SpanCollector that is used to submit spans to Zipkins Http endpoint `/spans`.
+SpanCollector that encodes spans into a JSON list, POSTed to `/api/v1/spans`.
+
+## Configuration ##
+
+By default...
+
+* Spans are flushed to a POST request every second. Configure with `HttpSpanCollector.Config.flushInterval`.
+* The POST body is not compressed. Configure with `HttpSpanCollector.Config.compressionEnabled`.
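+
+For illustration, a collector with compression enabled might be built like this (a sketch;
+`http://localhost:9411/` stands in for your Zipkin base URL):
+
+```java
+HttpSpanCollector.Config config = HttpSpanCollector.Config.builder()
+    .compressionEnabled(true) // gzip the POST body
+    .flushInterval(1)         // seconds between automatic flushes
+    .build();
+HttpSpanCollector collector =
+    HttpSpanCollector.create("http://localhost:9411/", config, new EmptySpanCollectorMetricsHandler());
+```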
diff --git a/brave-spancollector-http/pom.xml b/brave-spancollector-http/pom.xml
index 3e8cc3cf61..8a3a64fb42 100644
--- a/brave-spancollector-http/pom.xml
+++ b/brave-spancollector-http/pom.xml
@@ -36,16 +36,6 @@
       <artifactId>auto-value</artifactId>
       <scope>provided</scope>
     </dependency>
-
-    <dependency>
-      <groupId>org.apache.thrift</groupId>
-      <artifactId>libthrift</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-slf4j-impl</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>io.zipkin.java</groupId>
      <artifactId>zipkin-junit</artifactId>
diff --git a/brave-spancollector-http/src/main/java/com/github/kristofa/brave/http/HttpSpanCollector.java b/brave-spancollector-http/src/main/java/com/github/kristofa/brave/http/HttpSpanCollector.java
index dc87eeb42a..e901b21d19 100755
--- a/brave-spancollector-http/src/main/java/com/github/kristofa/brave/http/HttpSpanCollector.java
+++ b/brave-spancollector-http/src/main/java/com/github/kristofa/brave/http/HttpSpanCollector.java
@@ -1,34 +1,21 @@
 package com.github.kristofa.brave.http;
 
+import com.github.kristofa.brave.AbstractSpanCollector;
 import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
-import com.github.kristofa.brave.SpanCollector;
 import com.github.kristofa.brave.SpanCollectorMetricsHandler;
-import com.github.kristofa.brave.internal.Nullable;
 import com.google.auto.value.AutoValue;
-import com.twitter.zipkin.gen.Span;
 import com.twitter.zipkin.gen.SpanCodec;
 import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.Flushable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.zip.GZIPOutputStream;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
-
 /**
  * SpanCollector which submits spans to Zipkin, using its {@code POST /spans} endpoint.
  */
-public final class HttpSpanCollector implements SpanCollector, Flushable, Closeable {
+public final class HttpSpanCollector extends AbstractSpanCollector {
 
   @AutoValue
   public static abstract class Config {
@@ -72,17 +59,13 @@ public interface Builder {
 
   private final String url;
   private final Config config;
-  private final SpanCollectorMetricsHandler metrics;
-  private final BlockingQueue<Span> pending = new LinkedBlockingQueue<>(1000);
-  @Nullable // for testing
-  private final Flusher flusher;
 
   /**
    * Create a new instance with default configuration.
    *
    * @param baseUrl URL of the zipkin query server instance. Like: http://localhost:9411/
    * @param metrics Gets notified when spans are accepted or dropped. If you are not interested in
-   *                these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
+   *        these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
    */
   public static HttpSpanCollector create(String baseUrl, SpanCollectorMetricsHandler metrics) {
     return new HttpSpanCollector(baseUrl, Config.builder().build(), metrics);
@@ -90,9 +73,9 @@ public static HttpSpanCollector create(String baseUrl, SpanCollectorMetricsHandl
 
   /**
    * @param baseUrl URL of the zipkin query server instance. Like: http://localhost:9411/
-   * @param config controls flush interval and timeouts
+   * @param config includes flush interval and timeouts
    * @param metrics Gets notified when spans are accepted or dropped. If you are not interested in
-   *                these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
+   *        these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
    */
   public static HttpSpanCollector create(String baseUrl, Config config,
       SpanCollectorMetricsHandler metrics) {
@@ -100,76 +83,14 @@ public static HttpSpanCollector create(String baseUrl, Config config,
   }
 
   // Visible for testing. Ex when tests need to explicitly control flushing, set interval to 0.
-  HttpSpanCollector(String baseUrl, Config config,
-      SpanCollectorMetricsHandler metrics) {
+  HttpSpanCollector(String baseUrl, Config config, SpanCollectorMetricsHandler metrics) {
+    super(SpanCodec.JSON, metrics, config.flushInterval());
     this.url = baseUrl + (baseUrl.endsWith("/") ? "" : "/") + "api/v1/spans";
-    this.metrics = metrics;
     this.config = config;
-    this.flusher = config.flushInterval() > 0 ? new Flusher(this, config.flushInterval()) : null;
-  }
-
-  /**
-   * Queues the span for collection, or drops it if the queue is full.
-   *
-   * @param span Span, should not be null.
-   */
-  @Override
-  public void collect(Span span) {
-    metrics.incrementAcceptedSpans(1);
-    if (!pending.offer(span)) {
-      metrics.incrementDroppedSpans(1);
-    }
   }
 
-  /**
-   * Calling this will flush any pending spans to the http transport on the current thread.
-   */
   @Override
-  public void flush() {
-    if (pending.isEmpty()) return;
-    List<Span> drained = new ArrayList<>(pending.size());
-    pending.drainTo(drained);
-    if (drained.isEmpty()) return;
-
-    // json-encode the spans for transport
-    int spanCount = drained.size();
-    byte[] json;
-    try {
-      json = SpanCodec.JSON.writeSpans(drained);
-    } catch (RuntimeException e) {
-      metrics.incrementDroppedSpans(spanCount);
-      return;
-    }
-
-    // Send the json to the zipkin endpoint
-    try {
-      postSpans(json);
-    } catch (IOException e) {
-      metrics.incrementDroppedSpans(spanCount);
-      return;
-    }
-  }
-
-  /** Calls flush on a fixed interval */
-  static final class Flusher implements Runnable {
-    final Flushable flushable;
-    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-
-    Flusher(Flushable flushable, int flushInterval) {
-      this.flushable = flushable;
-      this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
-    }
-
-    @Override
-    public void run() {
-      try {
-        flushable.flush();
-      } catch (IOException ignored) {
-      }
-    }
-  }
-
-  void postSpans(byte[] json) throws IOException {
+  protected void sendSpans(byte[] json) throws IOException {
     // intentionally not closing the connection, so as to use keep-alives
     HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
     connection.setConnectTimeout(config.connectTimeout());
@@ -199,21 +120,4 @@ void postSpans(byte[] json) throws IOException {
       throw e;
     }
   }
-
-  @Override
-  public void addDefaultAnnotation(String key, String value) {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Requests a cease of delivery. There will be at most one in-flight request processing after this
-   * call returns.
-   */
-  @Override
-  public void close() {
-    if (flusher != null) flusher.scheduler.shutdown();
-    // throw any outstanding spans on the floor
-    int dropped = pending.drainTo(new LinkedList<>());
-    metrics.incrementDroppedSpans(dropped);
-  }
 }
diff --git a/brave-spancollector-http/src/test/java/com/github/kristofa/brave/http/HttpSpanCollectorTest.java b/brave-spancollector-http/src/test/java/com/github/kristofa/brave/http/HttpSpanCollectorTest.java
index b6176c6f2e..ab5823f2a4 100644
--- a/brave-spancollector-http/src/test/java/com/github/kristofa/brave/http/HttpSpanCollectorTest.java
+++ b/brave-spancollector-http/src/test/java/com/github/kristofa/brave/http/HttpSpanCollectorTest.java
@@ -5,6 +5,7 @@
 import com.twitter.zipkin.gen.Span;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.After;
 import org.junit.Rule;
 import org.junit.Test;
 import zipkin.junit.HttpFailure;
@@ -23,6 +24,11 @@ public class HttpSpanCollectorTest {
   HttpSpanCollector.Config config = HttpSpanCollector.Config.builder().flushInterval(0).build();
   HttpSpanCollector collector = new HttpSpanCollector(zipkin.httpUrl(), config, metrics);
 
+  @After
+  public void closeCollector() {
+    collector.close();
+  }
+
   @Test
   public void collectDoesntDoIO() throws Exception {
     collector.collect(span(1L, "foo"));
diff --git a/brave-spancollector-kafka/README.md b/brave-spancollector-kafka/README.md
index 39e31ea6ec..53193499de 100644
--- a/brave-spancollector-kafka/README.md
+++ b/brave-spancollector-kafka/README.md
@@ -1,8 +1,17 @@
 # brave-spancollector-kafka #
 
-SpanCollector that is used to submit spans to Kafka.
+SpanCollector that encodes spans into a thrift list, sent to the Kafka topic `zipkin`.
 
-Spans are sent to a topic named `zipkin` and contain no key or partition only a value which is a TBinaryProtocol encoded Span.
+Kafka messages contain no key or partition, only a value which is a TBinaryProtocol encoded list of spans.
+
+*Important*
+If using zipkin-collector-service (or zipkin-receiver-kafka), you must run v1.35+
+
+## Configuration ##
+
+By default...
+
+* Spans are flushed to a Kafka message every second. Configure with `KafkaSpanCollector.Config.flushInterval`.
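+
+For illustration, a collector with a longer flush interval might be built like this (a sketch;
+`localhost:9092` stands in for your bootstrap servers):
+
+```java
+KafkaSpanCollector.Config config = KafkaSpanCollector.Config.builder("localhost:9092")
+    .flushInterval(5) // seconds between automatic flushes
+    .build();
+KafkaSpanCollector collector =
+    KafkaSpanCollector.create(config, new EmptySpanCollectorMetricsHandler());
+```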
 
 ## Monitoring ##
diff --git a/brave-spancollector-kafka/pom.xml b/brave-spancollector-kafka/pom.xml
index c97d293436..98bfdec7fa 100644
--- a/brave-spancollector-kafka/pom.xml
+++ b/brave-spancollector-kafka/pom.xml
@@ -40,6 +40,11 @@
       <artifactId>brave-core</artifactId>
       <version>3.4.1-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
@@ -52,11 +57,5 @@
       <version>1.7</version>
       <scope>test</scope>
     </dependency>
-
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-slf4j-impl</artifactId>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
diff --git a/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/KafkaSpanCollector.java b/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/KafkaSpanCollector.java
index 776d751e51..5e0d48c34a 100644
--- a/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/KafkaSpanCollector.java
+++ b/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/KafkaSpanCollector.java
@@ -1,107 +1,101 @@
 package com.github.kristofa.brave.kafka;
 
-import com.github.kristofa.brave.SpanCollector;
-
-import java.io.Closeable;
-import java.util.Properties;
-import java.util.concurrent.*;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
+import com.github.kristofa.brave.AbstractSpanCollector;
+import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
 import com.github.kristofa.brave.SpanCollectorMetricsHandler;
-import com.twitter.zipkin.gen.Span;
+import com.google.auto.value.AutoValue;
+import com.twitter.zipkin.gen.SpanCodec;
+import java.io.IOException;
+import java.util.Properties;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
-import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import zipkin.internal.ThriftCodec;
 
 /**
- * SpanCollector which submits spans to Kafka using Kafka Producer api.
- * <p/>
- * Spans are sent to kafka as keyed messages: the key is the topic zipkin and the value is a TBinaryProtocol encoded Span.
- * <p/>
+ * SpanCollector which sends a thrift-encoded list of spans to the Kafka topic "zipkin".
+ *
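+ * <p>For example (a sketch; the bootstrap servers are a placeholder):
+ *
+ * <pre>{@code
+ * KafkaSpanCollector collector =
+ *     KafkaSpanCollector.create("kafka1:9092", new EmptySpanCollectorMetricsHandler());
+ * }</pre>
+ *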
+ * <p>Important: If using zipkin-collector-service (or zipkin-receiver-kafka), you must run v1.35+
  */
-public class KafkaSpanCollector implements SpanCollector, Closeable {
+public final class KafkaSpanCollector extends AbstractSpanCollector {
 
-  private static final Logger LOGGER = Logger.getLogger(KafkaSpanCollector.class.getName());
-  private static final Properties DEFAULT_PROPERTIES = new Properties();
-
-  static {
-    DEFAULT_PROPERTIES.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-    DEFAULT_PROPERTIES.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+  @AutoValue
+  public static abstract class Config {
+    public static Builder builder() {
+      return new AutoValue_KafkaSpanCollector_Config.Builder()
+          .flushInterval(1);
   }
 
-  private static Properties defaultPropertiesWith(String bootstrapServers) {
-    Properties props = new Properties();
-    for (String name : DEFAULT_PROPERTIES.stringPropertyNames()) {
-      props.setProperty(name, DEFAULT_PROPERTIES.getProperty(name));
-    }
-    props.setProperty("bootstrap.servers", bootstrapServers);
-    return props;
+    public static Builder builder(String bootstrapServers) {
+      Properties props = new Properties();
+      props.put("bootstrap.servers", bootstrapServers);
+      props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+      props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+      return builder().kafkaProperties(props);
   }
 
-  private final Producer<byte[], byte[]> producer;
-  private final ExecutorService executorService;
-  private final SpanProcessingTask spanProcessingTask;
-  private final Future<Integer> future;
-  private final BlockingQueue<Span> queue;
-  private final SpanCollectorMetricsHandler metricsHandler;
+    abstract Properties kafkaProperties();
 
-  /**
-   * Create a new instance with default configuration.
-   *
-   * @param bootstrapServers A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
-   *                         Like: host1:port1,host2:port2,... Does not to be all the servers part of Kafka cluster.
-   * @param metricsHandler   Gets notified when spans are accepted or dropped. If you are not interested in these events you
-   *                         can use {@linkplain EmptySpanCollectorMetricsHandler}
-   */
-  public KafkaSpanCollector(String bootstrapServers, SpanCollectorMetricsHandler metricsHandler) {
-    this(KafkaSpanCollector.defaultPropertiesWith(bootstrapServers), metricsHandler);
-  }
+    abstract int flushInterval();
 
-  /**
-   * KafkaSpanCollector.
-   *
-   * @param kafkaProperties Configuration for Kafka producer. Essential configuration properties are:
-   *                        bootstrap.servers, key.serializer, value.serializer. For a
-   *                        full list of config options, see http://kafka.apache.org/documentation.html#producerconfigs.
-   * @param metricsHandler  Gets notified when spans are accepted or dropped. If you are not interested in these events you
-   *                        can use {@linkplain EmptySpanCollectorMetricsHandler}
-   */
-  public KafkaSpanCollector(Properties kafkaProperties, SpanCollectorMetricsHandler metricsHandler) {
-    producer = new KafkaProducer<>(kafkaProperties);
-    this.metricsHandler = metricsHandler;
-    executorService = Executors.newSingleThreadExecutor();
-    queue = new ArrayBlockingQueue<Span>(1000);
-    spanProcessingTask = new SpanProcessingTask(queue, producer, metricsHandler);
-    future = executorService.submit(spanProcessingTask);
-  }
+    @AutoValue.Builder
+    public interface Builder {
+      /**
+       * Configuration for Kafka producer. Essential configuration properties are:
+       * bootstrap.servers, key.serializer, value.serializer. For a full list of config options, see
+       * http://kafka.apache.org/documentation.html#producerconfigs.
+       *
+       * <p>Must include the following mappings: bootstrap.servers, key.serializer, value.serializer.
+       */
+      Builder kafkaProperties(Properties kafkaProperties);
 
-  @Override
-  public void collect(com.twitter.zipkin.gen.Span span) {
-    metricsHandler.incrementAcceptedSpans(1);
-    if (!queue.offer(span)) {
-      metricsHandler.incrementDroppedSpans(1);
-      LOGGER.log(Level.WARNING, "Queue rejected span!");
-    }
-  }
+      /** Default 1 second. 0 implies spans are {@link #flush() flushed} externally. */
+      Builder flushInterval(int flushInterval);
 
-  @Override
-  public void addDefaultAnnotation(String key, String value) {
-    throw new UnsupportedOperationException();
+      Config build();
+    }
   }
 
-  @Override
-  public void close() {
-    spanProcessingTask.stop();
-    try {
-      Integer nrProcessedSpans = future.get(6000, TimeUnit.MILLISECONDS);
-      LOGGER.info("SpanProcessingTask processed " + nrProcessedSpans + " spans.");
-    } catch (Exception e) {
-      LOGGER.log(Level.WARNING, "Exception when waiting for SpanProcessTask to finish.", e);
-    }
-    executorService.shutdown();
-    producer.close();
-    metricsHandler.incrementDroppedSpans(queue.size());
-    LOGGER.info("KafkaSpanCollector closed.");
-  }
-}
\ No newline at end of file
+  private final Config config;
+  private final Producer<byte[], byte[]> producer;
+  private final ThriftCodec thriftCodec = new ThriftCodec();
+
+  /**
+   * Create a new instance with default configuration.
+   *
+   * @param bootstrapServers A list of host/port pairs to use for establishing the initial
+   *        connection to the Kafka cluster. Like: host1:port1,host2:port2,... It need not include
+   *        every server in the cluster.
+   * @param metrics Gets notified when spans are accepted or dropped. If you are not interested in
+   *        these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
+   */
+  public static KafkaSpanCollector create(String bootstrapServers, SpanCollectorMetricsHandler metrics) {
+    return new KafkaSpanCollector(Config.builder(bootstrapServers).build(), metrics);
+  }
+
+  /**
+   * @param config includes flush interval and kafka properties
+   * @param metrics Gets notified when spans are accepted or dropped. If you are not interested in
+   *        these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
+   */
+  public static KafkaSpanCollector create(Config config, SpanCollectorMetricsHandler metrics) {
+    return new KafkaSpanCollector(config, metrics);
+  }
+
+  // Visible for testing. Ex when tests need to explicitly control flushing, set interval to 0.
+  KafkaSpanCollector(Config config, SpanCollectorMetricsHandler metrics) {
+    super(SpanCodec.THRIFT, metrics, config.flushInterval());
+    this.config = config;
+    this.producer = new KafkaProducer<>(config.kafkaProperties());
+  }
+
+  @Override
+  protected void sendSpans(byte[] thrift) throws IOException {
+    producer.send(new ProducerRecord<byte[], byte[]>("zipkin", thrift));
+  }
+
+  @Override
+  public void close() {
+    producer.close();
+    super.close();
+  }
+}
diff --git a/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/SpanProcessingTask.java b/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/SpanProcessingTask.java
deleted file mode 100644
index 4dd8a9eae4..0000000000
--- a/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/SpanProcessingTask.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.github.kristofa.brave.kafka;
-
-import com.github.kristofa.brave.SpanCollectorMetricsHandler;
-import com.twitter.zipkin.gen.SpanCodec;
-import com.twitter.zipkin.gen.Span;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Processes spans by sending them, one at a time, to the topic `zipkin`, encoded in {@code TBinaryProtocol}.
- * <p/>
- * <p/>
- * Note: this class was written to be used by a single-threaded executor, hence it is not thead-safe.
- */
-class SpanProcessingTask implements Callable<Integer> {
-
-  private static final Logger LOGGER = Logger.getLogger(SpanProcessingTask.class.getName());
-  private final BlockingQueue<Span> queue;
-  private final Producer<byte[], byte[]> producer;
-  private final SpanCollectorMetricsHandler metricsHandler;
-  private volatile boolean stop = false;
-  private int numProcessedSpans = 0;
-
-
-  SpanProcessingTask(BlockingQueue<Span> queue, Producer<byte[], byte[]> producer, SpanCollectorMetricsHandler metricsHandler) {
-    this.queue = queue;
-    this.producer = producer;
-    this.metricsHandler = metricsHandler;
-  }
-
-  public void stop() {
-    stop = true;
-  }
-
-  @Override
-  public Integer call() throws Exception {
-    do {
-      final Span span = queue.poll(5, TimeUnit.SECONDS);
-      if (span == null) {
-        continue;
-      }
-      try {
-        final ProducerRecord<byte[], byte[]> message = new ProducerRecord<>("zipkin", SpanCodec.THRIFT.writeSpan(span));
-        producer.send(message);
-        numProcessedSpans++;
-      } catch (RuntimeException e) {
-        metricsHandler.incrementDroppedSpans(1);
-        LOGGER.log(Level.WARNING, "RuntimeException when writing span.", e);
-      }
-    } while (!stop);
-    return numProcessedSpans;
-  }
-}
diff --git a/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/ITKafkaSpanCollector.java b/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/ITKafkaSpanCollector.java
deleted file mode 100644
index f56f4a46d7..0000000000
--- a/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/ITKafkaSpanCollector.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package com.github.kristofa.brave.kafka;
-
-import com.github.charithe.kafka.KafkaJunitRule;
-import com.github.kristofa.brave.SpanCollectorMetricsHandler;
-import com.twitter.zipkin.gen.SpanCodec;
-import com.twitter.zipkin.gen.Span;
-import kafka.serializer.DefaultDecoder;
-import org.junit.Rule;
-import org.junit.Test;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.junit.Assert.assertEquals;
-
-public class ITKafkaSpanCollector {
-
-  private final EventsHandler metricsHandler = new EventsHandler();
-
-  private static class EventsHandler implements SpanCollectorMetricsHandler {
-
-    public int acceptedSpans = 0;
-    public int droppedSpans = 0;
-
-    @Override
-    public synchronized void incrementAcceptedSpans(int quantity) {
-      acceptedSpans += quantity;
-    }
-
-    @Override
-    public synchronized void incrementDroppedSpans(int quantity) {
-      droppedSpans += quantity;
-    }
-  }
-
-
-  @Rule
-  public KafkaJunitRule kafkaRule = new KafkaJunitRule();
-
-  @Test
-  public void submitSingleSpan() throws TimeoutException {
-
-    KafkaSpanCollector kafkaCollector = new KafkaSpanCollector("localhost:"+kafkaRule.kafkaBrokerPort(), metricsHandler);
-    Span span = span(1l, "test_kafka_span");
-    kafkaCollector.collect(span);
-    kafkaCollector.close();
-
-    List<Span> spans = getCollectedSpans(kafkaRule.readMessages("zipkin", 1, new DefaultDecoder(kafkaRule.consumerConfig().props())));
-    assertEquals(1, spans.size());
-    assertEquals(span, spans.get(0));
-    assertEquals(1, metricsHandler.acceptedSpans);
-    assertEquals(0, metricsHandler.droppedSpans);
-
-  }
-
-  @Test
-  public void submitMultipleSpansInParallel() throws InterruptedException, ExecutionException, TimeoutException {
-    KafkaSpanCollector kafkaCollector = new KafkaSpanCollector("localhost:"+kafkaRule.kafkaBrokerPort(), metricsHandler);
-    Callable<Void> spanProducer1 = new Callable<Void>() {
-
-      @Override
-      public Void call() throws Exception {
-        for(int i=1; i<=200; i++)
-        {
-          kafkaCollector.collect(span(i, "producer1_" + i));
-        }
-        return null;
-      }
-    };
-
-    Callable<Void> spanProducer2 = new Callable<Void>() {
-
-      @Override
-      public Void call() throws Exception {
-        for(int i=1; i<=200; i++)
-        {
-          kafkaCollector.collect(span(i, "producer2_"+i));
-        }
-        return null;
-      }
-    };
-
-    ExecutorService executorService = Executors.newFixedThreadPool(2);
-    Future<Void> future1 = executorService.submit(spanProducer1);
-    Future<Void> future2 = executorService.submit(spanProducer2);
-
-    future1.get(2000, TimeUnit.MILLISECONDS);
-    future2.get(2000, TimeUnit.MILLISECONDS);
-
-    List<Span> spans = getCollectedSpans(kafkaRule.readMessages("zipkin", 400, new DefaultDecoder(kafkaRule.consumerConfig().props())));
-    assertEquals(400, spans.size());
-    assertEquals(400, metricsHandler.acceptedSpans);
-    assertEquals(0, metricsHandler.droppedSpans);
-    kafkaCollector.close();
-  }
-
-  private List<Span> getCollectedSpans(List<byte[]> rawSpans) {
-
-    List<Span> spans = new ArrayList<>();
-
-    for (byte[] rawSpan : rawSpans) {
-      Span span = SpanCodec.THRIFT.readSpan(rawSpan);
-      spans.add(span);
-    }
-    return spans;
-  }
-
-  private Span span(long traceId, String spanName) {
-    final Span span = new Span();
-    span.setId(traceId);
-    span.setTrace_id(traceId);
-    span.setName(spanName);
-    return span;
-  }
-}
diff --git a/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/KafkaSpanCollectorTest.java b/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/KafkaSpanCollectorTest.java
new file mode 100644
index 0000000000..16b89dc90b
--- /dev/null
+++ b/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/KafkaSpanCollectorTest.java
@@ -0,0 +1,154 @@
+package com.github.kristofa.brave.kafka;
+
+import com.github.charithe.kafka.KafkaJunitRule;
+import com.github.kristofa.brave.SpanCollectorMetricsHandler;
+import com.github.kristofa.brave.kafka.KafkaSpanCollector.Config;
+import com.twitter.zipkin.gen.Span;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import kafka.serializer.DefaultDecoder;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import zipkin.Codec;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class KafkaSpanCollectorTest {
+
+  @Rule
+  public KafkaJunitRule kafka = new KafkaJunitRule();
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  TestMetricsHandler metrics = new TestMetricsHandler();
+  // set flush interval to 0 so that tests can drive flushing explicitly
+  Config config = Config.builder("localhost:" + kafka.kafkaBrokerPort()).flushInterval(0).build();
+  KafkaSpanCollector collector = new KafkaSpanCollector(config, metrics);
+
+  @After
+  public void closeCollector() {
+    collector.close();
+  }
+
+  @Test
+  public void collectDoesntDoIO() throws Exception {
+    thrown.expect(TimeoutException.class);
+    collector.collect(span(1L, "foo"));
+
+    assertThat(readMessages()).isEmpty();
+  }
+
+  @Test
+  public void collectIncrementsAcceptedMetrics() throws Exception {
+    collector.collect(span(1L, "foo"));
+
+    assertThat(metrics.acceptedSpans.get()).isEqualTo(1);
+    assertThat(metrics.droppedSpans.get()).isZero();
+  }
+
+  @Test
+  public void dropsWhenQueueIsFull() throws Exception {
+    for (int i = 0; i < 1001; i++)
+      collector.collect(span(1L, "foo"));
+
+    collector.flush(); // manually flush the spans
+
+    assertThat(Codec.THRIFT.readSpans(readMessages().get(0))).hasSize(1000);
+    assertThat(metrics.droppedSpans.get()).isEqualTo(1);
+  }
+
+  @Test
+  public void sendsSpans() throws Exception {
+    collector.collect(span(1L, "foo"));
+    collector.collect(span(2L, "bar"));
+
+    collector.flush(); // manually flush the spans
+
+    // Ensure only one message was sent
+    List<byte[]> messages = readMessages();
+    assertThat(messages).hasSize(1);
+
+    // Now, let's read back the spans we sent!
+    assertThat(Codec.THRIFT.readSpans(messages.get(0))).containsExactly(
+        zipkinSpan(1L, "foo"),
+        zipkinSpan(2L, "bar")
+    );
+  }
+
+  @Test
+  public void submitMultipleSpansInParallel() throws Exception {
+    Callable<Void> spanProducer1 = new Callable<Void>() {
+
+      @Override
+      public Void call() throws Exception {
+        for (int i = 1; i <= 200; i++) {
+          collector.collect(span(i, "producer1_" + i));
+        }
+        return null;
+      }
+    };
+
+    Callable<Void> spanProducer2 = new Callable<Void>() {
+
+      @Override
+      public Void call() throws Exception {
+        for (int i = 1; i <= 200; i++) {
+          collector.collect(span(i, "producer2_" + i));
+        }
+        return null;
+      }
+    };
+
+    ExecutorService executorService = Executors.newFixedThreadPool(2);
+    Future<Void> future1 = executorService.submit(spanProducer1);
+    Future<Void> future2 = executorService.submit(spanProducer2);
+
+    future1.get(2000, TimeUnit.MILLISECONDS);
+    future2.get(2000, TimeUnit.MILLISECONDS);
+
+    collector.flush(); // manually flush the spans
+
+    // Ensure only one message was sent
+    List<byte[]> messages = readMessages();
+    assertThat(messages).hasSize(1);
+
+    // Now, let's make sure we read the correct count of spans.
+    assertThat(Codec.THRIFT.readSpans(messages.get(0))).hasSize(400);
+  }
+
+  class TestMetricsHandler implements SpanCollectorMetricsHandler {
+
+    final AtomicInteger acceptedSpans = new AtomicInteger();
+    final AtomicInteger droppedSpans = new AtomicInteger();
+
+    @Override
+    public void incrementAcceptedSpans(int quantity) {
+      acceptedSpans.addAndGet(quantity);
+    }
+
+    @Override
+    public void incrementDroppedSpans(int quantity) {
+      droppedSpans.addAndGet(quantity);
+    }
+  }
+
+  static Span span(long traceId, String spanName) {
+    return new Span().setTrace_id(traceId).setId(traceId).setName(spanName);
+  }
+
+  static zipkin.Span zipkinSpan(long traceId, String spanName) {
+    return new zipkin.Span.Builder().traceId(traceId).id(traceId).name(spanName).build();
+  }
+
+  private List<byte[]> readMessages() throws TimeoutException {
+    return kafka.readMessages("zipkin", 1, new DefaultDecoder(kafka.consumerConfig().props()));
+  }
+}