Skip to content
This repository has been archived by the owner on Jul 1, 2022. It is now read-only.

Commit

Permalink
Make GenericZipkinSender to share logic between Zipkin 1 and 2 senders
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Keith <bkeith@signalfx.com>
  • Loading branch information
Ben Keith committed May 2, 2018
1 parent 5050c73 commit 8724fef
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 129 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright (c) 2016, Uber Technologies, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package io.jaegertracing.senders.zipkin;

import io.jaegertracing.exceptions.SenderException;
import io.jaegertracing.senders.Sender;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import lombok.ToString;

@ToString(exclude = "spanBuffer")
public abstract class GenericZipkinSender implements Sender {

final List<byte[]> spanBuffer;

protected GenericZipkinSender() {
this.spanBuffer = new ArrayList<byte[]>();
}

public abstract byte[] convertAndEncodeSpan(io.jaegertracing.Span span);

public abstract int messageSizeInBytes(List<byte[]> msg);

public abstract int messageMaxBytes();

public abstract void sendSpans(List<byte[]> spanBuffer) throws IOException;

public abstract void closeSender() throws IOException;

public abstract String delegateToString();

/*
* Adds spans to an internal queue that flushes them as udp packets later.
* This function does not need to be synchronized because the reporter creates
* a single thread that calls this append function
*/
@Override
public int append(io.jaegertracing.Span span) throws SenderException {
byte[] next = this.convertAndEncodeSpan(span);
int messageSizeOfNextSpan = this.messageSizeInBytes(Collections.singletonList(next));
// don't enqueue something larger than we can drain
if (messageSizeOfNextSpan > this.messageMaxBytes()) {
throw new SenderException(
delegateToString() + " received a span that was too large", null, 1);
}

spanBuffer.add(next); // speculatively add to the buffer so we can size it
int nextSizeInBytes = this.messageSizeInBytes(spanBuffer);
// If we can fit queued spans and the next into one message...
if (nextSizeInBytes <= this.messageMaxBytes()) {

// If there's still room, don't flush yet.
if (nextSizeInBytes < this.messageMaxBytes()) {
return 0;
}
// If we have exactly met the max message size, flush
return flush();
}

// Otherwise, remove speculatively added span and flush until we have room for it.
spanBuffer.remove(spanBuffer.size() - 1);
int n;
try {
n = flush();
} catch (SenderException e) {
// +1 for the span not submitted in the buffer above
throw new SenderException(e.getMessage(), e.getCause(), e.getDroppedSpanCount() + 1);
}

// Now that there's room, add the span as the only element in the buffer
spanBuffer.add(next);
return n;
}

@Override
public int flush() throws SenderException {
if (spanBuffer.isEmpty()) {
return 0;
}

int n = spanBuffer.size();
try {
this.sendSpans(spanBuffer);
} catch (RuntimeException e) {
throw new SenderException("Failed to flush spans.", e, n);
} catch (IOException e) {
throw new SenderException("Failed to flush spans.", e, n);
} finally {
spanBuffer.clear();
}
return n;
}

@Override
public int close() throws SenderException {
int n = flush();
try {
this.closeSender();
} catch (IOException e) {
throw new SenderException("Failed to close " + delegateToString(), e, n);
}
return n;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@
* <p>
* See https://github.com/openzipkin/zipkin/tree/master/zipkin-server
*/
@ToString(exclude = "spanBuffer")
public final class Zipkin2Sender implements Sender {
public final class Zipkin2Sender extends GenericZipkinSender {

static final BytesEncoder<Span> DEFAULT_ENCODER = SpanBytesEncoder.JSON_V2;

Expand Down Expand Up @@ -85,85 +84,40 @@ public static Zipkin2Sender create(zipkin2.reporter.Sender delegate, BytesEncode

final BytesEncoder<Span> encoder;
final zipkin2.reporter.Sender delegate;
final List<byte[]> spanBuffer;

Zipkin2Sender(zipkin2.reporter.Sender delegate, BytesEncoder<Span> encoder) {
this.delegate = delegate;
this.encoder = encoder;
this.spanBuffer = new ArrayList<byte[]>();
}

/*
* Adds spans to an internal queue that flushes them with the configured delegate later.
* This function does not need to be synchronized because the reporter creates
* a single thread that calls this append function
*/
@Override
public int append(io.jaegertracing.Span span) throws SenderException {
byte[] next = encoder.encode(V2SpanConverter.convertSpan(span));
int messageSizeOfNextSpan = delegate.messageSizeInBytes(Collections.singletonList(next));
// don't enqueue something larger than we can drain
if (messageSizeOfNextSpan > delegate.messageMaxBytes()) {
throw new SenderException(
delegate.toString() + " received a span that was too large", null, 1);
}

spanBuffer.add(next); // speculatively add to the buffer so we can size it
int nextSizeInBytes = delegate.messageSizeInBytes(spanBuffer);
// If we can fit queued spans and the next into one message...
if (nextSizeInBytes <= delegate.messageMaxBytes()) {

// If there's still room, don't flush yet.
if (nextSizeInBytes < delegate.messageMaxBytes()) {
return 0;
}
// If we have exactly met the max message size, flush
return flush();
}

// Otherwise, remove speculatively added span and flush until we have room for it.
spanBuffer.remove(spanBuffer.size() - 1);
int n;
try {
n = flush();
} catch (SenderException e) {
// +1 for the span not submitted in the buffer above
throw new SenderException(e.getMessage(), e.getCause(), e.getDroppedSpanCount() + 1);
}

// Now that there's room, add the span as the only element in the buffer
spanBuffer.add(next);
return n;
public byte[] convertAndEncodeSpan(io.jaegertracing.Span span) {
return encoder.encode(V2SpanConverter.convertSpan(span));
}

@Override
public int messageSizeInBytes(List<byte[]> msg) {
return delegate.messageSizeInBytes(msg);
}

@Override
public int messageMaxBytes() {
return delegate.messageMaxBytes();
}

@Override
public void sendSpans(List<byte[]> spanBuffer) throws IOException {
Call<Void> call = delegate.sendSpans(spanBuffer);
call.execute();
}

@Override
public int flush() throws SenderException {
if (spanBuffer.isEmpty()) {
return 0;
}

int n = spanBuffer.size();
try {
Call<Void> call = delegate.sendSpans(spanBuffer);
call.execute();
} catch (RuntimeException e) {
throw new SenderException("Failed to flush spans.", e, n);
} catch (IOException e) {
throw new SenderException("Failed to flush spans.", e, n);
} finally {
spanBuffer.clear();
}
return n;
public void closeSender() throws IOException {
delegate.close();
}

@Override
public int close() throws SenderException {
int n = flush();
try {
delegate.close();
} catch (IOException e) {
throw new SenderException("Failed to close " + delegate, e, n);
}
return n;
public String delegateToString() {
return delegate.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@
* <p>
* See https://github.com/openzipkin/zipkin/tree/master/zipkin-server
*/
@ToString(exclude = "spanBuffer")
public final class ZipkinSender implements Sender {
public final class ZipkinSender extends GenericZipkinSender {

/**
* @param endpoint The POST URL for zipkin's <a href="http://zipkin.io/zipkin-api/#/">v1 api</a>,
Expand Down Expand Up @@ -79,84 +78,45 @@ public static ZipkinSender create(zipkin.reporter.Sender delegate) {

final ThriftSpanEncoder encoder = new ThriftSpanEncoder();
final zipkin.reporter.Sender delegate;
final List<byte[]> spanBuffer;

ZipkinSender(zipkin.reporter.Sender delegate) {
this.delegate = delegate;
this.spanBuffer = new ArrayList<byte[]>();
}

/*
* Adds spans to an internal queue that flushes them as udp packets later.
* This function does not need to be synchronized because the reporter creates
* a single thread that calls this append function
*/
@Override
public int append(io.jaegertracing.Span span) throws SenderException {
byte[] next = encoder.encode(backFillHostOnAnnotations(ThriftSpanConverter.convertSpan(span)));
int messageSizeOfNextSpan = delegate.messageSizeInBytes(Collections.singletonList(next));
// don't enqueue something larger than we can drain
if (messageSizeOfNextSpan > delegate.messageMaxBytes()) {
throw new SenderException(
delegate.toString() + " received a span that was too large", null, 1);
}

spanBuffer.add(next); // speculatively add to the buffer so we can size it
int nextSizeInBytes = delegate.messageSizeInBytes(spanBuffer);
// If we can fit queued spans and the next into one message...
if (nextSizeInBytes <= delegate.messageMaxBytes()) {

// If there's still room, don't flush yet.
if (nextSizeInBytes < delegate.messageMaxBytes()) {
return 0;
}
// If we have exactly met the max message size, flush
return flush();
}

// Otherwise, remove speculatively added span and flush until we have room for it.
spanBuffer.remove(spanBuffer.size() - 1);
int n;
try {
n = flush();
} catch (SenderException e) {
// +1 for the span not submitted in the buffer above
throw new SenderException(e.getMessage(), e.getCause(), e.getDroppedSpanCount() + 1);
}
public byte[] convertAndEncodeSpan(io.jaegertracing.Span span) {
return encoder.encode(backFillHostOnAnnotations(ThriftSpanConverter.convertSpan(span)));
}

// Now that there's room, add the span as the only element in the buffer
spanBuffer.add(next);
return n;
@Override
public int messageSizeInBytes(List<byte[]> msg) {
return delegate.messageSizeInBytes(msg);
}

@Override
public int flush() throws SenderException {
if (spanBuffer.isEmpty()) {
return 0;
}
public int messageMaxBytes() {
return delegate.messageMaxBytes();
}

@Override
public void sendSpans(List<byte[]> spanBuffer) throws IOException {
AwaitableCallback captor = new AwaitableCallback();
int n = spanBuffer.size();
try {
delegate.sendSpans(spanBuffer, captor);
captor.await();
} catch (RuntimeException e) {
throw new SenderException("Failed to flush spans.", e, n);
} finally {
spanBuffer.clear();
throw new IOException("Failed to send span", e);
}
return n;
}

@Override
public int close() throws SenderException {
int n = flush();
try {
delegate.close();
} catch (IOException e) {
throw new SenderException("Failed to close " + delegate, e, n);
}
return n;
public void closeSender() throws IOException {
delegate.close();
}

@Override
public String delegateToString() {
return delegate.toString();
}

// serviceName/host is needed on annotations so zipkin can query
Expand Down

0 comments on commit 8724fef

Please sign in to comment.