Skip to content
This repository has been archived by the owner on Jul 1, 2022. It is now read-only.

Implement Zipkin 2 JSON Sender #399

Merged
merged 4 commits into from
May 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions jaeger-zipkin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,39 @@ tracer = new Tracer.Builder(serviceName, reporter, sampler)
```

## Sending data to Zipkin
Zipkin supports transports including Http and Kafka. You can configure Jaeger to send to a Zipkin server with
`ZipkinSender`.
There are two ways to send spans to a Zipkin server:

### Thrift
If you want to send Zipkin v1 Thrift-encoded spans, you should use the `ZipkinSender` sender, which
wraps a Zipkin sender class to enable the use of various transports such as HTTP and Kafka.

For example:
```java
import io.jaegertracing.senders.zipkin.ZipkinSender;

reporter = new RemoteReporter(ZipkinSender.create("http://localhost:9411/api/v1/spans"));
tracer = new Tracer.Builder(serviceName, reporter, sampler)
...
```

### Zipkin 2 Reporters
You can reuse a Zipkin 2 reporter instance as-is by using `ZipkinV2Reporter`, which adapts a Zipkin
2 reporter to the Jaeger reporter interface and deals with converting Jaeger spans to the Zipkin 2
model.

For example:
```java
import io.jaegertracing.zipkin.reporters.ZipkinV2Reporter;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.urlconnection.URLConnectionSender;

reporter = new ZipkinV2Reporter(
AsyncReporter.create(URLConnectionSender.create("http://localhost:9411/api/v2/spans")));

tracer = new Tracer.Builder(serviceName)
.withReporter(reporter)
...
.build()
```

This will send spans to the Zipkin v2 endpoint using the v2 JSON encoding.
7 changes: 6 additions & 1 deletion jaeger-zipkin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ description = 'Integration library for Zipkin'
dependencies {
compile group: 'io.zipkin.reporter', name: 'zipkin-reporter', version: '1.1.2'
compile group: 'io.zipkin.reporter', name: 'zipkin-sender-urlconnection', version: '1.1.2'

compile group: 'io.zipkin.java', name: 'zipkin', version: '2.8.1'
compile group: 'io.zipkin.reporter2', name: 'zipkin-reporter', version: '2.6.0'

compile project(path: ':jaeger-core', configuration: 'shadow')

testCompile group: 'io.zipkin.reporter2', name: 'zipkin-sender-urlconnection', version: '2.6.0'
testCompile group: 'junit', name: 'junit', version: junitVersion
testCompile group: 'io.zipkin.java', name: 'zipkin-junit', version: '2.3.0'
testCompile group: 'io.zipkin.java', name: 'zipkin-junit', version: '2.7.1'
testCompile group: 'com.tngtech.java', name: 'junit-dataprovider', version: junitDataProviderVersion

signature 'org.codehaus.mojo.signature:java16:1.1@signature'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.jaegertracing.Span;
import io.jaegertracing.SpanContext;
import io.jaegertracing.Tracer;
import io.jaegertracing.zipkin.ConverterUtil;
import io.opentracing.tag.Tags;
import java.nio.charset.Charset;
import java.util.ArrayList;
Expand Down Expand Up @@ -56,10 +57,10 @@ public static com.twitter.zipkin.thriftjava.Span convertSpan(Span span) {
private static List<Annotation> buildAnnotations(Span span, Endpoint host) {
List<Annotation> annotations = new ArrayList<Annotation>();

if (isRpc(span)) {
if (ConverterUtil.isRpc(span)) {
String startLabel = zipkincoreConstants.SERVER_RECV;
String endLabel = zipkincoreConstants.SERVER_SEND;
if (isRpcClient(span)) {
if (ConverterUtil.isRpcClient(span)) {
startLabel = zipkincoreConstants.CLIENT_SEND;
endLabel = zipkincoreConstants.CLIENT_RECV;
}
Expand Down Expand Up @@ -87,9 +88,9 @@ private static List<Annotation> buildAnnotations(Span span, Endpoint host) {
private static List<BinaryAnnotation> buildBinaryAnnotations(Span span, Endpoint host) {
List<BinaryAnnotation> binaryAnnotations = new ArrayList<BinaryAnnotation>();
Map<String, Object> tags = span.getTags();
boolean isRpc = isRpc(span);
boolean isClient = isRpcClient(span);
boolean firstSpanInProcess = span.getReferences().isEmpty() || isRpcServer(span);
boolean isRpc = ConverterUtil.isRpc(span);
boolean isClient = ConverterUtil.isRpcClient(span);
boolean firstSpanInProcess = span.getReferences().isEmpty() || ConverterUtil.isRpcServer(span);

if (firstSpanInProcess) {
Map<String, ?> processTags = span.getTracer().tags();
Expand Down Expand Up @@ -154,20 +155,6 @@ private static BinaryAnnotation buildBinaryAnnotation(String tagKey, Object tagV
return banno;
}

static boolean isRpcServer(Span span) {
return Tags.SPAN_KIND_SERVER.equals(span.getTags().get(Tags.SPAN_KIND.getKey()));
}

static boolean isRpc(Span span) {
Object spanKindValue = span.getTags().get(Tags.SPAN_KIND.getKey());
return Tags.SPAN_KIND_CLIENT.equals(spanKindValue) || Tags.SPAN_KIND_SERVER.equals(spanKindValue);

}

static boolean isRpcClient(Span span) {
return Tags.SPAN_KIND_CLIENT.equals(span.getTags().get(Tags.SPAN_KIND.getKey()));
}

/**
* Extract peer Endpoint from tags
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@

/**
* This sends (TBinaryProtocol big-endian) encoded spans to a Zipkin Collector (usually a
* zipkin-server).
* zipkin-server). If you want to send newer Zipkin V2 spans in protocols other than Thrift,
* see {@link io.jaegertracing.zipkin.reporters.ZipkinV2Reporter ZipkinV2Reporter}.
*
* <p>
* Example usage:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2018, The Jaeger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package io.jaegertracing.zipkin;

import io.jaegertracing.Span;
import io.opentracing.tag.Tags;

/**
* Logic that is common to both Thrift v1 and JSON v2 senders
*/
public class ConverterUtil {
public static boolean isRpcServer(Span span) {
return Tags.SPAN_KIND_SERVER.equals(span.getTags().get(Tags.SPAN_KIND.getKey()));
}

public static boolean isRpc(Span span) {
return isRpcServer(span) || isRpcClient(span);
}

public static boolean isRpcClient(Span span) {
return Tags.SPAN_KIND_CLIENT.equals(span.getTags().get(Tags.SPAN_KIND.getKey()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright (c) 2018, The Jaeger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package io.jaegertracing.zipkin;

import com.google.gson.Gson;
import io.jaegertracing.Constants;
import io.jaegertracing.LogData;
import io.jaegertracing.Span;
import io.jaegertracing.SpanContext;
import io.jaegertracing.Tracer;
import io.opentracing.tag.Tags;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;

/**
* Converts a Jaeger span to a Zipkin2 span.
*/
@Slf4j
public class V2SpanConverter {

private static final Gson gson = new Gson();

public static zipkin2.Span convertSpan(Span span) {
Tracer tracer = span.getTracer();
zipkin2.Endpoint host = zipkin2.Endpoint.newBuilder()
.ip(convertIp(tracer.getIpv4()))
.serviceName(tracer.getServiceName())
.build();

zipkin2.Endpoint peerEndpoint = extractPeerEndpoint(span.getTags());

SpanContext context = span.context();
zipkin2.Span.Builder builder = zipkin2.Span.newBuilder()
.id(Long.toHexString(context.getSpanId()))
.traceId(Long.toHexString(context.getTraceId()))
.name(span.getOperationName())
.parentId(Long.toHexString(context.getParentId()))
.debug(context.isDebug())
.localEndpoint(host)
.remoteEndpoint(peerEndpoint)
.kind(convertKind(span.getTags().get(Tags.SPAN_KIND.getKey())))
.timestamp(span.getStart())
.duration(span.getDuration());

buildAnnotations(span, builder);
buildTags(span, builder);

return builder.build();
}

private static zipkin2.Span.Kind convertKind(Object kind) {
if (Tags.SPAN_KIND_SERVER.equals(kind)) {
return zipkin2.Span.Kind.SERVER;
} else if (Tags.SPAN_KIND_CLIENT.equals(kind)) {
return zipkin2.Span.Kind.CLIENT;
} else if (Tags.SPAN_KIND_CONSUMER.equals(kind)) {
return zipkin2.Span.Kind.CONSUMER;
} else if (Tags.SPAN_KIND_PRODUCER.equals(kind)) {
return zipkin2.Span.Kind.PRODUCER;
} else {
return null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an expected case? If not, a log statement should be produced, like, "Could not identify the span kind during conversion. Skipping."

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC internal spans has a null kind.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes they have kind set to null

}
}

private static void buildAnnotations(Span span, zipkin2.Span.Builder builder) {
List<LogData> logs = span.getLogs();
if (logs != null) {
for (LogData logData : logs) {
String logMessage = logData.getMessage();
Map<String, ?> logFields = logData.getFields();
if (logMessage != null) {
builder.addAnnotation(logData.getTime(), logMessage);
} else if (logFields != null) {
builder.addAnnotation(logData.getTime(), gson.toJson(logFields));
}
}
}
}

private static void buildTags(Span span, zipkin2.Span.Builder builder) {
Map<String, Object> tags = span.getTags();
boolean firstSpanInProcess = span.getReferences().isEmpty() || ConverterUtil.isRpcServer(span);

if (firstSpanInProcess) {
Map<String, ?> processTags = span.getTracer().tags();
// add tracer tags to first zipkin span in a process but remove "ip" tag as it is
// taken care of separately.
for (Map.Entry<String, ?> entry : processTags.entrySet()) {
String tagKey = entry.getKey();
if (!Constants.TRACER_IP_TAG_KEY.equals(tagKey)) {
Object tagValue = entry.getValue();
// add a tracer. prefix to process tags for zipkin
builder.putTag("tracer." + tagKey, tagValue.toString());
}
}
}

if (tags != null) {
for (Map.Entry<String, Object> entry : tags.entrySet()) {
String tagKey = entry.getKey();
// Every value is converted to string because zipkin search doesn't
// work well with ints, and bytes.
Object tagValue = entry.getValue();
builder.putTag(tagKey, tagValue.toString());
}
}
}

private static InetAddress convertIp(int ip) {
byte[] bytes = ByteBuffer.allocate(4).putInt(ip).array();
try {
return InetAddress.getByAddress(bytes);
} catch (UnknownHostException e) {
log.error("Jaeger span IP " + ip + " could not be converted", e);
return null;
}
}

/**
* Extract peer Endpoint from tags
*
* @param tags tags
* @return null or peer endpoint
*/
public static zipkin2.Endpoint extractPeerEndpoint(Map<String, Object> tags) {
Object peerIpv4 = tags.get(Tags.PEER_HOST_IPV4.getKey());
Object peerPort = tags.get(Tags.PEER_PORT.getKey());
Object peerService = tags.get(Tags.PEER_SERVICE.getKey());

if (peerIpv4 == null && peerPort == null && peerService == null) {
return null;
}

zipkin2.Endpoint.Builder builder = zipkin2.Endpoint.newBuilder();

if (peerIpv4 instanceof String) {
builder.ip((String) peerIpv4);
}
if (peerPort instanceof Number) {
builder.port(((Number) peerPort).intValue());
}
if (peerService instanceof String) {
builder.serviceName((String) peerService);
}

return builder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2018, The Jaeger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package io.jaegertracing.zipkin.reporters;

import io.jaegertracing.reporters.Reporter;
import io.jaegertracing.zipkin.V2SpanConverter;

/**
* Wrapper around a zipkin v2 AsyncReporter that reports spans using the newer v2 Span class
*/
public class ZipkinV2Reporter implements Reporter {
public final zipkin2.reporter.AsyncReporter<zipkin2.Span> reporter;

public ZipkinV2Reporter(zipkin2.reporter.AsyncReporter<zipkin2.Span> reporter) {
this.reporter = reporter;
}

@Override
public void report(io.jaegertracing.Span span) {
reporter.report(V2SpanConverter.convertSpan(span));
}

@Override
public void close() {
reporter.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.jaegertracing.Tracer;
import io.jaegertracing.reporters.InMemoryReporter;
import io.jaegertracing.samplers.ConstSampler;
import io.jaegertracing.zipkin.ConverterUtil;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMap;
import io.opentracing.propagation.TextMapExtractAdapter;
Expand Down Expand Up @@ -241,17 +242,17 @@ public void testSpanDetectsIsClient() {
Span span = (Span) tracer.buildSpan("test-service-operation").start();
Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_CLIENT);

assertTrue(ThriftSpanConverter.isRpc(span));
assertTrue(ThriftSpanConverter.isRpcClient(span));
assertTrue(ConverterUtil.isRpc(span));
assertTrue(ConverterUtil.isRpcClient(span));
}

@Test
public void testSpanDetectsIsServer() {
Span span = (Span) tracer.buildSpan("test-service-operation").start();
Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_SERVER);

assertTrue(ThriftSpanConverter.isRpc(span));
assertFalse(ThriftSpanConverter.isRpcClient(span));
assertTrue(ConverterUtil.isRpc(span));
assertFalse(ConverterUtil.isRpcClient(span));
}

@Test
Expand Down
Loading