Skip to content

Commit

Permalink
Accepts Zipkin v2 Span format in all current transports (openzipkin#1684
Browse files Browse the repository at this point in the history
)

This accepts the json format from openzipkin#1499 on current transports. It does
so by generalizing format detection from the two Kafka libraries, and
a new `SpanDecoder` interface. Types are still internal, but this allows
us to proceed with other work in openzipkin#1644, including implementing reporters
in any language.

Concretely, you can send a json list of span2 format as a Kafka or Http
message. If using http, use the /api/v2/spans endpoint like so:

```bash
$ curl -X POST -s localhost:9411/api/v2/spans -H'Content-Type: application/json' -d'[{
  "timestamp_millis": 1502101460678,
  "traceId": "9032b04972e475c5",
  "id": "9032b04972e475c5",
  "kind": "SERVER",
  "name": "get",
  "timestamp": 1502101460678880,
  "duration": 612898,
  "localEndpoint": {
    "serviceName": "brave-webmvc-example",
    "ipv4": "192.168.1.113"
  },
  "remoteEndpoint": {
    "serviceName": "",
    "ipv4": "127.0.0.1",
    "port": 60149
  },
  "tags": {
    "error": "500 Internal Server Error",
    "http.path": "/a"
  }
}]'
```
  • Loading branch information
adriancole authored and abesto committed Sep 10, 2019
1 parent e9135aa commit c0f68f2
Show file tree
Hide file tree
Showing 14 changed files with 391 additions and 107 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -13,10 +13,8 @@
*/
package zipkin.collector.kafka;

import java.util.Collections;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import zipkin.Codec;
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;

Expand Down Expand Up @@ -47,24 +45,7 @@ public void run() {
continue;
}

// In TBinaryProtocol encoding, the first byte is the TType, in a range 0-16
// .. If the first byte isn't in that range, it isn't a thrift.
//
// When byte(0) == '[' (91), assume it is a list of json-encoded spans
//
// When byte(0) <= 16, assume it is a TBinaryProtocol-encoded thrift
// .. When serializing a Span (Struct), the first byte will be the type of a field
// .. When serializing a List[ThriftSpan], the first byte is the member type, TType.STRUCT(12)
// .. As ThriftSpan has no STRUCT fields: so, if the first byte is TType.STRUCT(12), it is a list.
if (bytes[0] == '[') {
collector.acceptSpans(bytes, Codec.JSON, NOOP);
} else {
if (bytes[0] == 12 /* TType.STRUCT */) {
collector.acceptSpans(bytes, Codec.THRIFT, NOOP);
} else {
collector.acceptSpans(Collections.singletonList(bytes), Codec.THRIFT, NOOP);
}
}
collector.acceptSpans(bytes, NOOP);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -13,6 +13,7 @@
*/
package zipkin.collector.kafka;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -29,12 +30,16 @@
import zipkin.TestObjects;
import zipkin.collector.InMemoryCollectorMetrics;
import zipkin.collector.kafka.KafkaCollector.Builder;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.Span2Codec;
import zipkin.internal.Span2Converter;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
import zipkin.storage.StorageComponent;

import static org.assertj.core.api.Assertions.assertThat;
import static zipkin.TestObjects.LOTS_OF_SPANS;
import static zipkin.TestObjects.TRACE;

public class KafkaCollectorTest {
Expand Down Expand Up @@ -130,6 +135,32 @@ public void messageWithMultipleSpans_json() throws Exception {
assertThat(kafkaMetrics.spans()).isEqualTo(TestObjects.TRACE.size());
}

/** Ensures list encoding works: a version 2 json encoded list of spans */
@Test
public void messageWithMultipleSpans_json2() throws Exception {
Builder builder = builder("multiple_spans_json2");

List<Span> spans = Arrays.asList(
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]),
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList(
Span2Converter.fromSpan(spans.get(0)).get(0),
Span2Converter.fromSpan(spans.get(1)).get(0)
));

producer.send(new KeyedMessage<>(builder.topic, bytes));

try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
assertThat(recvdSpans.take()).containsAll(spans);
}

assertThat(kafkaMetrics.messages()).isEqualTo(1);
assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
}

/** Ensures malformed spans don't hang the collector */
@Test
public void skipsMalformedData() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin.Codec;
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;

Expand Down Expand Up @@ -74,24 +73,7 @@ public void run() {
if (bytes.length == 0) {
metrics.incrementMessagesDropped();
} else {
// In TBinaryProtocol encoding, the first byte is the TType, in a range 0-16
// .. If the first byte isn't in that range, it isn't a thrift.
//
// When byte(0) == '[' (91), assume it is a list of json-encoded spans
//
// When byte(0) <= 16, assume it is a TBinaryProtocol-encoded thrift
// .. When serializing a Span (Struct), the first byte will be the type of a field
// .. When serializing a List[ThriftSpan], the first byte is the member type, TType.STRUCT(12)
// .. As ThriftSpan has no STRUCT fields: so, if the first byte is TType.STRUCT(12), it is a list.
if (bytes[0] == '[') {
collector.acceptSpans(bytes, Codec.JSON, NOOP);
} else {
if (bytes[0] == 12 /* TType.STRUCT */) {
collector.acceptSpans(bytes, Codec.THRIFT, NOOP);
} else {
collector.acceptSpans(Collections.singletonList(bytes), Codec.THRIFT, NOOP);
}
}
collector.acceptSpans(bytes, NOOP);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.github.charithe.kafka.EphemeralKafkaBroker;
import com.github.charithe.kafka.KafkaJunitRule;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArraySet;
Expand All @@ -36,12 +37,16 @@
import zipkin.Span;
import zipkin.collector.InMemoryCollectorMetrics;
import zipkin.collector.kafka10.KafkaCollector.Builder;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.Span2Codec;
import zipkin.internal.Span2Converter;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
import zipkin.storage.StorageComponent;

import static org.assertj.core.api.Assertions.assertThat;
import static zipkin.TestObjects.LOTS_OF_SPANS;
import static zipkin.TestObjects.TRACE;

public class KafkaCollectorTest {
Expand Down Expand Up @@ -184,6 +189,33 @@ public void messageWithMultipleSpans_json() throws Exception {
assertThat(kafkaMetrics.spans()).isEqualTo(TRACE.size());
}

/** Ensures list encoding works: a version 2 json encoded list of spans */
@Test
public void messageWithMultipleSpans_json2() throws Exception {
Builder builder = builder("multiple_spans_json2");

List<Span> spans = Arrays.asList(
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]),
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList(
Span2Converter.fromSpan(spans.get(0)).get(0),
Span2Converter.fromSpan(spans.get(1)).get(0)
));

produceSpans(bytes, builder.topic);

try (KafkaCollector collector = builder.build()) {
collector.start();
assertThat(receivedSpans.take()).containsAll(spans);
}

assertThat(kafkaMetrics.messages()).isEqualTo(1);
assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
}

/** Ensures malformed spans don't hang the collector */
@Test
public void skipsMalformedData() throws Exception {
Expand Down
70 changes: 41 additions & 29 deletions zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -25,8 +25,10 @@
import zipkin.Codec;
import zipkin.DependencyLink;
import zipkin.Span;
import zipkin.SpanDecoder;
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;
import zipkin.internal.Span2JsonDecoder;
import zipkin.storage.Callback;
import zipkin.storage.QueryRequest;
import zipkin.storage.SpanStore;
Expand All @@ -35,6 +37,8 @@
import static zipkin.internal.Util.lowerHexToUnsignedLong;

final class ZipkinDispatcher extends Dispatcher {
static final SpanDecoder JSON2_DECODER = new Span2JsonDecoder();

private final SpanStore store;
private final Collector consumer;
private final CollectorMetrics metrics;
Expand Down Expand Up @@ -77,42 +81,50 @@ public MockResponse dispatch(RecordedRequest request) {
}
} else if (request.getMethod().equals("POST")) {
if (url.encodedPath().equals("/api/v1/spans")) {
metrics.incrementMessages();
byte[] body = request.getBody().readByteArray();
String encoding = request.getHeader("Content-Encoding");
if (encoding != null && encoding.contains("gzip")) {
try {
Buffer result = new Buffer();
GzipSource source = new GzipSource(new Buffer().write(body));
while (source.read(result, Integer.MAX_VALUE) != -1) ;
body = result.readByteArray();
} catch (IOException e) {
metrics.incrementMessagesDropped();
return new MockResponse().setResponseCode(400).setBody("Cannot gunzip spans");
}
}
String type = request.getHeader("Content-Type");
Codec codec = type != null && type.contains("/x-thrift") ? Codec.THRIFT : Codec.JSON;

final MockResponse result = new MockResponse();
consumer.acceptSpans(body, codec, new Callback<Void>() {
@Override public void onSuccess(Void value) {
result.setResponseCode(202);
}

@Override public void onError(Throwable t) {
String message = t.getMessage();
result.setBody(message).setResponseCode(message.startsWith("Cannot store") ? 500 : 400);
}
});
return result;
SpanDecoder decoder = type != null && type.contains("/x-thrift")
? SpanDecoder.THRIFT_DECODER
: SpanDecoder.JSON_DECODER;
return acceptSpans(request, decoder);
} else if (url.encodedPath().equals("/api/v2/spans")) {
return acceptSpans(request, JSON2_DECODER);
}
} else { // unsupported method
return new MockResponse().setResponseCode(405);
}
return new MockResponse().setResponseCode(404);
}

MockResponse acceptSpans(RecordedRequest request, SpanDecoder decoder) {
metrics.incrementMessages();
byte[] body = request.getBody().readByteArray();
String encoding = request.getHeader("Content-Encoding");
if (encoding != null && encoding.contains("gzip")) {
try {
Buffer result = new Buffer();
GzipSource source = new GzipSource(new Buffer().write(body));
while (source.read(result, Integer.MAX_VALUE) != -1) ;
body = result.readByteArray();
} catch (IOException e) {
metrics.incrementMessagesDropped();
return new MockResponse().setResponseCode(400).setBody("Cannot gunzip spans");
}
}

final MockResponse result = new MockResponse();
consumer.acceptSpans(body, decoder, new Callback<Void>() {
@Override public void onSuccess(Void value) {
result.setResponseCode(202);
}

@Override public void onError(Throwable t) {
String message = t.getMessage();
result.setBody(message).setResponseCode(message.startsWith("Cannot store") ? 500 : 400);
}
});
return result;
}

static QueryRequest toQueryRequest(HttpUrl url) {
return QueryRequest.builder().serviceName(url.queryParameter("serviceName"))
.spanName(url.queryParameter("spanName"))
Expand Down
27 changes: 27 additions & 0 deletions zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import zipkin.Annotation;
import zipkin.Codec;
import zipkin.Span;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.Span2Codec;
import zipkin.internal.Span2Converter;

import static java.lang.String.format;
import static java.util.Arrays.asList;
Expand Down Expand Up @@ -58,6 +61,30 @@ public void getTraces_storedViaPost() throws IOException {
.containsOnly(trace);
}

@Test
public void getTraces_storedViaPostVersion2() throws IOException {
List<Span> spans = Arrays.asList(
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]),
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList(
Span2Converter.fromSpan(spans.get(0)).get(0),
Span2Converter.fromSpan(spans.get(1)).get(0)
));

// write the span to the zipkin using http api v2
Response response = client.newCall(new Request.Builder()
.url(zipkin.httpUrl() + "/api/v2/spans")
.post(RequestBody.create(MediaType.parse("application/json"), bytes)).build()
).execute();
assertThat(response.code()).isEqualTo(202);

// read the traces directly
assertThat(zipkin.getTraces())
.containsOnly(asList(spans.get(0)), asList(spans.get(1)));
}

/** The rule is here to help debugging. Even partial spans should be returned */
@Test
public void getTraces_whenMissingTimestamps() throws IOException {
Expand Down
Loading

0 comments on commit c0f68f2

Please sign in to comment.