diff --git a/brave/src/main/java/zipkin2/reporter/brave/AsyncZipkinSpanHandler.java b/brave/src/main/java/zipkin2/reporter/brave/AsyncZipkinSpanHandler.java index d9402b56..c52d7d20 100644 --- a/brave/src/main/java/zipkin2/reporter/brave/AsyncZipkinSpanHandler.java +++ b/brave/src/main/java/zipkin2/reporter/brave/AsyncZipkinSpanHandler.java @@ -21,6 +21,8 @@ import java.io.Flushable; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.Encoding; import zipkin2.reporter.Reporter; import zipkin2.reporter.ReporterMetrics; import zipkin2.reporter.Sender; @@ -69,15 +71,18 @@ public Builder toBuilder() { /** @since 2.14 */ public static final class Builder extends ZipkinSpanHandler.Builder { final AsyncReporter.Builder delegate; + final Encoding encoding; Builder(AsyncZipkinSpanHandler handler) { this.delegate = ((AsyncReporter) handler.spanReporter).toBuilder(); + this.encoding = handler.encoding; this.alwaysReportSpans = handler.alwaysReportSpans; this.errorTag = handler.errorTag; } Builder(Sender sender) { this.delegate = AsyncReporter.newBuilder(sender); + this.encoding = sender.encoding(); } /** @@ -151,18 +156,42 @@ public Builder queuedMaxBytes(int queuedMaxBytes) { return (Builder) super.alwaysReportSpans(alwaysReportSpans); } + /** + * Builds an async span handler that encodes zipkin spans according to the sender's encoding. + */ // AsyncZipkinSpanHandler not SpanHandler, so that Flushable and Closeable are accessible public AsyncZipkinSpanHandler build() { - return new AsyncZipkinSpanHandler(this); + switch (encoding) { + case JSON: + return build(new JsonV2Encoder(errorTag)); + default: + throw new UnsupportedOperationException(encoding.name()); + } + } + + /** + * Builds an async span handler that encodes zipkin spans according to the encoder. + * + *

Note: The input encoder must use the same error tag implementation as configured by + * {@link #errorTag(Tag)}. + * + * @since 3.1 + */ + // AsyncZipkinSpanHandler not SpanHandler, so that Flushable and Closeable are accessible + public AsyncZipkinSpanHandler build(BytesEncoder encoder) { + if (encoder == null) throw new NullPointerException("encoder == null"); + return new AsyncZipkinSpanHandler(delegate.build(encoder), this); } } final Reporter spanReporter; + final Encoding encoding; final Tag errorTag; // for toBuilder() final boolean alwaysReportSpans; - AsyncZipkinSpanHandler(Builder builder) { - this.spanReporter = builder.delegate.build(new JsonV2Encoder(builder.errorTag)); + AsyncZipkinSpanHandler(AsyncReporter spanReporter, Builder builder) { + this.spanReporter = spanReporter; + this.encoding = builder.encoding; this.errorTag = builder.errorTag; this.alwaysReportSpans = builder.alwaysReportSpans; } diff --git a/brave/src/test/java/zipkin2/reporter/brave/AsyncZipkinSpanHandlerTest.java b/brave/src/test/java/zipkin2/reporter/brave/AsyncZipkinSpanHandlerTest.java new file mode 100644 index 00000000..575ee58c --- /dev/null +++ b/brave/src/test/java/zipkin2/reporter/brave/AsyncZipkinSpanHandlerTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2016-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter.brave; + +import brave.handler.MutableSpan; +import org.junit.jupiter.api.Test; +import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.Encoding; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class AsyncZipkinSpanHandlerTest { + @Test void build_protoNotYetSupported() { + FakeSender sender = FakeSender.create().encoding(Encoding.PROTO3); + AsyncZipkinSpanHandler.Builder builder = AsyncZipkinSpanHandler.newBuilder(sender); + assertThrows(UnsupportedOperationException.class, builder::build); + } + + /** Ready for custom format such as OTLP or Stackdriver. */ + @Test void build_customProtoEncoder() { + FakeSender sender = FakeSender.create().encoding(Encoding.PROTO3); + AsyncZipkinSpanHandler.Builder builder = AsyncZipkinSpanHandler.newBuilder(sender); + BytesEncoder protoEncoder = new BytesEncoder<>() { + @Override public Encoding encoding() { + return Encoding.PROTO3; + } + + @Override public int sizeInBytes(MutableSpan input) { + return 0; + } + + @Override public byte[] encode(MutableSpan input) { + return new byte[0]; + } + }; + + try (AsyncZipkinSpanHandler spanReporter = builder.build(protoEncoder)) { + assertThat(spanReporter).isNotNull(); + } + } +} diff --git a/brave/src/test/java/zipkin2/reporter/brave/FakeSender.java b/brave/src/test/java/zipkin2/reporter/brave/FakeSender.java new file mode 100644 index 00000000..5d071c32 --- /dev/null +++ b/brave/src/test/java/zipkin2/reporter/brave/FakeSender.java @@ -0,0 +1,96 @@ +/* + * Copyright 2016-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter.brave; + +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import zipkin2.Span; +import zipkin2.codec.BytesDecoder; +import zipkin2.codec.SpanBytesDecoder; +import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.BytesMessageEncoder; +import zipkin2.reporter.Call; +import zipkin2.reporter.ClosedSenderException; +import zipkin2.reporter.Encoding; +import zipkin2.reporter.Sender; +import zipkin2.reporter.SpanBytesEncoder; + +public final class FakeSender extends Sender { + + public static FakeSender create() { + return new FakeSender(Encoding.JSON, Integer.MAX_VALUE, + BytesMessageEncoder.forEncoding(Encoding.JSON), SpanBytesEncoder.JSON_V2, + SpanBytesDecoder.JSON_V2, spans -> { + }); + } + + final Encoding encoding; + final int messageMaxBytes; + final BytesMessageEncoder messageEncoder; + final BytesEncoder encoder; + final BytesDecoder decoder; + final Consumer> onSpans; + + FakeSender(Encoding encoding, int messageMaxBytes, BytesMessageEncoder messageEncoder, + BytesEncoder encoder, BytesDecoder decoder, Consumer> onSpans) { + this.encoding = encoding; + this.messageMaxBytes = messageMaxBytes; + this.messageEncoder = messageEncoder; + this.encoder = encoder; + this.decoder = decoder; + this.onSpans = onSpans; + } + + FakeSender encoding(Encoding encoding) { + return new FakeSender(encoding, messageMaxBytes, messageEncoder, // invalid but not needed, yet + encoder, // invalid but not needed, yet + decoder, // invalid but not needed, yet + onSpans); + } + + @Override public Encoding encoding() { + return encoding; + } + + @Override public int messageMaxBytes() { + return messageMaxBytes; + } + + @Override public int messageSizeInBytes(List encodedSpans) { + return encoding.listSizeInBytes(encodedSpans); + } + + @Override public int messageSizeInBytes(int encodedSizeInBytes) { + return encoding.listSizeInBytes(encodedSizeInBytes); + } + + /** close is typically called from a different thread */ + volatile boolean closeCalled; + + @Override public Call sendSpans(List encodedSpans) { + if (closeCalled) throw new ClosedSenderException(); + List decoded = encodedSpans.stream().map(decoder::decodeOne).collect(Collectors.toList()); + onSpans.accept(decoded); + return Call.create(null); + } + + @Override public void close() { + closeCalled = true; + } + + @Override public String toString() { + return "FakeSender"; + } +}