From 60b8584fddbcd653ae23d5b3a290346ee3f323ed Mon Sep 17 00:00:00 2001 From: Ben Keith Date: Wed, 2 May 2018 17:08:46 -0400 Subject: [PATCH] Make GenericZipkinSender to share logic between Zipkin 1 and 2 senders Signed-off-by: Ben Keith --- .../senders/zipkin/GenericZipkinSender.java | 118 ++++++++++++++++++ .../senders/zipkin/Zipkin2Sender.java | 92 ++++---------- .../senders/zipkin/ZipkinSender.java | 80 +++--------- 3 files changed, 161 insertions(+), 129 deletions(-) create mode 100644 jaeger-zipkin/src/main/java/io/jaegertracing/senders/zipkin/GenericZipkinSender.java diff --git a/jaeger-zipkin/src/main/java/io/jaegertracing/senders/zipkin/GenericZipkinSender.java b/jaeger-zipkin/src/main/java/io/jaegertracing/senders/zipkin/GenericZipkinSender.java new file mode 100644 index 000000000..c65422ede --- /dev/null +++ b/jaeger-zipkin/src/main/java/io/jaegertracing/senders/zipkin/GenericZipkinSender.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2016, Uber Technologies, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.jaegertracing.senders.zipkin; + +import io.jaegertracing.exceptions.SenderException; +import io.jaegertracing.senders.Sender; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import lombok.ToString; + +@ToString(exclude = "spanBuffer") +public abstract class GenericZipkinSender implements Sender { + + final List spanBuffer; + + protected GenericZipkinSender() { + this.spanBuffer = new ArrayList(); + } + + public abstract byte[] convertAndEncodeSpan(io.jaegertracing.Span span); + + public abstract int messageSizeInBytes(List msg); + + public abstract int messageMaxBytes(); + + public abstract void sendSpans(List spanBuffer) throws IOException; + + public abstract void closeSender() throws IOException; + + public abstract String delegateToString(); + + /* + * Adds spans to an internal queue that flushes them as udp packets later. + * This function does not need to be synchronized because the reporter creates + * a single thread that calls this append function + */ + @Override + public int append(io.jaegertracing.Span span) throws SenderException { + byte[] next = this.convertAndEncodeSpan(span); + int messageSizeOfNextSpan = this.messageSizeInBytes(Collections.singletonList(next)); + // don't enqueue something larger than we can drain + if (messageSizeOfNextSpan > this.messageMaxBytes()) { + throw new SenderException( + delegateToString() + " received a span that was too large", null, 1); + } + + spanBuffer.add(next); // speculatively add to the buffer so we can size it + int nextSizeInBytes = this.messageSizeInBytes(spanBuffer); + // If we can fit queued spans and the next into one message... + if (nextSizeInBytes <= this.messageMaxBytes()) { + + // If there's still room, don't flush yet. + if (nextSizeInBytes < this.messageMaxBytes()) { + return 0; + } + // If we have exactly met the max message size, flush + return flush(); + } + + // Otherwise, remove speculatively added span and flush until we have room for it. + spanBuffer.remove(spanBuffer.size() - 1); + int n; + try { + n = flush(); + } catch (SenderException e) { + // +1 for the span not submitted in the buffer above + throw new SenderException(e.getMessage(), e.getCause(), e.getDroppedSpanCount() + 1); + } + + // Now that there's room, add the span as the only element in the buffer + spanBuffer.add(next); + return n; + } + + @Override + public int flush() throws SenderException { + if (spanBuffer.isEmpty()) { + return 0; + } + + int n = spanBuffer.size(); + try { + this.sendSpans(spanBuffer); + } catch (RuntimeException e) { + throw new SenderException("Failed to flush spans.", e, n); + } catch (IOException e) { + throw new SenderException("Failed to flush spans.", e, n); + } finally { + spanBuffer.clear(); + } + return n; + } + + @Override + public int close() throws SenderException { + int n = flush(); + try { + this.closeSender(); + } catch (IOException e) { + throw new SenderException("Failed to close " + delegateToString(), e, n); + } + return n; + } +} diff --git a/jaeger-zipkin/src/main/java/io/jaegertracing/senders/zipkin/Zipkin2Sender.java b/jaeger-zipkin/src/main/java/io/jaegertracing/senders/zipkin/Zipkin2Sender.java index 3dcf4baea..bfaa1168e 100644 --- a/jaeger-zipkin/src/main/java/io/jaegertracing/senders/zipkin/Zipkin2Sender.java +++ b/jaeger-zipkin/src/main/java/io/jaegertracing/senders/zipkin/Zipkin2Sender.java @@ -43,8 +43,7 @@ *

* See https://github.com/openzipkin/zipkin/tree/master/zipkin-server */ -@ToString(exclude = "spanBuffer") -public final class Zipkin2Sender implements Sender { +public final class Zipkin2Sender extends GenericZipkinSender { static final BytesEncoder DEFAULT_ENCODER = SpanBytesEncoder.JSON_V2; @@ -85,85 +84,40 @@ public static Zipkin2Sender create(zipkin2.reporter.Sender delegate, BytesEncode final BytesEncoder encoder; final zipkin2.reporter.Sender delegate; - final List spanBuffer; Zipkin2Sender(zipkin2.reporter.Sender delegate, BytesEncoder encoder) { this.delegate = delegate; this.encoder = encoder; - this.spanBuffer = new ArrayList(); } - /* - * Adds spans to an internal queue that flushes them with the configured delegate later. - * This function does not need to be synchronized because the reporter creates - * a single thread that calls this append function - */ @Override - public int append(io.jaegertracing.Span span) throws SenderException { - byte[] next = encoder.encode(V2SpanConverter.convertSpan(span)); - int messageSizeOfNextSpan = delegate.messageSizeInBytes(Collections.singletonList(next)); - // don't enqueue something larger than we can drain - if (messageSizeOfNextSpan > delegate.messageMaxBytes()) { - throw new SenderException( - delegate.toString() + " received a span that was too large", null, 1); - } - - spanBuffer.add(next); // speculatively add to the buffer so we can size it - int nextSizeInBytes = delegate.messageSizeInBytes(spanBuffer); - // If we can fit queued spans and the next into one message... - if (nextSizeInBytes <= delegate.messageMaxBytes()) { - - // If there's still room, don't flush yet. - if (nextSizeInBytes < delegate.messageMaxBytes()) { - return 0; - } - // If we have exactly met the max message size, flush - return flush(); - } - - // Otherwise, remove speculatively added span and flush until we have room for it. - spanBuffer.remove(spanBuffer.size() - 1); - int n; - try { - n = flush(); - } catch (SenderException e) { - // +1 for the span not submitted in the buffer above - throw new SenderException(e.getMessage(), e.getCause(), e.getDroppedSpanCount() + 1); - } - - // Now that there's room, add the span as the only element in the buffer - spanBuffer.add(next); - return n; + public byte[] convertAndEncodeSpan(io.jaegertracing.Span span) { + return encoder.encode(V2SpanConverter.convertSpan(span)); + } + + @Override + public int messageSizeInBytes(List msg) { + return delegate.messageSizeInBytes(msg); + } + + @Override + public int messageMaxBytes() { + return delegate.messageMaxBytes(); + } + + @Override + public void sendSpans(List spanBuffer) throws IOException { + Call call = delegate.sendSpans(spanBuffer); + call.execute(); } @Override - public int flush() throws SenderException { - if (spanBuffer.isEmpty()) { - return 0; - } - - int n = spanBuffer.size(); - try { - Call call = delegate.sendSpans(spanBuffer); - call.execute(); - } catch (RuntimeException e) { - throw new SenderException("Failed to flush spans.", e, n); - } catch (IOException e) { - throw new SenderException("Failed to flush spans.", e, n); - } finally { - spanBuffer.clear(); - } - return n; + public void closeSender() throws IOException { + delegate.close(); } @Override - public int close() throws SenderException { - int n = flush(); - try { - delegate.close(); - } catch (IOException e) { - throw new SenderException("Failed to close " + delegate, e, n); - } - return n; + public String delegateToString() { + return delegate.toString(); } } diff --git a/jaeger-zipkin/src/main/java/io/jaegertracing/senders/zipkin/ZipkinSender.java b/jaeger-zipkin/src/main/java/io/jaegertracing/senders/zipkin/ZipkinSender.java index df25acc7f..0de8fb364 100644 --- a/jaeger-zipkin/src/main/java/io/jaegertracing/senders/zipkin/ZipkinSender.java +++ b/jaeger-zipkin/src/main/java/io/jaegertracing/senders/zipkin/ZipkinSender.java @@ -47,8 +47,7 @@ *

* See https://github.com/openzipkin/zipkin/tree/master/zipkin-server */ -@ToString(exclude = "spanBuffer") -public final class ZipkinSender implements Sender { +public final class ZipkinSender extends GenericZipkinSender { /** * @param endpoint The POST URL for zipkin's v1 api, @@ -79,84 +78,45 @@ public static ZipkinSender create(zipkin.reporter.Sender delegate) { final ThriftSpanEncoder encoder = new ThriftSpanEncoder(); final zipkin.reporter.Sender delegate; - final List spanBuffer; ZipkinSender(zipkin.reporter.Sender delegate) { this.delegate = delegate; - this.spanBuffer = new ArrayList(); } - /* - * Adds spans to an internal queue that flushes them as udp packets later. - * This function does not need to be synchronized because the reporter creates - * a single thread that calls this append function - */ @Override - public int append(io.jaegertracing.Span span) throws SenderException { - byte[] next = encoder.encode(backFillHostOnAnnotations(ThriftSpanConverter.convertSpan(span))); - int messageSizeOfNextSpan = delegate.messageSizeInBytes(Collections.singletonList(next)); - // don't enqueue something larger than we can drain - if (messageSizeOfNextSpan > delegate.messageMaxBytes()) { - throw new SenderException( - delegate.toString() + " received a span that was too large", null, 1); - } - - spanBuffer.add(next); // speculatively add to the buffer so we can size it - int nextSizeInBytes = delegate.messageSizeInBytes(spanBuffer); - // If we can fit queued spans and the next into one message... - if (nextSizeInBytes <= delegate.messageMaxBytes()) { - - // If there's still room, don't flush yet. - if (nextSizeInBytes < delegate.messageMaxBytes()) { - return 0; - } - // If we have exactly met the max message size, flush - return flush(); - } - - // Otherwise, remove speculatively added span and flush until we have room for it. - spanBuffer.remove(spanBuffer.size() - 1); - int n; - try { - n = flush(); - } catch (SenderException e) { - // +1 for the span not submitted in the buffer above - throw new SenderException(e.getMessage(), e.getCause(), e.getDroppedSpanCount() + 1); - } + public byte[] convertAndEncodeSpan(io.jaegertracing.Span span) { + return encoder.encode(backFillHostOnAnnotations(ThriftSpanConverter.convertSpan(span))); + } - // Now that there's room, add the span as the only element in the buffer - spanBuffer.add(next); - return n; + @Override + public int messageSizeInBytes(List msg) { + return delegate.messageSizeInBytes(msg); } @Override - public int flush() throws SenderException { - if (spanBuffer.isEmpty()) { - return 0; - } + public int messageMaxBytes() { + return delegate.messageMaxBytes(); + } + @Override + public void sendSpans(List spanBuffer) throws IOException { AwaitableCallback captor = new AwaitableCallback(); - int n = spanBuffer.size(); try { delegate.sendSpans(spanBuffer, captor); captor.await(); } catch (RuntimeException e) { - throw new SenderException("Failed to flush spans.", e, n); - } finally { - spanBuffer.clear(); + throw new IOException("Failed to send span", e); } - return n; } @Override - public int close() throws SenderException { - int n = flush(); - try { - delegate.close(); - } catch (IOException e) { - throw new SenderException("Failed to close " + delegate, e, n); - } - return n; + public void closeSender() throws IOException { + delegate.close(); + } + + @Override + public String delegateToString() { + return delegate.toString(); } // serviceName/host is needed on annotations so zipkin can query