Skip to content
This repository has been archived by the owner on Jul 1, 2022. It is now read-only.

Commit

Permalink
Implement Zipkin 2 JSON Sender
Browse files Browse the repository at this point in the history
The existing ZipkinSender class only supported the Thrift v1 protocol,
but the new Zipkin2Sender supports both v1 and v2 JSON.  Otherwise,
functionality is mostly the same.

There is some conceptually duplicate code in the Sender classes due to
the use of different classes between Zipkin 1/2 but it seemed overly
complicated to adapt them to a common adapter interface.

Signed-off-by: Ben Keith <bkeith@signalfx.com>
  • Loading branch information
Ben Keith committed Apr 17, 2018
1 parent 3e868f0 commit f4b4ed5
Show file tree
Hide file tree
Showing 8 changed files with 894 additions and 24 deletions.
6 changes: 5 additions & 1 deletion jaeger-zipkin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ description = 'Integration library for Zipkin'
dependencies {
compile group: 'io.zipkin.reporter', name: 'zipkin-reporter', version: '1.1.2'
compile group: 'io.zipkin.reporter', name: 'zipkin-sender-urlconnection', version: '1.1.2'

compile group: 'io.zipkin.reporter2', name: 'zipkin-reporter', version: '2.5.0'
compile group: 'io.zipkin.reporter2', name: 'zipkin-sender-urlconnection', version: '2.5.0'

compile project(path: ':jaeger-core', configuration: 'shadow')

testCompile group: 'junit', name: 'junit', version: junitVersion
testCompile group: 'io.zipkin.java', name: 'zipkin-junit', version: '2.3.0'
testCompile group: 'io.zipkin.java', name: 'zipkin-junit', version: '2.7.1'
testCompile group: 'io.zipkin.brave', name: 'brave-http', version: braveHttpVersion
testCompile group: 'com.tngtech.java', name: 'junit-dataprovider', version: junitDataProviderVersion

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2018, The Jaeger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package io.jaegertracing.senders.zipkin;

import io.jaegertracing.Span;
import io.opentracing.tag.Tags;

/**
* Logic that is common to both Thrift v1 and JSON v2 senders
*/
class ConverterUtil {
static boolean isRpcServer(Span span) {
return Tags.SPAN_KIND_SERVER.equals(span.getTags().get(Tags.SPAN_KIND.getKey()));
}

static boolean isRpc(Span span) {
Object spanKindValue = span.getTags().get(Tags.SPAN_KIND.getKey());
return Tags.SPAN_KIND_CLIENT.equals(spanKindValue) || Tags.SPAN_KIND_SERVER.equals(spanKindValue);

}

static boolean isRpcClient(Span span) {
return Tags.SPAN_KIND_CLIENT.equals(span.getTags().get(Tags.SPAN_KIND.getKey()));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ public static com.twitter.zipkin.thriftjava.Span convertSpan(Span span) {
private static List<Annotation> buildAnnotations(Span span, Endpoint host) {
List<Annotation> annotations = new ArrayList<Annotation>();

if (isRpc(span)) {
if (ConverterUtil.isRpc(span)) {
String startLabel = zipkincoreConstants.SERVER_RECV;
String endLabel = zipkincoreConstants.SERVER_SEND;
if (isRpcClient(span)) {
if (ConverterUtil.isRpcClient(span)) {
startLabel = zipkincoreConstants.CLIENT_SEND;
endLabel = zipkincoreConstants.CLIENT_RECV;
}
Expand Down Expand Up @@ -87,9 +87,9 @@ private static List<Annotation> buildAnnotations(Span span, Endpoint host) {
private static List<BinaryAnnotation> buildBinaryAnnotations(Span span, Endpoint host) {
List<BinaryAnnotation> binaryAnnotations = new ArrayList<BinaryAnnotation>();
Map<String, Object> tags = span.getTags();
boolean isRpc = isRpc(span);
boolean isClient = isRpcClient(span);
boolean firstSpanInProcess = span.getReferences().isEmpty() || isRpcServer(span);
boolean isRpc = ConverterUtil.isRpc(span);
boolean isClient = ConverterUtil.isRpcClient(span);
boolean firstSpanInProcess = span.getReferences().isEmpty() || ConverterUtil.isRpcServer(span);

if (firstSpanInProcess) {
Map<String, ?> processTags = span.getTracer().tags();
Expand Down Expand Up @@ -154,20 +154,6 @@ private static BinaryAnnotation buildBinaryAnnotation(String tagKey, Object tagV
return banno;
}

static boolean isRpcServer(Span span) {
return Tags.SPAN_KIND_SERVER.equals(span.getTags().get(Tags.SPAN_KIND.getKey()));
}

static boolean isRpc(Span span) {
Object spanKindValue = span.getTags().get(Tags.SPAN_KIND.getKey());
return Tags.SPAN_KIND_CLIENT.equals(spanKindValue) || Tags.SPAN_KIND_SERVER.equals(spanKindValue);

}

static boolean isRpcClient(Span span) {
return Tags.SPAN_KIND_CLIENT.equals(span.getTags().get(Tags.SPAN_KIND.getKey()));
}

/**
* Extract peer Endpoint from tags
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright (c) 2018, The Jaeger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package io.jaegertracing.senders.zipkin;

import com.google.gson.Gson;
import io.jaegertracing.Constants;
import io.jaegertracing.LogData;
import io.jaegertracing.Span;
import io.jaegertracing.SpanContext;
import io.jaegertracing.Tracer;
import io.opentracing.tag.Tags;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import zipkin2.Endpoint;

/**
* Converts a Jaeger span to a Zipkin2 span.
*/
public class V2SpanConverter {

private static final Gson gson = new Gson();

public static zipkin2.Span convertSpan(Span span) {
Tracer tracer = span.getTracer();
Endpoint host = zipkin2.Endpoint.newBuilder()
.ip(convertIp(tracer.getIpv4()))
.port(0)
.serviceName(tracer.getServiceName())
.build();

Endpoint peerEndpoint = extractPeerEndpoint(span.getTags());

SpanContext context = span.context();
zipkin2.Span.Builder builder = zipkin2.Span.newBuilder()
.id(Long.toHexString(context.getSpanId()))
.traceId(Long.toHexString(context.getTraceId()))
.name(span.getOperationName())
.parentId(Long.toHexString(context.getParentId()))
.debug(context.isDebug())
.localEndpoint(host)
.remoteEndpoint(peerEndpoint)
.kind(convertKind(span.getTags().get(Tags.SPAN_KIND.getKey())))
.timestamp(span.getStart())
.duration(span.getDuration());

buildAnnotations(span, builder);
buildTags(span, builder);

return builder.build();
}

private static zipkin2.Span.Kind convertKind(Object kind) {
if (kind == null) {
return null;
} else if (kind.equals(Tags.SPAN_KIND_SERVER)) {
return zipkin2.Span.Kind.SERVER;
} else if (kind.equals(Tags.SPAN_KIND_CLIENT)) {
return zipkin2.Span.Kind.CLIENT;
} else if (kind.equals(Tags.SPAN_KIND_CONSUMER)) {
return zipkin2.Span.Kind.CONSUMER;
} else if (kind.equals(Tags.SPAN_KIND_PRODUCER)) {
return zipkin2.Span.Kind.PRODUCER;
} else {
return null;
}
}

private static void buildAnnotations(Span span, zipkin2.Span.Builder builder) {
if (ConverterUtil.isRpc(span)) {
String startLabel = zipkin.Constants.SERVER_RECV;
String endLabel = zipkin.Constants.SERVER_SEND;
if (ConverterUtil.isRpcClient(span)) {
startLabel = zipkin.Constants.CLIENT_SEND;
endLabel = zipkin.Constants.CLIENT_RECV;
}

builder.addAnnotation(span.getStart(), startLabel);
builder.addAnnotation(span.getStart() + span.getDuration(), endLabel);
}

List<LogData> logs = span.getLogs();
if (logs != null) {
for (LogData logData : logs) {
String logMessage = logData.getMessage();
Map<String, ?> logFields = logData.getFields();
if (logMessage != null) {
builder.addAnnotation(logData.getTime(), logMessage);
} else if (logFields != null) {
builder.addAnnotation(logData.getTime(), gson.toJson(logFields));
}
}
}
}

private static void buildTags(Span span, zipkin2.Span.Builder builder) {
Map<String, Object> tags = span.getTags();
boolean isRpc = ConverterUtil.isRpc(span);
boolean firstSpanInProcess = span.getReferences().isEmpty() || ConverterUtil.isRpcServer(span);

if (firstSpanInProcess) {
Map<String, ?> processTags = span.getTracer().tags();
// add tracer tags to first zipkin span in a process but remove "ip" tag as it is
// taken care of separately.
for (Map.Entry<String, ?> entry : processTags.entrySet()) {
String tagKey = entry.getKey();
if (!Constants.TRACER_IP_TAG_KEY.equals(tagKey)) {
Object tagValue = entry.getValue();
// add a tracer. prefix to process tags for zipkin
builder.putTag("tracer." + tagKey, tagValue.toString());
}
}
}

if (!isRpc) {
String componentName;
Object componentTag = tags.get(Tags.COMPONENT.getKey());
if (componentTag instanceof String) {
componentName = componentTag.toString();
} else {
// spans always have associated tracers, and service names
componentName = span.getTracer().getServiceName();
}

builder.putTag(zipkin.Constants.LOCAL_COMPONENT, componentName);
}

if (tags != null) {
for (Map.Entry<String, Object> entry : tags.entrySet()) {
String tagKey = entry.getKey();
// Every value is converted to string because zipkin search doesn't
// work well with ints, and bytes.
Object tagValue = entry.getValue();
builder.putTag(tagKey, tagValue.toString());
}
}
}

private static InetAddress convertIp(Integer ip) {
byte[] bytes = ByteBuffer.allocate(4).putInt(ip).array();
try {
return InetAddress.getByAddress(bytes);
} catch (UnknownHostException e) {
return null;
}
}

/**
* Extract peer Endpoint from tags
*
* @param tags tags
* @return null or peer endpoint
*/
public static Endpoint extractPeerEndpoint(Map<String, Object> tags) {
Object peerIpv4 = tags.get(Tags.PEER_HOST_IPV4.getKey());
Object peerPort = tags.get(Tags.PEER_PORT.getKey());
Object peerService = tags.get(Tags.PEER_SERVICE.getKey());

if (peerIpv4 == null && peerPort == null && peerService == null) {
return null;
}

Endpoint.Builder builder = Endpoint.newBuilder();

if (peerIpv4 instanceof Integer) {
builder.ip(convertIp((Integer) peerIpv4));
} else if (peerIpv4 instanceof String) {
builder.ip((String) peerIpv4);
}
if (peerPort instanceof Number) {
builder.port(((Number) peerPort).intValue());
}
if (peerService instanceof String) {
builder.serviceName((String) peerService);
}

return builder.build();
}
}
Loading

0 comments on commit f4b4ed5

Please sign in to comment.