Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for json encoding, notably zipkin2 format #48

Merged
merged 1 commit into from
Aug 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions aws-junit/src/main/java/zipkin/junit/aws/AmazonSQSRule.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2016 The OpenZipkin Authors
* Copyright 2016-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -30,6 +30,7 @@
import org.junit.rules.ExternalResource;
import zipkin.Codec;
import zipkin.Span;
import zipkin.SpanDecoder;

import static java.util.Collections.singletonList;

Expand Down Expand Up @@ -136,7 +137,6 @@ private void sendSpansInternal(List<Span> spans) {

private static List<Span> fromBase64(String base64) {
byte[] bytes = Base64.decode(base64);
return Codec.THRIFT.readSpans(bytes);
return SpanDecoder.DETECTING_DECODER.readSpans(bytes);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this works with all the formats and prevents us from needing to access internal classes (for the not yet released span2 type)

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import java.util.ArrayList;
import java.util.List;
import zipkin.Codec;
import zipkin.collector.Collector;
import zipkin.internal.Nullable;
import zipkin.storage.Callback;

import static zipkin.SpanDecoder.DETECTING_DECODER;

public class KinesisSpanProcessor implements IRecordProcessor {

private final Collector collector;
Expand All @@ -40,7 +38,7 @@ public void initialize(InitializationInput initializationInput) {
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
for (Record record : processRecordsInput.getRecords()) {
collector.acceptSpans(record.getData().array(), Codec.THRIFT, Callback.NOOP);
collector.acceptSpans(record.getData().array(), DETECTING_DECODER, Callback.NOOP);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import zipkin.Codec;
import zipkin.Component;
import zipkin.collector.Collector;
import zipkin.internal.Nullable;
import zipkin.storage.Callback;

import static zipkin.SpanDecoder.DETECTING_DECODER;

final class SQSSpanProcessor implements Runnable, Component {

Expand Down Expand Up @@ -101,7 +101,7 @@ private void process(final List<Message> messages) {
final String deleteId = String.valueOf(count++);
try {
byte[] spans = Base64.decode(message.getBody());
collector.acceptSpans(spans, Codec.THRIFT, new Callback<Void>() {
collector.acceptSpans(spans, DETECTING_DECODER, new Callback<Void>() {
@Override public void onSuccess(@Nullable Void value) {
toDelete.add(new DeleteMessageBatchRequestEntry(deleteId, message.getReceiptHandle()));
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

<license-maven-plugin.version>2.11</license-maven-plugin.version>

<zipkin.version>1.29.4</zipkin.version>
<zipkin.version>1.30.1</zipkin.version>
<zipkin-reporter.version>1.0.1</zipkin-reporter.version>

<slf4j.version>1.7.25</slf4j.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public static KinesisSender create(String streamName) {
public static Builder builder() {
return new AutoValue_KinesisSender.Builder()
.credentialsProvider(new DefaultAWSCredentialsProviderChain())
.encoding(Encoding.THRIFT)
.messageMaxBytes(1024 * 1024); // 1MB Kinesis limit.
}

Expand All @@ -68,6 +69,9 @@ public interface Builder {
/** Maximum size of a message. Kinesis max message size is 1MB */
Builder messageMaxBytes(int messageMaxBytes);

/** Allows you to change to json format. Default is thrift */
Builder encoding(Encoding encoding);
Copy link
Member Author

@codefromthecrypt codefromthecrypt Aug 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: "encoding" is thrift or json, the "encoder" says how to encode in thrift or json. So, this automatically works with any format, span1 or span2


KinesisSender build();
}

Expand All @@ -83,6 +87,8 @@ public interface Builder {
@Nullable
abstract EndpointConfiguration endpointConfiguration();

abstract Builder toBuilder();

private final AtomicBoolean closeCalled = new AtomicBoolean(false);

private final AtomicReference<String> partitionKey = new AtomicReference<>("");
Expand Down Expand Up @@ -128,14 +134,9 @@ protected AmazonKinesisAsync compute() {
return builder.build();
}

@Override
public Encoding encoding() {
return Encoding.THRIFT;
}

@Override
public int messageSizeInBytes(List<byte[]> list) {
return Encoding.THRIFT.listSizeInBytes(list);
return encoding().listSizeInBytes(list);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import zipkin.Codec;
import zipkin.Component;
import zipkin.Span;
import zipkin.SpanDecoder;
import zipkin.TestObjects;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.Span2Codec;
import zipkin.internal.Span2Converter;
import zipkin.reporter.Encoder;
import zipkin.reporter.Encoding;
import zipkin.reporter.internal.AwaitableCallback;

import static java.util.Arrays.asList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -58,14 +63,46 @@ public void setup() throws Exception {
}

@Test
public void sendsSpans() throws Exception {
server.enqueue(new MockResponse());
public void sendsSpans_thrift() throws Exception {
sendsSpans(Encoder.THRIFT);
}

@Test
public void sendsSpans_json() throws Exception {
sender.close();
sender = sender.toBuilder().encoding(Encoding.JSON).build();
sendsSpans(Encoder.JSON);
}

send(TestObjects.TRACE);
@Test
public void sendsSpans_json2() throws Exception {
sender.close();
sender = sender.toBuilder().encoding(Encoding.JSON).build();

// temporary span2 encoder until the type is made public
sendsSpans(new Encoder<Span>() {
@Override public Encoding encoding() {
return Encoding.JSON;
}

@Override public byte[] encode(Span span) {
return Span2Codec.JSON.writeSpan(Span2Converter.fromSpan(span).get(0));
}
});
}

void sendsSpans(Encoder<Span> encoder) throws InterruptedException, IOException {
server.enqueue(new MockResponse());
List<Span> spans = asList( // ensure nothing is lost in translation
ApplyTimestampAndDuration.apply(TestObjects.LOTS_OF_SPANS[0]),
ApplyTimestampAndDuration.apply(TestObjects.LOTS_OF_SPANS[1]),
ApplyTimestampAndDuration.apply(TestObjects.LOTS_OF_SPANS[2])
);
send(encoder, spans);

RecordedRequest request = server.takeRequest();
assertThat(extractSpans(request.getBody()))
.isEqualTo(TestObjects.TRACE);
.isEqualTo(spans);
}

@Test
Expand Down Expand Up @@ -111,13 +148,13 @@ void enqueueCborResponse(JsonNode document) throws JsonProcessingException {
}

List<Span> extractSpans(Buffer body) throws IOException {
byte[] thriftEncodedSpans = mapper.readTree(body.inputStream()).get("Data").binaryValue();
return Codec.THRIFT.readSpans(thriftEncodedSpans);
byte[] encodedSpans = mapper.readTree(body.inputStream()).get("Data").binaryValue();
return SpanDecoder.DETECTING_DECODER.readSpans(encodedSpans);
}

void send(List<Span> spans) {
<S> void send(Encoder<S> encoder, List<S> spans) {
AwaitableCallback callback = new AwaitableCallback();
sender.sendSpans(spans.stream().map(Encoder.THRIFT::encode).collect(toList()), callback);
sender.sendSpans(spans.stream().map(encoder::encode).collect(toList()), callback);
callback.await();
}
}
8 changes: 4 additions & 4 deletions sender-sqs/src/main/java/zipkin/reporter/sqs/SQSSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public static SQSSender create(String url) {
public static Builder builder() {
return new AutoValue_SQSSender.Builder()
.credentialsProvider(new DefaultAWSCredentialsProviderChain())
.encoding(Encoding.THRIFT)
.messageMaxBytes(256 * 1024); // 256KB SQS limit.
}

Expand All @@ -75,6 +76,9 @@ public interface Builder {
/** Maximum size of a message. SQS max message size is 256KB including attributes. */
Builder messageMaxBytes(int messageMaxBytes);

/** Allows you to change to json format. Default is thrift */
Builder encoding(Encoding encoding);

SQSSender build();
}

Expand Down Expand Up @@ -105,10 +109,6 @@ public interface Builder {
return (listSize + 2) * 4 / 3; // account for base64 encoding
}

@Override public Encoding encoding() {
return Encoding.THRIFT;
}

@Override public void sendSpans(List<byte[]> list, Callback callback) {
if (closeCalled.get()) throw new IllegalStateException("closed");

Expand Down
48 changes: 43 additions & 5 deletions sender-sqs/src/test/java/zipkin/reporter/sqs/SQSSenderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@
import zipkin.Component;
import zipkin.Span;
import zipkin.TestObjects;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.Span2Codec;
import zipkin.internal.Span2Converter;
import zipkin.junit.aws.AmazonSQSRule;
import zipkin.reporter.Encoder;
import zipkin.reporter.Encoding;
import zipkin.reporter.internal.AwaitableCallback;

import static java.util.Arrays.asList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -44,13 +49,46 @@ public class SQSSenderTest {
.build();

@Test
public void sendsSpans() throws Exception {
send(TestObjects.TRACE);
public void sendsSpans_thrift() throws Exception {
sendSpans(Encoder.THRIFT);
}

@Test
public void sendsSpans_json() throws Exception {
sender.close();
sender = sender.toBuilder().encoding(Encoding.JSON).build();
sendSpans(Encoder.JSON);
}

@Test
public void sendsSpans_json2() throws Exception {
sender.close();
sender = sender.toBuilder().encoding(Encoding.JSON).build();

// temporary span2 encoder until the type is made public
sendSpans(new Encoder<Span>() {
@Override public Encoding encoding() {
return Encoding.JSON;
}

@Override public byte[] encode(Span span) {
return Span2Codec.JSON.writeSpan(Span2Converter.fromSpan(span).get(0));
}
});
}

void sendSpans(Encoder<Span> encoder) {
List<Span> spans = asList( // ensure nothing is lost in translation
ApplyTimestampAndDuration.apply(TestObjects.LOTS_OF_SPANS[0]),
ApplyTimestampAndDuration.apply(TestObjects.LOTS_OF_SPANS[1]),
ApplyTimestampAndDuration.apply(TestObjects.LOTS_OF_SPANS[2])
);
send(encoder, spans);

assertThat(sqsRule.queueCount()).isEqualTo(1);

List<Span> traces = sqsRule.getSpans();
List<Span> expected = TestObjects.TRACE;
List<Span> expected = spans;

assertThat(traces.size()).isEqualTo(expected.size());
assertThat(traces).isEqualTo(expected);
Expand All @@ -61,9 +99,9 @@ public void checkOk() throws Exception {
assertThat(sender.check()).isEqualTo(Component.CheckResult.OK);
}

private void send(List<Span> spans) {
<S> void send(Encoder<S> encoder, List<S> spans) {
AwaitableCallback callback = new AwaitableCallback();
sender.sendSpans(spans.stream().map(Encoder.THRIFT::encode).collect(toList()), callback);
sender.sendSpans(spans.stream().map(encoder::encode).collect(toList()), callback);
callback.await();
}
}