Skip to content

Commit

Permalink
Adds support for json encoding, notably zipkin2 format (#48)
Browse files Browse the repository at this point in the history
Before the AWS transports only supported thrift encoding. This adds json
support, which will notably be used for zipkin2 format.

This uses the recently released detecting decoder to support the new
Span2 format defined in openzipkin/zipkin#1499
  • Loading branch information
adriancole authored Aug 13, 2017
1 parent 733c277 commit 1ec3daf
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 35 deletions.
6 changes: 3 additions & 3 deletions aws-junit/src/main/java/zipkin/junit/aws/AmazonSQSRule.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2016 The OpenZipkin Authors
* Copyright 2016-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -30,6 +30,7 @@
import org.junit.rules.ExternalResource;
import zipkin.Codec;
import zipkin.Span;
import zipkin.SpanDecoder;

import static java.util.Collections.singletonList;

Expand Down Expand Up @@ -136,7 +137,6 @@ private void sendSpansInternal(List<Span> spans) {

private static List<Span> fromBase64(String base64) {
byte[] bytes = Base64.decode(base64);
return Codec.THRIFT.readSpans(bytes);
return SpanDecoder.DETECTING_DECODER.readSpans(bytes);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import java.util.ArrayList;
import java.util.List;
import zipkin.Codec;
import zipkin.collector.Collector;
import zipkin.internal.Nullable;
import zipkin.storage.Callback;

import static zipkin.SpanDecoder.DETECTING_DECODER;

public class KinesisSpanProcessor implements IRecordProcessor {

private final Collector collector;
Expand All @@ -40,7 +38,7 @@ public void initialize(InitializationInput initializationInput) {
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
for (Record record : processRecordsInput.getRecords()) {
collector.acceptSpans(record.getData().array(), Codec.THRIFT, Callback.NOOP);
collector.acceptSpans(record.getData().array(), DETECTING_DECODER, Callback.NOOP);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import zipkin.Codec;
import zipkin.Component;
import zipkin.collector.Collector;
import zipkin.internal.Nullable;
import zipkin.storage.Callback;

import static zipkin.SpanDecoder.DETECTING_DECODER;

final class SQSSpanProcessor implements Runnable, Component {

Expand Down Expand Up @@ -101,7 +101,7 @@ private void process(final List<Message> messages) {
final String deleteId = String.valueOf(count++);
try {
byte[] spans = Base64.decode(message.getBody());
collector.acceptSpans(spans, Codec.THRIFT, new Callback<Void>() {
collector.acceptSpans(spans, DETECTING_DECODER, new Callback<Void>() {
@Override public void onSuccess(@Nullable Void value) {
toDelete.add(new DeleteMessageBatchRequestEntry(deleteId, message.getReceiptHandle()));
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

<license-maven-plugin.version>2.11</license-maven-plugin.version>

<zipkin.version>1.29.4</zipkin.version>
<zipkin.version>1.30.1</zipkin.version>
<zipkin-reporter.version>1.0.1</zipkin-reporter.version>

<slf4j.version>1.7.25</slf4j.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public static KinesisSender create(String streamName) {
public static Builder builder() {
return new AutoValue_KinesisSender.Builder()
.credentialsProvider(new DefaultAWSCredentialsProviderChain())
.encoding(Encoding.THRIFT)
.messageMaxBytes(1024 * 1024); // 1MB Kinesis limit.
}

Expand All @@ -68,6 +69,9 @@ public interface Builder {
/** Maximum size of a message. Kinesis max message size is 1MB */
Builder messageMaxBytes(int messageMaxBytes);

/** Allows you to change to json format. Default is thrift */
Builder encoding(Encoding encoding);

KinesisSender build();
}

Expand All @@ -83,6 +87,8 @@ public interface Builder {
@Nullable
abstract EndpointConfiguration endpointConfiguration();

abstract Builder toBuilder();

private final AtomicBoolean closeCalled = new AtomicBoolean(false);

private final AtomicReference<String> partitionKey = new AtomicReference<>("");
Expand Down Expand Up @@ -128,14 +134,9 @@ protected AmazonKinesisAsync compute() {
return builder.build();
}

@Override
public Encoding encoding() {
return Encoding.THRIFT;
}

@Override
public int messageSizeInBytes(List<byte[]> list) {
return Encoding.THRIFT.listSizeInBytes(list);
return encoding().listSizeInBytes(list);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import zipkin.Codec;
import zipkin.Component;
import zipkin.Span;
import zipkin.SpanDecoder;
import zipkin.TestObjects;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.Span2Codec;
import zipkin.internal.Span2Converter;
import zipkin.reporter.Encoder;
import zipkin.reporter.Encoding;
import zipkin.reporter.internal.AwaitableCallback;

import static java.util.Arrays.asList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -58,14 +63,46 @@ public void setup() throws Exception {
}

@Test
public void sendsSpans() throws Exception {
server.enqueue(new MockResponse());
public void sendsSpans_thrift() throws Exception {
sendsSpans(Encoder.THRIFT);
}

@Test
public void sendsSpans_json() throws Exception {
sender.close();
sender = sender.toBuilder().encoding(Encoding.JSON).build();
sendsSpans(Encoder.JSON);
}

send(TestObjects.TRACE);
@Test
public void sendsSpans_json2() throws Exception {
sender.close();
sender = sender.toBuilder().encoding(Encoding.JSON).build();

// temporary span2 encoder until the type is made public
sendsSpans(new Encoder<Span>() {
@Override public Encoding encoding() {
return Encoding.JSON;
}

@Override public byte[] encode(Span span) {
return Span2Codec.JSON.writeSpan(Span2Converter.fromSpan(span).get(0));
}
});
}

void sendsSpans(Encoder<Span> encoder) throws InterruptedException, IOException {
server.enqueue(new MockResponse());
List<Span> spans = asList( // ensure nothing is lost in translation
ApplyTimestampAndDuration.apply(TestObjects.LOTS_OF_SPANS[0]),
ApplyTimestampAndDuration.apply(TestObjects.LOTS_OF_SPANS[1]),
ApplyTimestampAndDuration.apply(TestObjects.LOTS_OF_SPANS[2])
);
send(encoder, spans);

RecordedRequest request = server.takeRequest();
assertThat(extractSpans(request.getBody()))
.isEqualTo(TestObjects.TRACE);
.isEqualTo(spans);
}

@Test
Expand Down Expand Up @@ -111,13 +148,13 @@ void enqueueCborResponse(JsonNode document) throws JsonProcessingException {
}

List<Span> extractSpans(Buffer body) throws IOException {
byte[] thriftEncodedSpans = mapper.readTree(body.inputStream()).get("Data").binaryValue();
return Codec.THRIFT.readSpans(thriftEncodedSpans);
byte[] encodedSpans = mapper.readTree(body.inputStream()).get("Data").binaryValue();
return SpanDecoder.DETECTING_DECODER.readSpans(encodedSpans);
}

void send(List<Span> spans) {
<S> void send(Encoder<S> encoder, List<S> spans) {
AwaitableCallback callback = new AwaitableCallback();
sender.sendSpans(spans.stream().map(Encoder.THRIFT::encode).collect(toList()), callback);
sender.sendSpans(spans.stream().map(encoder::encode).collect(toList()), callback);
callback.await();
}
}
8 changes: 4 additions & 4 deletions sender-sqs/src/main/java/zipkin/reporter/sqs/SQSSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public static SQSSender create(String url) {
public static Builder builder() {
return new AutoValue_SQSSender.Builder()
.credentialsProvider(new DefaultAWSCredentialsProviderChain())
.encoding(Encoding.THRIFT)
.messageMaxBytes(256 * 1024); // 256KB SQS limit.
}

Expand All @@ -75,6 +76,9 @@ public interface Builder {
/** Maximum size of a message. SQS max message size is 256KB including attributes. */
Builder messageMaxBytes(int messageMaxBytes);

/** Allows you to change to json format. Default is thrift */
Builder encoding(Encoding encoding);

SQSSender build();
}

Expand Down Expand Up @@ -105,10 +109,6 @@ public interface Builder {
return (listSize + 2) * 4 / 3; // account for base64 encoding
}

@Override public Encoding encoding() {
return Encoding.THRIFT;
}

@Override public void sendSpans(List<byte[]> list, Callback callback) {
if (closeCalled.get()) throw new IllegalStateException("closed");

Expand Down
48 changes: 43 additions & 5 deletions sender-sqs/src/test/java/zipkin/reporter/sqs/SQSSenderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@
import zipkin.Component;
import zipkin.Span;
import zipkin.TestObjects;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.Span2Codec;
import zipkin.internal.Span2Converter;
import zipkin.junit.aws.AmazonSQSRule;
import zipkin.reporter.Encoder;
import zipkin.reporter.Encoding;
import zipkin.reporter.internal.AwaitableCallback;

import static java.util.Arrays.asList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -44,13 +49,46 @@ public class SQSSenderTest {
.build();

@Test
public void sendsSpans() throws Exception {
send(TestObjects.TRACE);
public void sendsSpans_thrift() throws Exception {
sendSpans(Encoder.THRIFT);
}

@Test
public void sendsSpans_json() throws Exception {
sender.close();
sender = sender.toBuilder().encoding(Encoding.JSON).build();
sendSpans(Encoder.JSON);
}

@Test
public void sendsSpans_json2() throws Exception {
sender.close();
sender = sender.toBuilder().encoding(Encoding.JSON).build();

// temporary span2 encoder until the type is made public
sendSpans(new Encoder<Span>() {
@Override public Encoding encoding() {
return Encoding.JSON;
}

@Override public byte[] encode(Span span) {
return Span2Codec.JSON.writeSpan(Span2Converter.fromSpan(span).get(0));
}
});
}

void sendSpans(Encoder<Span> encoder) {
List<Span> spans = asList( // ensure nothing is lost in translation
ApplyTimestampAndDuration.apply(TestObjects.LOTS_OF_SPANS[0]),
ApplyTimestampAndDuration.apply(TestObjects.LOTS_OF_SPANS[1]),
ApplyTimestampAndDuration.apply(TestObjects.LOTS_OF_SPANS[2])
);
send(encoder, spans);

assertThat(sqsRule.queueCount()).isEqualTo(1);

List<Span> traces = sqsRule.getSpans();
List<Span> expected = TestObjects.TRACE;
List<Span> expected = spans;

assertThat(traces.size()).isEqualTo(expected.size());
assertThat(traces).isEqualTo(expected);
Expand All @@ -61,9 +99,9 @@ public void checkOk() throws Exception {
assertThat(sender.check()).isEqualTo(Component.CheckResult.OK);
}

private void send(List<Span> spans) {
<S> void send(Encoder<S> encoder, List<S> spans) {
AwaitableCallback callback = new AwaitableCallback();
sender.sendSpans(spans.stream().map(Encoder.THRIFT::encode).collect(toList()), callback);
sender.sendSpans(spans.stream().map(encoder::encode).collect(toList()), callback);
callback.await();
}
}

0 comments on commit 1ec3daf

Please sign in to comment.