Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Makes SpanBytesDecoder work on ByteBuffer #2589

Merged
merged 9 commits into from
May 15, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public List<Span> bytes_wireDecoder() {

@Benchmark
public List<Span> bytebuffer_zipkinDecoder() {
return SpanBytesDecoder.PROTO3.decodeList(ByteBufUtil.getBytes(encodedBuf));
return SpanBytesDecoder.PROTO3.decodeList(encodedBuf.nioBuffer());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here is us decoding directly the bytebuffer in benchmarks

}

@Benchmark
Expand All @@ -103,7 +103,7 @@ public List<Span> bytebuffer_wireDecoder() {
// Convenience main entry-point
public static void main(String[] args) throws Exception {
Options opt = new OptionsBuilder()
.include(".*" + ProtoCodecBenchmarks.class.getSimpleName() + ".*bytes.*")
.include(".*" + ProtoCodecBenchmarks.class.getSimpleName())
.addProfiler("gc")
.build();

Expand Down
103 changes: 0 additions & 103 deletions benchmarks/src/main/java/zipkin2/internal/UnsafeBufferBenchmarks.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,18 @@ public class Proto3CodecInteropTest {
zipkin2.Annotation zipkinAnnotation = ZIPKIN_SPAN.annotations().get(0);
Span wireSpan = new Span.Builder().annotations(PROTO_SPAN.annotations).build();

UnsafeBuffer zipkinBytes = UnsafeBuffer.allocate(ANNOTATION.sizeInBytes(zipkinAnnotation));
ANNOTATION.write(zipkinBytes, zipkinAnnotation);
byte[] zipkinBytes = new byte[ANNOTATION.sizeInBytes(zipkinAnnotation)];
ANNOTATION.write(WriteBuffer.wrap(zipkinBytes, 0), zipkinAnnotation);

assertThat(zipkinBytes.unwrap())
assertThat(zipkinBytes)
.containsExactly(wireSpan.encode());
}

@Test public void annotation_read_matchesWireEncodingWithTag() {
zipkin2.Annotation zipkinAnnotation = ZIPKIN_SPAN.annotations().get(0);
Span wireSpan = new Span.Builder().annotations(PROTO_SPAN.annotations).build();

UnsafeBuffer wireBytes = UnsafeBuffer.wrap(wireSpan.encode(), 0);
ReadBuffer wireBytes = ReadBuffer.wrap(wireSpan.encode(), 0);
assertThat(wireBytes.readVarint32())
.isEqualTo(ANNOTATION.key);

Expand All @@ -179,7 +179,7 @@ public class Proto3CodecInteropTest {

@Test public void endpoint_sizeInBytes_matchesWireEncodingWithTag() {
assertThat(LOCAL_ENDPOINT.sizeInBytes(ZIPKIN_SPAN.localEndpoint())).isEqualTo(
Endpoint.ADAPTER.encodedSizeWithTag(LOCAL_ENDPOINT.fieldNumber, PROTO_SPAN.local_endpoint)
Endpoint.ADAPTER.encodedSizeWithTag(LOCAL_ENDPOINT.fieldNumber, PROTO_SPAN.local_endpoint)
);

assertThat(REMOTE_ENDPOINT.sizeInBytes(ZIPKIN_SPAN.remoteEndpoint())).isEqualTo(
Expand All @@ -188,20 +188,20 @@ public class Proto3CodecInteropTest {
}

@Test public void localEndpoint_write_matchesWire() {
UnsafeBuffer zipkinBytes = UnsafeBuffer.allocate(LOCAL_ENDPOINT.sizeInBytes(ZIPKIN_SPAN.localEndpoint()));
LOCAL_ENDPOINT.write(zipkinBytes, ZIPKIN_SPAN.localEndpoint());
byte[] zipkinBytes = new byte[LOCAL_ENDPOINT.sizeInBytes(ZIPKIN_SPAN.localEndpoint())];
LOCAL_ENDPOINT.write(WriteBuffer.wrap(zipkinBytes, 0), ZIPKIN_SPAN.localEndpoint());
Span wireSpan = new Span.Builder().local_endpoint(PROTO_SPAN.local_endpoint).build();

assertThat(zipkinBytes.unwrap())
assertThat(zipkinBytes)
.containsExactly(wireSpan.encode());
}

@Test public void remoteEndpoint_write_matchesWire() {
UnsafeBuffer zipkinBytes = UnsafeBuffer.allocate(REMOTE_ENDPOINT.sizeInBytes(ZIPKIN_SPAN.remoteEndpoint()));
REMOTE_ENDPOINT.write(zipkinBytes, ZIPKIN_SPAN.remoteEndpoint());
byte[] zipkinBytes = new byte[REMOTE_ENDPOINT.sizeInBytes(ZIPKIN_SPAN.remoteEndpoint())];
REMOTE_ENDPOINT.write(WriteBuffer.wrap(zipkinBytes, 0), ZIPKIN_SPAN.remoteEndpoint());
Span wireSpan = new Span.Builder().remote_endpoint(PROTO_SPAN.remote_endpoint).build();

assertThat(zipkinBytes.unwrap())
assertThat(zipkinBytes)
.containsExactly(wireSpan.encode());
}

Expand All @@ -216,22 +216,22 @@ public class Proto3CodecInteropTest {
@Test public void writeTagField_matchesWire() {
MapEntry<String, String> entry = entry("clnt/finagle.version", "6.45.0");
TagField field = new TagField(TAG_KEY);
UnsafeBuffer zipkinBytes = UnsafeBuffer.allocate(field.sizeInBytes(entry));
field.write(zipkinBytes, entry);
byte[] zipkinBytes = new byte[field.sizeInBytes(entry)];
field.write(WriteBuffer.wrap(zipkinBytes, 0), entry);

Span oneField = new Span.Builder().tags(singletonMap(entry.key, entry.value)).build();
assertThat(zipkinBytes.unwrap())
assertThat(zipkinBytes)
.containsExactly(oneField.encode());
}

@Test public void writeTagField_matchesWire_emptyValue() {
MapEntry<String, String> entry = entry("error", "");
TagField field = new TagField(TAG_KEY);
UnsafeBuffer zipkinBytes = UnsafeBuffer.allocate(field.sizeInBytes(entry));
field.write(zipkinBytes, entry);
byte[] zipkinBytes = new byte[field.sizeInBytes(entry)];
field.write(WriteBuffer.wrap(zipkinBytes, 0), entry);

Span oneField = new Span.Builder().tags(singletonMap(entry.key, entry.value)).build();
assertThat(zipkinBytes.unwrap())
assertThat(zipkinBytes)
.containsExactly(oneField.encode());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import zipkin2.codec.DependencyLinkBytesEncoder;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.internal.JsonCodec;
import zipkin2.internal.UnsafeBuffer;
import zipkin2.internal.WriteBuffer;
import zipkin2.storage.QueryRequest;
import zipkin2.storage.StorageComponent;

Expand Down Expand Up @@ -147,12 +147,12 @@ static AggregatedHttpMessage jsonResponse(byte[] body) {
.setInt(HttpHeaderNames.CONTENT_LENGTH, body.length).build(), HttpData.of(body));
}

static final UnsafeBuffer.Writer<String> QUOTED_STRING_WRITER = new UnsafeBuffer.Writer<String>() {
static final WriteBuffer.Writer<String> QUOTED_STRING_WRITER = new WriteBuffer.Writer<String>() {
@Override public int sizeInBytes(String value) {
return UnsafeBuffer.utf8SizeInBytes(value) + 2; // quotes
return WriteBuffer.utf8SizeInBytes(value) + 2; // quotes
}

@Override public void write(String value, UnsafeBuffer buffer) {
@Override public void write(String value, WriteBuffer buffer) {
buffer.writeByte('"');
buffer.writeUtf8(value);
buffer.writeByte('"');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import zipkin2.Span;
import zipkin2.internal.Nullable;
import zipkin2.internal.Platform;
import zipkin2.internal.UnsafeBuffer;
import zipkin2.storage.QueryRequest;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import zipkin2.internal.FilterTraces;
import zipkin2.internal.HexCodec;
import zipkin2.internal.Nullable;
import zipkin2.internal.ReadBuffer;
import zipkin2.internal.V1ThriftSpanReader;
import zipkin2.storage.GroupByTraceId;
import zipkin2.storage.QueryRequest;
Expand All @@ -59,11 +60,11 @@ static class Factory {
this.accumulateSpans = new DecodeAndConvertSpans();

this.preparedStatement =
session.prepare(
QueryBuilder.select("trace_id", "span")
.from("traces")
.where(QueryBuilder.in("trace_id", QueryBuilder.bindMarker("trace_id")))
.limit(QueryBuilder.bindMarker("limit_")));
session.prepare(
QueryBuilder.select("trace_id", "span")
.from("traces")
.where(QueryBuilder.in("trace_id", QueryBuilder.bindMarker("trace_id")))
.limit(QueryBuilder.bindMarker("limit_")));
this.maxTraceCols = maxTraceCols;
this.strictTraceId = strictTraceId;
this.groupByTraceId = GroupByTraceId.create(strictTraceId);
Expand All @@ -72,8 +73,8 @@ static class Factory {
Call<List<Span>> newCall(String hexTraceId) {
long traceId = HexCodec.lowerHexToUnsignedLong(hexTraceId);
Call<List<Span>> result =
new SelectFromTraces(this, Collections.singleton(traceId), maxTraceCols)
.flatMap(accumulateSpans);
new SelectFromTraces(this, Collections.singleton(traceId), maxTraceCols)
.flatMap(accumulateSpans);
return strictTraceId ? result.map(StrictTraceId.filterSpans(hexTraceId)) : result;
}

Expand All @@ -95,7 +96,7 @@ FlatMapper<Set<Long>, List<List<Span>>> newFlatMapper(QueryRequest request) {
@Override
protected ResultSetFuture newFuture() {
return factory.session.executeAsync(
factory.preparedStatement.bind().setSet("trace_id", trace_id).setInt("limit_", limit_));
factory.preparedStatement.bind().setSet("trace_id", trace_id).setInt("limit_", limit_));
}

@Override public ResultSet map(ResultSet input) {
Expand Down Expand Up @@ -139,9 +140,9 @@ public Call<List<List<Span>>> map(Set<Long> input) {
traceIds = input;
}
Call<List<List<Span>>> result =
new SelectFromTraces(factory, traceIds, factory.maxTraceCols)
.flatMap(factory.accumulateSpans)
.map(factory.groupByTraceId);
new SelectFromTraces(factory, traceIds, factory.maxTraceCols)
.flatMap(factory.accumulateSpans)
.map(factory.groupByTraceId);
return filter != null ? result.map(filter) : result;
}

Expand All @@ -163,7 +164,7 @@ protected BiConsumer<Row, List<Span>> accumulator() {
return (row, result) -> {
V1ThriftSpanReader reader = V1ThriftSpanReader.create();
V1SpanConverter converter = V1SpanConverter.create();
V1Span read = reader.read(row.getBytes("span"));
V1Span read = reader.read(ReadBuffer.wrapUnsafe(row.getBytes("span")));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here cassandra is using the bytebuffer variant directly

converter.convert(read, result);
};
}
Expand Down
2 changes: 1 addition & 1 deletion zipkin/src/main/java/zipkin2/Endpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import zipkin2.internal.Nullable;
import zipkin2.internal.Platform;

import static zipkin2.internal.UnsafeBuffer.HEX_DIGITS;
import static zipkin2.internal.HexCodec.HEX_DIGITS;

/** The network context of a node in the service graph. */
//@Immutable
Expand Down
2 changes: 1 addition & 1 deletion zipkin/src/main/java/zipkin2/Span.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

import static java.lang.String.format;
import static java.util.logging.Level.FINEST;
import static zipkin2.internal.UnsafeBuffer.HEX_DIGITS;
import static zipkin2.internal.HexCodec.HEX_DIGITS;

/**
* A span is a single-host view of an operation. A trace is a series of spans (often RPC calls)
Expand Down
Loading