Skip to content
This repository has been archived by the owner on Jul 1, 2022. It is now read-only.

Commit

Permalink
Add ZipkinReporter to support Zipkin v2
Browse files Browse the repository at this point in the history
This adds a new class called Zipkin2Reporter that adapts a Zipkin v2
reporter and converts Jaeger spans to Zipkin v2 spans.  This enables us
to send spans to a Zipkin v2 server using the newer protocol.

This also moves the existing Thrift conversion logic to a new package
`io.jaegertracing.zipkin` where it exists alongside the new v2
conversion logic.

Signed-off-by: Ben Keith <bkeith@signalfx.com>
  • Loading branch information
Ben Keith committed May 15, 2018
1 parent 4386282 commit ee27972
Show file tree
Hide file tree
Showing 11 changed files with 707 additions and 31 deletions.
31 changes: 29 additions & 2 deletions jaeger-zipkin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,39 @@ tracer = new Tracer.Builder(serviceName, reporter, sampler)
```

## Sending data to Zipkin
Zipkin supports transports including Http and Kafka. You can configure Jaeger to send to a Zipkin server with
`ZipkinSender`.
There are two ways to send spans to a Zipkin server:

### Thrift
If you want to send Zipkin v1 Thrift-encoded spans, you should use the `ZipkinSender` sender, which
wraps a Zipkin sender class to enable the use of various transports such as HTTP and Kafka.

For example:
```java
import io.jaegertracing.senders.zipkin.ZipkinSender;

reporter = new RemoteReporter(ZipkinSender.create("http://localhost:9411/api/v1/spans"));
tracer = new Tracer.Builder(serviceName, reporter, sampler)
...
```

### Zipkin 2 Reporters
You can reuse a Zipkin 2 reporter instance as-is by using `Zipkin2Reporter`, which adapts a Zipkin
2 reporter to the Jaeger reporter interface and deals with converting Jaeger spans to the Zipkin 2
model.

For example:
```java
import io.jaegertracing.zipkin.reporters.Zipkin2Reporter;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.urlconnection.URLConnectionSender;

reporter = new Zipkin2Reporter(
AsyncReporter.create(URLConnectionSender.create("http://localhost:9411/api/v2/spans")));

tracer = new Tracer.Builder(serviceName)
.withReporter(reporter)
...
.build()
```

This will send spans to the Zipkin v2 endpoint using the v2 JSON encoding.
17 changes: 14 additions & 3 deletions jaeger-zipkin/build.gradle
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
description = 'Integration library for Zipkin'

dependencies {
compile group: 'io.zipkin.reporter', name: 'zipkin-reporter', version: '1.1.2'
compile group: 'io.zipkin.reporter', name: 'zipkin-sender-urlconnection', version: '1.1.2'
compile(group: 'io.zipkin.reporter', name: 'zipkin-reporter', version: '1.1.2') {
exclude group:'io.zipkin.zipkin2'
exclude group:'io.zipkin.java'
}
compile(group: 'io.zipkin.reporter', name: 'zipkin-sender-urlconnection', version: '1.1.2') {
exclude group:'io.zipkin.zipkin2'
exclude group:'io.zipkin.java'
}

compile group: 'io.zipkin.java', name: 'zipkin', version: '2.8.1'
compile group: 'io.zipkin.reporter2', name: 'zipkin-reporter', version: '2.6.0'

compile project(path: ':jaeger-core', configuration: 'shadow')

testCompile group: 'io.zipkin.reporter2', name: 'zipkin-sender-urlconnection', version: '2.6.0'
testCompile group: 'junit', name: 'junit', version: junitVersion
testCompile group: 'io.zipkin.java', name: 'zipkin-junit', version: '2.3.0'
testCompile group: 'io.zipkin.java', name: 'zipkin-junit', version: '2.7.1'
testCompile group: 'com.tngtech.java', name: 'junit-dataprovider', version: junitDataProviderVersion

signature 'org.codehaus.mojo.signature:java16:1.1@signature'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.twitter.zipkin.thriftjava.zipkincoreConstants;
import io.jaegertracing.exceptions.SenderException;
import io.jaegertracing.senders.Sender;
import io.jaegertracing.zipkin.ThriftSpanConverter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -33,7 +34,8 @@

/**
* This sends (TBinaryProtocol big-endian) encoded spans to a Zipkin Collector (usually a
* zipkin-server).
* zipkin-server). If you want to send newer Zipkin V2 spans in protocols other than Thrift,
* see {@link io.jaegertracing.zipkin.reporters.Zipkin2Reporter Zipkin2Reporter}.
*
* <p>
* Example usage:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2018, The Jaeger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package io.jaegertracing.zipkin;

import io.jaegertracing.Span;
import io.opentracing.tag.Tags;

/**
* Logic that is common to both Thrift v1 and JSON v2 senders
*/
class ConverterUtil {
static boolean isRpcServer(Span span) {
return Tags.SPAN_KIND_SERVER.equals(span.getTags().get(Tags.SPAN_KIND.getKey()));
}

static boolean isRpc(Span span) {
return isRpcServer(span) || isRpcClient(span);

}

static boolean isRpcClient(Span span) {
return Tags.SPAN_KIND_CLIENT.equals(span.getTags().get(Tags.SPAN_KIND.getKey()));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* the License.
*/

package io.jaegertracing.senders.zipkin;
package io.jaegertracing.zipkin;

import com.google.gson.Gson;
import com.twitter.zipkin.thriftjava.Annotation;
Expand Down Expand Up @@ -56,10 +56,10 @@ public static com.twitter.zipkin.thriftjava.Span convertSpan(Span span) {
private static List<Annotation> buildAnnotations(Span span, Endpoint host) {
List<Annotation> annotations = new ArrayList<Annotation>();

if (isRpc(span)) {
if (ConverterUtil.isRpc(span)) {
String startLabel = zipkincoreConstants.SERVER_RECV;
String endLabel = zipkincoreConstants.SERVER_SEND;
if (isRpcClient(span)) {
if (ConverterUtil.isRpcClient(span)) {
startLabel = zipkincoreConstants.CLIENT_SEND;
endLabel = zipkincoreConstants.CLIENT_RECV;
}
Expand Down Expand Up @@ -87,9 +87,9 @@ private static List<Annotation> buildAnnotations(Span span, Endpoint host) {
private static List<BinaryAnnotation> buildBinaryAnnotations(Span span, Endpoint host) {
List<BinaryAnnotation> binaryAnnotations = new ArrayList<BinaryAnnotation>();
Map<String, Object> tags = span.getTags();
boolean isRpc = isRpc(span);
boolean isClient = isRpcClient(span);
boolean firstSpanInProcess = span.getReferences().isEmpty() || isRpcServer(span);
boolean isRpc = ConverterUtil.isRpc(span);
boolean isClient = ConverterUtil.isRpcClient(span);
boolean firstSpanInProcess = span.getReferences().isEmpty() || ConverterUtil.isRpcServer(span);

if (firstSpanInProcess) {
Map<String, ?> processTags = span.getTracer().tags();
Expand Down Expand Up @@ -154,20 +154,6 @@ private static BinaryAnnotation buildBinaryAnnotation(String tagKey, Object tagV
return banno;
}

static boolean isRpcServer(Span span) {
return Tags.SPAN_KIND_SERVER.equals(span.getTags().get(Tags.SPAN_KIND.getKey()));
}

static boolean isRpc(Span span) {
Object spanKindValue = span.getTags().get(Tags.SPAN_KIND.getKey());
return Tags.SPAN_KIND_CLIENT.equals(spanKindValue) || Tags.SPAN_KIND_SERVER.equals(spanKindValue);

}

static boolean isRpcClient(Span span) {
return Tags.SPAN_KIND_CLIENT.equals(span.getTags().get(Tags.SPAN_KIND.getKey()));
}

/**
* Extract peer Endpoint from tags
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright (c) 2018, The Jaeger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package io.jaegertracing.zipkin;

import com.google.gson.Gson;
import io.jaegertracing.Constants;
import io.jaegertracing.LogData;
import io.jaegertracing.Span;
import io.jaegertracing.SpanContext;
import io.jaegertracing.Tracer;
import io.opentracing.tag.Tags;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;

/**
* Converts a Jaeger span to a Zipkin2 span.
*/
public class V2SpanConverter {

private static final Gson gson = new Gson();

public static zipkin2.Span convertSpan(Span span) {
Tracer tracer = span.getTracer();
zipkin2.Endpoint host = zipkin2.Endpoint.newBuilder()
.ip(convertIp(tracer.getIpv4()))
.serviceName(tracer.getServiceName())
.build();

zipkin2.Endpoint peerEndpoint = extractPeerEndpoint(span.getTags());

SpanContext context = span.context();
zipkin2.Span.Builder builder = zipkin2.Span.newBuilder()
.id(Long.toHexString(context.getSpanId()))
.traceId(Long.toHexString(context.getTraceId()))
.name(span.getOperationName())
.parentId(Long.toHexString(context.getParentId()))
.debug(context.isDebug())
.localEndpoint(host)
.remoteEndpoint(peerEndpoint)
.kind(convertKind(span.getTags().get(Tags.SPAN_KIND.getKey())))
.timestamp(span.getStart())
.duration(span.getDuration());

buildAnnotations(span, builder);
buildTags(span, builder);

return builder.build();
}

private static zipkin2.Span.Kind convertKind(Object kind) {
if (Tags.SPAN_KIND_SERVER.equals(kind)) {
return zipkin2.Span.Kind.SERVER;
} else if (Tags.SPAN_KIND_CLIENT.equals(kind)) {
return zipkin2.Span.Kind.CLIENT;
} else if (Tags.SPAN_KIND_CONSUMER.equals(kind)) {
return zipkin2.Span.Kind.CONSUMER;
} else if (Tags.SPAN_KIND_PRODUCER.equals(kind)) {
return zipkin2.Span.Kind.PRODUCER;
} else {
return null;
}
}

private static void buildAnnotations(Span span, zipkin2.Span.Builder builder) {
if (ConverterUtil.isRpc(span)) {
String startLabel = zipkin.Constants.SERVER_RECV;
String endLabel = zipkin.Constants.SERVER_SEND;
if (ConverterUtil.isRpcClient(span)) {
startLabel = zipkin.Constants.CLIENT_SEND;
endLabel = zipkin.Constants.CLIENT_RECV;
}

builder.addAnnotation(span.getStart(), startLabel);
builder.addAnnotation(span.getStart() + span.getDuration(), endLabel);
}

List<LogData> logs = span.getLogs();
if (logs != null) {
for (LogData logData : logs) {
String logMessage = logData.getMessage();
Map<String, ?> logFields = logData.getFields();
if (logMessage != null) {
builder.addAnnotation(logData.getTime(), logMessage);
} else if (logFields != null) {
builder.addAnnotation(logData.getTime(), gson.toJson(logFields));
}
}
}
}

private static void buildTags(Span span, zipkin2.Span.Builder builder) {
Map<String, Object> tags = span.getTags();
boolean isRpc = ConverterUtil.isRpc(span);
boolean firstSpanInProcess = span.getReferences().isEmpty() || ConverterUtil.isRpcServer(span);

if (firstSpanInProcess) {
Map<String, ?> processTags = span.getTracer().tags();
// add tracer tags to first zipkin span in a process but remove "ip" tag as it is
// taken care of separately.
for (Map.Entry<String, ?> entry : processTags.entrySet()) {
String tagKey = entry.getKey();
if (!Constants.TRACER_IP_TAG_KEY.equals(tagKey)) {
Object tagValue = entry.getValue();
// add a tracer. prefix to process tags for zipkin
builder.putTag("tracer." + tagKey, tagValue.toString());
}
}
}

if (!isRpc) {
String componentName;
Object componentTag = tags.get(Tags.COMPONENT.getKey());
if (componentTag instanceof String) {
componentName = componentTag.toString();
} else {
// spans always have associated tracers, and service names
componentName = span.getTracer().getServiceName();
}

builder.putTag(zipkin.Constants.LOCAL_COMPONENT, componentName);
}

if (tags != null) {
for (Map.Entry<String, Object> entry : tags.entrySet()) {
String tagKey = entry.getKey();
// Every value is converted to string because zipkin search doesn't
// work well with ints, and bytes.
Object tagValue = entry.getValue();
builder.putTag(tagKey, tagValue.toString());
}
}
}

private static InetAddress convertIp(int ip) {
byte[] bytes = ByteBuffer.allocate(4).putInt(ip).array();
try {
return InetAddress.getByAddress(bytes);
} catch (UnknownHostException e) {
return null;
}
}

/**
* Extract peer Endpoint from tags
*
* @param tags tags
* @return null or peer endpoint
*/
public static zipkin2.Endpoint extractPeerEndpoint(Map<String, Object> tags) {
Object peerIpv4 = tags.get(Tags.PEER_HOST_IPV4.getKey());
Object peerPort = tags.get(Tags.PEER_PORT.getKey());
Object peerService = tags.get(Tags.PEER_SERVICE.getKey());

if (peerIpv4 == null && peerPort == null && peerService == null) {
return null;
}

zipkin2.Endpoint.Builder builder = zipkin2.Endpoint.newBuilder();

if (peerIpv4 instanceof String) {
builder.ip((String) peerIpv4);
}
if (peerPort instanceof Number) {
builder.port(((Number) peerPort).intValue());
}
if (peerService instanceof String) {
builder.serviceName((String) peerService);
}

return builder.build();
}
}
Loading

0 comments on commit ee27972

Please sign in to comment.