Skip to content

Commit

Permalink
Drops internal DependencyLinkSpan for Span2 (#1673)
Browse files Browse the repository at this point in the history
* Adds zipkin.internal.Span2Converter (#1670)

This adds an internal copy of a span conversion utility mentioned in
issue #1499. This is starting internal to ease review and allow
incremental progress. The first consumer will be dependency linking.

* Adds zipkin.internal.Span2Codec.JSON (#1671)

This adds an internal copy of a span json codec issue #1499. This starts
internal to ease review and allow incremental progress. The first
consumer will be Elasticsearch, as this format removes nested queries.

Note: this change also introduces json serialization of Span2, which
allows future use in Spark.

* Drops internal DependencyLinkSpan for Span2

This drops the internal type of DependencyLinkSpan in favor of the Span2
type coming in #1499. Doing so now gives us practice, solves a few bugs
along the way. When Span2 becomes non-internal, the change will be a
simple package rename.
  • Loading branch information
adriancole authored Jul 28, 2017
1 parent 4aeeaec commit b848283
Show file tree
Hide file tree
Showing 16 changed files with 1,107 additions and 720 deletions.
29 changes: 28 additions & 1 deletion benchmarks/src/main/java/zipkin/benchmarks/CodecBenchmarks.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import zipkin.Codec;
import zipkin.Endpoint;
import zipkin.Span;
import zipkin.internal.Span2;
import zipkin.internal.Span2Codec;

/**
* This compares the speed of the bundled java codec with the approach used in the scala
Expand Down Expand Up @@ -154,6 +156,31 @@ public byte[] writeClientSpan_thrift_libthrift() throws TException {
return serialize(clientSpanLibThrift);
}

static final byte[] span2Json = read("/span2.json");
static final Span2 span2 = Span2Codec.JSON.readSpan(span2Json);
static final List<Span2> tenClientSpan2s = Collections.nCopies(10, span2);
static final byte[] tenClientSpan2sJson = Span2Codec.JSON.writeSpans(tenClientSpan2s);

@Benchmark
public Span2 readClientSpan_json_span2() {
return Span2Codec.JSON.readSpan(span2Json);
}

@Benchmark
public List<Span2> readTenClientSpans_json_span2() {
return Span2Codec.JSON.readSpans(tenClientSpan2sJson);
}

@Benchmark
public byte[] writeClientSpan_json_span2() {
return Span2Codec.JSON.writeSpan(span2);
}

@Benchmark
public byte[] writeTenClientSpans_json_span2() {
return Span2Codec.JSON.writeSpans(tenClientSpan2s);
}

static final byte[] rpcSpanJson = read("/span-rpc.json");
static final Span rpcSpan = Codec.JSON.readSpan(rpcSpanJson);
static final byte[] rpcSpanThrift = Codec.THRIFT.writeSpan(rpcSpan);
Expand Down Expand Up @@ -227,7 +254,7 @@ public byte[] writeRpcV6Span_thrift_libthrift() throws TException {
// Convenience main entry-point
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(".*" + CodecBenchmarks.class.getSimpleName() + ".*lientSpan.*")
.include(".*" + CodecBenchmarks.class.getSimpleName())
.build();

new Runner(opt).run();
Expand Down
18 changes: 4 additions & 14 deletions benchmarks/src/main/resources/span-client.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,16 @@
}
],
"binaryAnnotations": [
{
"key": "ca",
"value": true,
"endpoint": {
"serviceName": "frontend",
"ipv4": "127.0.0.1",
"port": 49504
}
},
{
"key": "clnt/finagle.version",
"value": "6.36.0",
"value": "6.45.0",
"endpoint": {
"serviceName": "frontend",
"ipv4": "127.0.0.1"
}
},
{
"key": "http.uri",
"key": "http.path",
"value": "/api",
"endpoint": {
"serviceName": "frontend",
Expand All @@ -70,10 +61,9 @@
"value": true,
"endpoint": {
"serviceName": "backend",
"ipv4": "127.0.0.1",
"ipv4": "192.168.99.101",
"port": 9000
}
}
],
"debug": false
]
}
32 changes: 32 additions & 0 deletions benchmarks/src/main/resources/span2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"traceId": "86154a4ba6e91385",
"parentId": "86154a4ba6e91385",
"id": "4d1e00c0db9010db",
"kind": "CLIENT",
"name": "get",
"timestamp": 1472470996199000,
"duration": 207000,
"localEndpoint": {
"serviceName": "frontend",
"ipv4": "127.0.0.1"
},
"remoteEndpoint": {
"serviceName": "backend",
"ipv4": "192.168.99.101",
"port": 9000
},
"annotations": [
{
"timestamp": 1472470996238000,
"value": "ws"
},
{
"timestamp": 1472470996403000,
"value": "wr"
}
],
"tags": {
"http.path": "/api",
"clnt/finagle.version": "6.45.0"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,32 @@
import java.util.Iterator;
import org.jooq.Record;
import org.jooq.TableField;
import zipkin.internal.DependencyLinkSpan;
import zipkin.Endpoint;
import zipkin.internal.Nullable;
import zipkin.internal.PeekingIterator;
import zipkin.internal.Span2;
import zipkin.storage.mysql.internal.generated.tables.ZipkinSpans;

import static zipkin.Constants.CLIENT_ADDR;
import static zipkin.Constants.CLIENT_SEND;
import static zipkin.Constants.SERVER_ADDR;
import static zipkin.Constants.SERVER_RECV;
import static zipkin.internal.Util.equal;
import static zipkin.storage.mysql.internal.generated.tables.ZipkinAnnotations.ZIPKIN_ANNOTATIONS;

/**
* Convenience that lazy converts rows into {@linkplain DependencyLinkSpan} objects.
* Lazy converts rows into {@linkplain Span2} objects suitable for dependency links. This takes
* short-cuts to require less data. For example, it folds shared RPC spans into one, and doesn't
* include tags, non-core annotations or time units.
*
* <p>Out-of-date schemas may be missing the trace_id_high field. When present, this becomes {@link
* DependencyLinkSpan.TraceId#hi} used as the left-most 16 characters of the traceId in logging
* Span2#traceIdHigh()} used as the left-most 16 characters of the traceId in logging
* statements.
*/
final class DependencyLinkSpanIterator implements Iterator<DependencyLinkSpan> {
final class DependencyLinkSpan2Iterator implements Iterator<Span2> {

/** Assumes the input records are sorted by trace id, span id */
static final class ByTraceId implements Iterator<Iterator<DependencyLinkSpan>> {
static final class ByTraceId implements Iterator<Iterator<Span2>> {
final PeekingIterator<Record> delegate;
final boolean hasTraceIdHigh;

Expand All @@ -53,10 +57,10 @@ static final class ByTraceId implements Iterator<Iterator<DependencyLinkSpan>> {
return delegate.hasNext();
}

@Override public Iterator<DependencyLinkSpan> next() {
@Override public Iterator<Span2> next() {
currentTraceIdHi = hasTraceIdHigh ? traceIdHigh(delegate) : null;
currentTraceIdLo = delegate.peek().getValue(ZipkinSpans.ZIPKIN_SPANS.TRACE_ID);
return new DependencyLinkSpanIterator(delegate, currentTraceIdHi, currentTraceIdLo);
return new DependencyLinkSpan2Iterator(delegate, currentTraceIdHi, currentTraceIdLo);
}

@Override public void remove() {
Expand All @@ -68,7 +72,7 @@ static final class ByTraceId implements Iterator<Iterator<DependencyLinkSpan>> {
@Nullable final Long traceIdHi;
final long traceIdLo;

DependencyLinkSpanIterator(PeekingIterator<Record> delegate, Long traceIdHi, long traceIdLo) {
DependencyLinkSpan2Iterator(PeekingIterator<Record> delegate, Long traceIdHi, long traceIdLo) {
this.delegate = delegate;
this.traceIdHi = traceIdHi;
this.traceIdLo = traceIdLo;
Expand All @@ -83,17 +87,11 @@ public boolean hasNext() {
}

@Override
public DependencyLinkSpan next() {
public Span2 next() {
Record row = delegate.peek();

long spanId = row.getValue(ZipkinSpans.ZIPKIN_SPANS.ID);
DependencyLinkSpan.Builder result = DependencyLinkSpan.builder(
traceIdHi != null ? traceIdHi : 0L,
traceIdLo,
row.getValue(ZipkinSpans.ZIPKIN_SPANS.PARENT_ID),
spanId
);

String srService = null, csService = null, caService = null, saService = null;
while (hasNext()) { // there are more values for this trace
if (spanId != delegate.peek().getValue(ZipkinSpans.ZIPKIN_SPANS.ID)) {
break; // if we are in a new span
Expand All @@ -105,18 +103,48 @@ public DependencyLinkSpan next() {
if (key == null || value == null) continue; // neither client nor server
switch (key) {
case CLIENT_ADDR:
result.caService(value);
caService = value;
break;
case CLIENT_SEND:
result.csService(value);
csService = value;
break;
case SERVER_ADDR:
result.saService(value);
saService = value;
break;
case SERVER_RECV:
result.srService(value);
srService = value;
}
}

// The client address is more authoritative than the client send owner.
if (caService == null) caService = csService;

// Finagle labels two sides of the same socket ("ca", "sa") with the same name.
// Skip the client side, so it isn't mistaken for a loopback request
if (equal(saService, caService)) caService = null;

Span2.Builder result = Span2.builder()
.traceIdHigh(traceIdHi != null ? traceIdHi : 0L)
.traceId(traceIdLo)
.parentId(row.getValue(ZipkinSpans.ZIPKIN_SPANS.PARENT_ID))
.id(spanId);

if (srService != null) {
return result.kind(Span2.Kind.SERVER)
.localEndpoint(ep(srService))
.remoteEndpoint(ep(caService))
.build();
} else if (saService != null) {
return result
.kind(csService != null ? Span2.Kind.CLIENT : null)
.localEndpoint(ep(caService))
.remoteEndpoint(ep(saService))
.build();
} else if (csService != null) {
return result.kind(Span2.Kind.SERVER)
.localEndpoint(ep(caService))
.build();
}
return result.build();
}

Expand All @@ -129,8 +157,12 @@ static long traceIdHigh(PeekingIterator<Record> delegate) {
return delegate.peek().getValue(ZipkinSpans.ZIPKIN_SPANS.TRACE_ID_HIGH);
}

static String emptyToNull(Record next, TableField<Record, String> field) {
static @Nullable String emptyToNull(Record next, TableField<Record, String> field) {
String result = next.getValue(field);
return result != null && !"".equals(result) ? result : null;
}

static Endpoint ep(@Nullable String serviceName) {
return serviceName != null ? Endpoint.builder().serviceName(serviceName).build() : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@
import zipkin.DependencyLink;
import zipkin.Endpoint;
import zipkin.Span;
import zipkin.internal.DependencyLinkSpan;
import zipkin.internal.DependencyLinker;
import zipkin.internal.GroupByTraceId;
import zipkin.internal.Nullable;
import zipkin.internal.Pair;
import zipkin.internal.Span2;
import zipkin.storage.QueryRequest;
import zipkin.storage.SpanStore;
import zipkin.storage.mysql.internal.generated.tables.ZipkinAnnotations;
Expand Down Expand Up @@ -324,8 +324,8 @@ List<DependencyLink> aggregateDependencies(long endTs, @Nullable Long lookback,
// Grouping so that later code knows when a span or trace is finished.
.groupBy(schema.dependencyLinkGroupByFields).fetchLazy();

Iterator<Iterator<DependencyLinkSpan>> traces =
new DependencyLinkSpanIterator.ByTraceId(cursor.iterator(), schema.hasTraceIdHigh);
Iterator<Iterator<Span2>> traces =
new DependencyLinkSpan2Iterator.ByTraceId(cursor.iterator(), schema.hasTraceIdHigh);

if (!traces.hasNext()) return Collections.emptyList();

Expand Down
Loading

0 comments on commit b848283

Please sign in to comment.