Skip to content

Commit

Permalink
Adds zipkin.internal.Span2Codec.JSON (#1671)
Browse files Browse the repository at this point in the history
This adds an internal copy of a span json codec issue #1499. This starts
internal to ease review and allow incremental progress. The first
consumer will be Elasticsearch, as this format removes nested queries.

Note: this change also introduces json serialization of Span2, which
allows future use in Spark.
  • Loading branch information
adriancole authored Jul 28, 2017
1 parent e58e3a8 commit 998f50d
Show file tree
Hide file tree
Showing 8 changed files with 696 additions and 19 deletions.
29 changes: 28 additions & 1 deletion benchmarks/src/main/java/zipkin/benchmarks/CodecBenchmarks.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import zipkin.Codec;
import zipkin.Endpoint;
import zipkin.Span;
import zipkin.internal.Span2;
import zipkin.internal.Span2Codec;

/**
* This compares the speed of the bundled java codec with the approach used in the scala
Expand Down Expand Up @@ -154,6 +156,31 @@ public byte[] writeClientSpan_thrift_libthrift() throws TException {
return serialize(clientSpanLibThrift);
}

static final byte[] span2Json = read("/span2.json");
static final Span2 span2 = Span2Codec.JSON.readSpan(span2Json);
static final List<Span2> tenClientSpan2s = Collections.nCopies(10, span2);
static final byte[] tenClientSpan2sJson = Span2Codec.JSON.writeSpans(tenClientSpan2s);

@Benchmark
public Span2 readClientSpan_json_span2() {
return Span2Codec.JSON.readSpan(span2Json);
}

@Benchmark
public List<Span2> readTenClientSpans_json_span2() {
return Span2Codec.JSON.readSpans(tenClientSpan2sJson);
}

@Benchmark
public byte[] writeClientSpan_json_span2() {
return Span2Codec.JSON.writeSpan(span2);
}

@Benchmark
public byte[] writeTenClientSpans_json_span2() {
return Span2Codec.JSON.writeSpans(tenClientSpan2s);
}

static final byte[] rpcSpanJson = read("/span-rpc.json");
static final Span rpcSpan = Codec.JSON.readSpan(rpcSpanJson);
static final byte[] rpcSpanThrift = Codec.THRIFT.writeSpan(rpcSpan);
Expand Down Expand Up @@ -227,7 +254,7 @@ public byte[] writeRpcV6Span_thrift_libthrift() throws TException {
// Convenience main entry-point
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(".*" + CodecBenchmarks.class.getSimpleName() + ".*lientSpan.*")
.include(".*" + CodecBenchmarks.class.getSimpleName())
.build();

new Runner(opt).run();
Expand Down
18 changes: 4 additions & 14 deletions benchmarks/src/main/resources/span-client.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,16 @@
}
],
"binaryAnnotations": [
{
"key": "ca",
"value": true,
"endpoint": {
"serviceName": "frontend",
"ipv4": "127.0.0.1",
"port": 49504
}
},
{
"key": "clnt/finagle.version",
"value": "6.36.0",
"value": "6.45.0",
"endpoint": {
"serviceName": "frontend",
"ipv4": "127.0.0.1"
}
},
{
"key": "http.uri",
"key": "http.path",
"value": "/api",
"endpoint": {
"serviceName": "frontend",
Expand All @@ -70,10 +61,9 @@
"value": true,
"endpoint": {
"serviceName": "backend",
"ipv4": "127.0.0.1",
"ipv4": "192.168.99.101",
"port": 9000
}
}
],
"debug": false
]
}
32 changes: 32 additions & 0 deletions benchmarks/src/main/resources/span2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"traceId": "86154a4ba6e91385",
"parentId": "86154a4ba6e91385",
"id": "4d1e00c0db9010db",
"kind": "CLIENT",
"name": "get",
"timestamp": 1472470996199000,
"duration": 207000,
"localEndpoint": {
"serviceName": "frontend",
"ipv4": "127.0.0.1"
},
"remoteEndpoint": {
"serviceName": "backend",
"ipv4": "192.168.99.101",
"port": 9000
},
"annotations": [
{
"timestamp": 1472470996238000,
"value": "ws"
},
{
"timestamp": 1472470996403000,
"value": "wr"
}
],
"tags": {
"http.path": "/api",
"clnt/finagle.version": "6.45.0"
}
}
36 changes: 35 additions & 1 deletion zipkin/src/main/java/zipkin/internal/Span2.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
package zipkin.internal;

import com.google.auto.value.AutoValue;
import java.io.ObjectStreamException;
import java.io.Serializable;
import java.io.StreamCorruptedException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
Expand All @@ -27,6 +30,7 @@
import zipkin.Span;
import zipkin.TraceKeys;

import static zipkin.internal.Util.UTF_8;
import static zipkin.internal.Util.checkNotNull;
import static zipkin.internal.Util.lowerHexToUnsignedLong;
import static zipkin.internal.Util.sortedList;
Expand Down Expand Up @@ -54,7 +58,8 @@
* and smaller data.
*/
@AutoValue
public abstract class Span2 { // TODO: make serializable when needed between stages in Spark jobs
public abstract class Span2 implements Serializable { // for Spark jobs
private static final long serialVersionUID = 0L;

/** When non-zero, the trace containing this span uses 128-bit trace identifiers. */
public abstract long traceIdHigh();
Expand Down Expand Up @@ -409,4 +414,33 @@ public Span2 build() {
);
}
}

@Override
public String toString() {
return new String(Span2Codec.JSON.writeSpan(this), UTF_8);
}

// Since this is an immutable object, and we have json handy, defer to a serialization proxy.
final Object writeReplace() throws ObjectStreamException {
return new SerializedForm(Span2Codec.JSON.writeSpan(this));
}

static final class SerializedForm implements Serializable {
private static final long serialVersionUID = 0L;

private final byte[] bytes;

SerializedForm(byte[] bytes) {
this.bytes = bytes;
}

Object readResolve() throws ObjectStreamException {
try {
return Span2Codec.JSON.readSpan(bytes);
} catch (IllegalArgumentException e) {
e.printStackTrace();
throw new StreamCorruptedException(e.getMessage());
}
}
}
}
33 changes: 33 additions & 0 deletions zipkin/src/main/java/zipkin/internal/Span2Codec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.internal;

import java.util.List;

/** Utilities for working with {@link Span2} */
public interface Span2Codec {
Span2Codec JSON = new Span2JsonCodec();

/** Serialize a span recorded from instrumentation into its binary form. */
byte[] writeSpan(Span2 span);

/** Serialize a list of spans recorded from instrumentation into their binary form. */
byte[] writeSpans(List<Span2> spans);

/** throws {@linkplain IllegalArgumentException} if a span couldn't be decoded */
Span2 readSpan(byte[] bytes);

/** throws {@linkplain IllegalArgumentException} if the spans couldn't be decoded */
List<Span2> readSpans(byte[] bytes);
}
Loading

0 comments on commit 998f50d

Please sign in to comment.