From 3d1d869eec555af793c101e83822dc24cb96e339 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Mon, 12 Feb 2024 22:39:56 +0800 Subject: [PATCH] Decreases switch statements and deprecates BytesMessageEncoder Signed-off-by: Adrian Cole --- .../reporter/activemq/ActiveMQSender.java | 9 +-- .../zipkin2/reporter/amqp/RabbitMQSender.java | 7 +- .../zipkin2/reporter/internal/NoopSender.java | 6 +- .../brave/AsyncZipkinSpanHandler.java | 7 +- .../zipkin2/reporter/brave/JsonV2Encoder.java | 2 + .../brave/MutableSpanBytesEncoder.java | 80 +++++++++++++++++++ .../zipkin2/reporter/brave/FakeSender.java | 14 ++-- .../brave/MutableSpanBytesEncoderTest.java | 76 ++++++++++++++++++ .../java/zipkin2/reporter/AsyncReporter.java | 11 +-- .../zipkin2/reporter/BytesMessageEncoder.java | 3 + .../main/java/zipkin2/reporter/Encoding.java | 48 +++++++++++ .../zipkin2/reporter/SpanBytesEncoder.java | 20 ++++- ...sageEncoderTest.java => EncodingTest.java} | 27 +++---- .../java/zipkin2/reporter/FakeSender.java | 15 ++-- .../reporter/SpanBytesEncoderTest.java | 29 +++++++ .../zipkin2/reporter/kafka/KafkaSender.java | 7 +- .../reporter/okhttp3/OkHttpSender.java | 3 +- .../okhttp3/RequestBodyMessageEncoder.java | 16 ++-- .../urlconnection/URLConnectionSender.java | 34 ++------ 19 files changed, 306 insertions(+), 108 deletions(-) create mode 100644 brave/src/main/java/zipkin2/reporter/brave/MutableSpanBytesEncoder.java create mode 100644 brave/src/test/java/zipkin2/reporter/brave/MutableSpanBytesEncoderTest.java rename core/src/test/java/zipkin2/reporter/{BytesMessageEncoderTest.java => EncodingTest.java} (66%) create mode 100644 core/src/test/java/zipkin2/reporter/SpanBytesEncoderTest.java diff --git a/activemq-client/src/main/java/zipkin2/reporter/activemq/ActiveMQSender.java b/activemq-client/src/main/java/zipkin2/reporter/activemq/ActiveMQSender.java index dedfa3e0..8a7c74bb 100644 --- a/activemq-client/src/main/java/zipkin2/reporter/activemq/ActiveMQSender.java +++ b/activemq-client/src/main/java/zipkin2/reporter/activemq/ActiveMQSender.java @@ -20,7 +20,6 @@ import javax.jms.QueueSender; import org.apache.activemq.ActiveMQConnectionFactory; import zipkin2.reporter.AsyncReporter; -import zipkin2.reporter.BytesMessageEncoder; import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Call; import zipkin2.reporter.Callback; @@ -111,7 +110,7 @@ public Builder messageMaxBytes(int messageMaxBytes) { return this; } - public final ActiveMQSender build() { + public ActiveMQSender build() { if (connectionFactory == null) throw new NullPointerException("connectionFactory == null"); return new ActiveMQSender(this); } @@ -122,14 +121,12 @@ public final ActiveMQSender build() { final Encoding encoding; final int messageMaxBytes; - final BytesMessageEncoder encoder; final LazyInit lazyInit; ActiveMQSender(Builder builder) { this.encoding = builder.encoding; this.messageMaxBytes = builder.messageMaxBytes; - this.encoder = BytesMessageEncoder.forEncoding(encoding); this.lazyInit = new LazyInit(builder); } @@ -155,14 +152,14 @@ public final ActiveMQSender build() { /** {@inheritDoc} */ @Override @Deprecated public Call sendSpans(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); - byte[] message = encoder.encode(encodedSpans); + byte[] message = encoding.encode(encodedSpans); return new ActiveMQCall(message); } /** {@inheritDoc} */ @Override public void send(List encodedSpans) throws IOException { if (closeCalled) throw new ClosedSenderException(); - send(encoder.encode(encodedSpans)); + send(encoding.encode(encodedSpans)); } void send(byte[] message) throws IOException { diff --git a/amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java b/amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java index 490ee764..bb4ffa51 100644 --- a/amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java +++ b/amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.concurrent.TimeoutException; import zipkin2.reporter.AsyncReporter; -import zipkin2.reporter.BytesMessageEncoder; import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Call; import zipkin2.reporter.Callback; @@ -179,12 +178,10 @@ public final RabbitMQSender build() { final List
addresses; final String queue; final ConnectionFactory connectionFactory; - final BytesMessageEncoder encoder; RabbitMQSender(Builder builder) { if (builder.addresses == null) throw new NullPointerException("addresses == null"); encoding = builder.encoding; - encoder = BytesMessageEncoder.forEncoding(encoding); messageMaxBytes = builder.messageMaxBytes; addresses = builder.addresses; queue = builder.queue; @@ -218,14 +215,14 @@ public Builder toBuilder() { /** {@inheritDoc} */ @Override @Deprecated public Call sendSpans(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); - byte[] message = encoder.encode(encodedSpans); + byte[] message = encoding.encode(encodedSpans); return new RabbitMQCall(message); } /** {@inheritDoc} */ @Override public void send(List encodedSpans) throws IOException { if (closeCalled) throw new ClosedSenderException(); - publish(encoder.encode(encodedSpans)); + publish(encoding.encode(encodedSpans)); } void publish(byte[] message) throws IOException { diff --git a/benchmarks/src/main/java/zipkin2/reporter/internal/NoopSender.java b/benchmarks/src/main/java/zipkin2/reporter/internal/NoopSender.java index f11afcb4..b1de9078 100644 --- a/benchmarks/src/main/java/zipkin2/reporter/internal/NoopSender.java +++ b/benchmarks/src/main/java/zipkin2/reporter/internal/NoopSender.java @@ -14,20 +14,16 @@ package zipkin2.reporter.internal; import java.util.List; -import zipkin2.reporter.BytesMessageEncoder; import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Encoding; /** Encodes messages on {@link #send(List)}, but doesn't do anything else. */ final class NoopSender extends BytesMessageSender.Base { - final BytesMessageEncoder messageEncoder; - /** close is typically called from a different thread */ volatile boolean closeCalled; NoopSender(Encoding encoding) { super(encoding); - this.messageEncoder = BytesMessageEncoder.forEncoding(encoding); } @Override public int messageMaxBytes() { @@ -35,7 +31,7 @@ final class NoopSender extends BytesMessageSender.Base { } @Override public void send(List encodedSpans) { - messageEncoder.encode(encodedSpans); + encoding.encode(encodedSpans); } @Override public void close() { diff --git a/brave/src/main/java/zipkin2/reporter/brave/AsyncZipkinSpanHandler.java b/brave/src/main/java/zipkin2/reporter/brave/AsyncZipkinSpanHandler.java index 119b577a..26cc73c0 100644 --- a/brave/src/main/java/zipkin2/reporter/brave/AsyncZipkinSpanHandler.java +++ b/brave/src/main/java/zipkin2/reporter/brave/AsyncZipkinSpanHandler.java @@ -172,12 +172,7 @@ public Builder queuedMaxBytes(int queuedMaxBytes) { */ // AsyncZipkinSpanHandler not SpanHandler, so that Flushable and Closeable are accessible public AsyncZipkinSpanHandler build() { - switch (encoding) { - case JSON: - return build(new JsonV2Encoder(errorTag)); - default: - throw new UnsupportedOperationException(encoding.name()); - } + return build(MutableSpanBytesEncoder.create(encoding, errorTag)); } /** diff --git a/brave/src/main/java/zipkin2/reporter/brave/JsonV2Encoder.java b/brave/src/main/java/zipkin2/reporter/brave/JsonV2Encoder.java index 760ab43f..f4c021e7 100644 --- a/brave/src/main/java/zipkin2/reporter/brave/JsonV2Encoder.java +++ b/brave/src/main/java/zipkin2/reporter/brave/JsonV2Encoder.java @@ -14,12 +14,14 @@ package zipkin2.reporter.brave; import brave.Tag; +import brave.Tags; import brave.handler.MutableSpan; import brave.handler.MutableSpanBytesEncoder; import zipkin2.reporter.BytesEncoder; import zipkin2.reporter.Encoding; final class JsonV2Encoder implements BytesEncoder { + static final BytesEncoder INSTANCE = new JsonV2Encoder(Tags.ERROR); final MutableSpanBytesEncoder delegate; JsonV2Encoder(Tag errorTag) { diff --git a/brave/src/main/java/zipkin2/reporter/brave/MutableSpanBytesEncoder.java b/brave/src/main/java/zipkin2/reporter/brave/MutableSpanBytesEncoder.java new file mode 100644 index 00000000..1d217ea8 --- /dev/null +++ b/brave/src/main/java/zipkin2/reporter/brave/MutableSpanBytesEncoder.java @@ -0,0 +1,80 @@ +/* + * Copyright 2016-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter.brave; + +import brave.Tag; +import brave.Tags; +import brave.handler.MutableSpan; +import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.Encoding; + +/** Includes built-in formats used in Zipkin. */ +public enum MutableSpanBytesEncoder implements BytesEncoder { + /** Corresponds to the Zipkin v2 json format */ + JSON_V2 { + @Override public Encoding encoding() { + return Encoding.JSON; + } + + @Override public int sizeInBytes(MutableSpan input) { + return JsonV2Encoder.INSTANCE.sizeInBytes(input); + } + + @Override public byte[] encode(MutableSpan input) { + return JsonV2Encoder.INSTANCE.encode(input); + } + }; + + /** + * Returns the default {@linkplain MutableSpan} encoder for given encoding. + * + * @throws UnsupportedOperationException if the encoding is not yet supported. + * @since 3.3 + */ + public static BytesEncoder forEncoding(Encoding encoding) { + if (encoding == null) throw new NullPointerException("encoding == null"); + switch (encoding) { + case JSON: + return JSON_V2; + case PROTO3: + throw new UnsupportedOperationException("PROTO3 is not yet a built-in encoder"); + case THRIFT: + throw new UnsupportedOperationException("THRIFT is not yet a built-in encoder"); + default: // BUG: as encoding is an enum! + throw new UnsupportedOperationException("BUG: " + encoding.name()); + } + } + + /** + * Like {@linkplain #forEncoding(Encoding)}, except you can override the default throwable parser, + * which is {@linkplain brave.Tags#ERROR}. + * + * @since 3.3 + */ + public static BytesEncoder create(Encoding encoding, Tag errorTag) { + if (encoding == null) throw new NullPointerException("encoding == null"); + if (errorTag == null) throw new NullPointerException("errorTag == null"); + if (errorTag == Tags.ERROR) return forEncoding(encoding); + switch (encoding) { + case JSON: + return new JsonV2Encoder(errorTag); + case PROTO3: + throw new UnsupportedOperationException("PROTO3 is not yet a built-in encoder"); + case THRIFT: + throw new UnsupportedOperationException("THRIFT is not yet a built-in encoder"); + default: // BUG: as encoding is an enum! + throw new UnsupportedOperationException("BUG: " + encoding.name()); + } + } +} diff --git a/brave/src/test/java/zipkin2/reporter/brave/FakeSender.java b/brave/src/test/java/zipkin2/reporter/brave/FakeSender.java index d9792361..5f26c4be 100644 --- a/brave/src/test/java/zipkin2/reporter/brave/FakeSender.java +++ b/brave/src/test/java/zipkin2/reporter/brave/FakeSender.java @@ -20,7 +20,6 @@ import zipkin2.codec.BytesDecoder; import zipkin2.codec.SpanBytesDecoder; import zipkin2.reporter.BytesEncoder; -import zipkin2.reporter.BytesMessageEncoder; import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.ClosedSenderException; import zipkin2.reporter.Encoding; @@ -29,37 +28,34 @@ public final class FakeSender extends BytesMessageSender.Base { public static FakeSender create() { - return new FakeSender(Encoding.JSON, Integer.MAX_VALUE, - BytesMessageEncoder.forEncoding(Encoding.JSON), SpanBytesEncoder.JSON_V2, + return new FakeSender(Encoding.JSON, Integer.MAX_VALUE, SpanBytesEncoder.JSON_V2, SpanBytesDecoder.JSON_V2, spans -> { }); } final int messageMaxBytes; - final BytesMessageEncoder messageEncoder; final BytesEncoder encoder; final BytesDecoder decoder; final Consumer> onSpans; - FakeSender(Encoding encoding, int messageMaxBytes, BytesMessageEncoder messageEncoder, - BytesEncoder encoder, BytesDecoder decoder, Consumer> onSpans) { + FakeSender(Encoding encoding, int messageMaxBytes, BytesEncoder encoder, + BytesDecoder decoder, Consumer> onSpans) { super(encoding); this.messageMaxBytes = messageMaxBytes; - this.messageEncoder = messageEncoder; this.encoder = encoder; this.decoder = decoder; this.onSpans = onSpans; } FakeSender encoding(Encoding encoding) { - return new FakeSender(encoding, messageMaxBytes, messageEncoder, // invalid but not needed, yet + return new FakeSender(encoding, messageMaxBytes, // invalid but not needed, yet encoder, // invalid but not needed, yet decoder, // invalid but not needed, yet onSpans); } FakeSender onSpans(Consumer> onSpans) { - return new FakeSender(encoding, messageMaxBytes, messageEncoder, encoder, decoder, onSpans); + return new FakeSender(encoding, messageMaxBytes, encoder, decoder, onSpans); } @Override public int messageMaxBytes() { diff --git a/brave/src/test/java/zipkin2/reporter/brave/MutableSpanBytesEncoderTest.java b/brave/src/test/java/zipkin2/reporter/brave/MutableSpanBytesEncoderTest.java new file mode 100644 index 00000000..39fd6168 --- /dev/null +++ b/brave/src/test/java/zipkin2/reporter/brave/MutableSpanBytesEncoderTest.java @@ -0,0 +1,76 @@ +/* + * Copyright 2016-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter.brave; + +import brave.Tag; +import brave.Tags; +import brave.handler.MutableSpan; +import brave.propagation.TraceContext; +import java.nio.charset.StandardCharsets; +import org.junit.jupiter.api.Test; +import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.Encoding; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class MutableSpanBytesEncoderTest { + @Test void forEncoding() { + assertThat(MutableSpanBytesEncoder.forEncoding(Encoding.JSON)) + .isSameAs(MutableSpanBytesEncoder.JSON_V2); + assertThatThrownBy(() -> MutableSpanBytesEncoder.forEncoding(Encoding.PROTO3)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("PROTO3 is not yet a built-in encoder"); + assertThatThrownBy(() -> MutableSpanBytesEncoder.forEncoding(Encoding.THRIFT)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("THRIFT is not yet a built-in encoder"); + } + + Tag iceCream = new Tag<>("exception") { + @Override protected String parseValue(Throwable throwable, TraceContext traceContext) { + return "ice cream"; + } + }; + + @Test void create_json() { + // doesn't allocate on defaults + assertThat(MutableSpanBytesEncoder.create(Encoding.JSON, Tags.ERROR)) + .isSameAs(MutableSpanBytesEncoder.JSON_V2); + + MutableSpan span = new MutableSpan(); + span.traceId("1"); + span.id("2"); + span.error(new OutOfMemoryError("out of memory")); + + // Default makes a tag named error + assertThat(new String(MutableSpanBytesEncoder.JSON_V2.encode(span), StandardCharsets.UTF_8)) + .isEqualTo("{\"traceId\":\"0000000000000001\",\"id\":\"0000000000000002\",\"tags\":{\"error\":\"out of memory\"}}"); + + + // but, using create, you can override with something else. + BytesEncoder iceCreamEncoder = + MutableSpanBytesEncoder.create(Encoding.JSON, iceCream); + assertThat(new String(iceCreamEncoder.encode(span), StandardCharsets.UTF_8)) + .isEqualTo("{\"traceId\":\"0000000000000001\",\"id\":\"0000000000000002\",\"tags\":{\"exception\":\"ice cream\"}}"); + } + + @Test void create_unsupported() { + assertThatThrownBy(() -> MutableSpanBytesEncoder.create(Encoding.PROTO3, iceCream)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("PROTO3 is not yet a built-in encoder"); + assertThatThrownBy(() -> MutableSpanBytesEncoder.create(Encoding.THRIFT, iceCream)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("THRIFT is not yet a built-in encoder"); + } +} diff --git a/core/src/main/java/zipkin2/reporter/AsyncReporter.java b/core/src/main/java/zipkin2/reporter/AsyncReporter.java index 989f373d..4456a40b 100644 --- a/core/src/main/java/zipkin2/reporter/AsyncReporter.java +++ b/core/src/main/java/zipkin2/reporter/AsyncReporter.java @@ -163,16 +163,7 @@ public Builder queuedMaxBytes(int queuedMaxBytes) { /** Builds an async reporter that encodes zipkin spans as they are reported. */ public AsyncReporter build() { - switch (encoding) { - case JSON: - return build(SpanBytesEncoder.JSON_V2); - case PROTO3: - return build(SpanBytesEncoder.PROTO3); - case THRIFT: - return build(SpanBytesEncoder.THRIFT); - default: - throw new UnsupportedOperationException(encoding.name()); - } + return build(SpanBytesEncoder.forEncoding(encoding)); } /** Builds an async reporter that encodes arbitrary spans as they are reported. */ diff --git a/core/src/main/java/zipkin2/reporter/BytesMessageEncoder.java b/core/src/main/java/zipkin2/reporter/BytesMessageEncoder.java index f7a2d325..3d8ad930 100644 --- a/core/src/main/java/zipkin2/reporter/BytesMessageEncoder.java +++ b/core/src/main/java/zipkin2/reporter/BytesMessageEncoder.java @@ -18,7 +18,10 @@ /** * Senders like Kafka use byte[] message encoding. This provides helpers to concatenate spans into a * list. + * + * @deprecated As of 3.3, use {@linkplain Encoding#encode(List)}. This will be removed in v4.0. */ +@Deprecated public enum BytesMessageEncoder { JSON { @Override public byte[] encode(List values) { diff --git a/core/src/main/java/zipkin2/reporter/Encoding.java b/core/src/main/java/zipkin2/reporter/Encoding.java index eba1fe2a..57face7c 100644 --- a/core/src/main/java/zipkin2/reporter/Encoding.java +++ b/core/src/main/java/zipkin2/reporter/Encoding.java @@ -36,6 +36,15 @@ public enum Encoding { } return sizeInBytes; } + + @SuppressWarnings("deprecation") + @Override public byte[] encode(List encodedSpans) { + return BytesMessageEncoder.JSON.encode(encodedSpans); + } + + @Override public String mediaType() { + return "application/json"; + } }, /** * The first format of Zipkin was TBinaryProtocol, big-endian thrift. It is no longer used, but @@ -61,6 +70,15 @@ public enum Encoding { } return sizeInBytes; } + + @SuppressWarnings("deprecation") + @Override public byte[] encode(List encodedSpans) { + return BytesMessageEncoder.THRIFT.encode(encodedSpans); + } + + @Override public String mediaType() { + return "application/x-thrift"; + } }, /** * Repeated (type 2) fields are length-prefixed. A list is a concatenation of fields with no @@ -82,10 +100,40 @@ public enum Encoding { } return sizeInBytes; } + + @SuppressWarnings("deprecation") + @Override public byte[] encode(List encodedSpans) { + return BytesMessageEncoder.PROTO3.encode(encodedSpans); + } + + @Override public String mediaType() { + return "application/x-protobuf"; + } }; + /** + * Combines a list of encoded spans into an encoded list. For example, in {@linkplain #THRIFT}, + * this would be length-prefixed, whereas in {@linkplain #JSON}, this would be comma-separated and + * enclosed by brackets. + * + *

The primary use of this is batch reporting spans. For example, spans are {@link + * BytesEncoder#encode(Object) encoded} one-by-one into a queue. This queue is drained up to a byte + * threshold. Then, the list is encoded with this function and reported out-of-process. + * + * @since 3.3 + */ + public abstract byte[] encode(List encodedSpans); + /** Like {@link #listSizeInBytes(List)}, except for a single element. */ public abstract int listSizeInBytes(int encodedSizeInBytes); public abstract int listSizeInBytes(List values); + + /** + * Returns this message's "Content-Type" for use in an HTTP {@link BytesMessageSender sender} + * targeting the Zipkin POST endpoint. + * + * @since 3.3 + */ + public abstract String mediaType(); } diff --git a/core/src/main/java/zipkin2/reporter/SpanBytesEncoder.java b/core/src/main/java/zipkin2/reporter/SpanBytesEncoder.java index 060014c3..88928ec5 100644 --- a/core/src/main/java/zipkin2/reporter/SpanBytesEncoder.java +++ b/core/src/main/java/zipkin2/reporter/SpanBytesEncoder.java @@ -16,7 +16,6 @@ import zipkin2.Span; /** Includes built-in formats used in Zipkin. */ -@SuppressWarnings("ImmutableEnumChecker") // because span is immutable public enum SpanBytesEncoder implements BytesEncoder { /** Corresponds to the Zipkin v1 thrift format */ THRIFT { @@ -73,4 +72,23 @@ public enum SpanBytesEncoder implements BytesEncoder { return zipkin2.codec.SpanBytesEncoder.PROTO3.encode(input); } }; + + /** + * Returns the default {@linkplain Span} encoder for given encoding. + * + * @since 3.3 + */ + public static BytesEncoder forEncoding(Encoding encoding) { + if (encoding == null) throw new NullPointerException("encoding == null"); + switch (encoding) { + case JSON: + return JSON_V2; + case PROTO3: + return PROTO3; + case THRIFT: + return THRIFT; + default: // BUG: as encoding is an enum! + throw new UnsupportedOperationException("BUG: " + encoding.name()); + } + } } diff --git a/core/src/test/java/zipkin2/reporter/BytesMessageEncoderTest.java b/core/src/test/java/zipkin2/reporter/EncodingTest.java similarity index 66% rename from core/src/test/java/zipkin2/reporter/BytesMessageEncoderTest.java rename to core/src/test/java/zipkin2/reporter/EncodingTest.java index 347ac7b0..04a3e11c 100644 --- a/core/src/test/java/zipkin2/reporter/BytesMessageEncoderTest.java +++ b/core/src/test/java/zipkin2/reporter/EncodingTest.java @@ -13,56 +13,55 @@ */ package zipkin2.reporter; -import java.util.Arrays; import java.util.List; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; -class BytesMessageEncoderTest { +class EncodingTest { @Test void emptyList_json() { - List encoded = Arrays.asList(); - assertThat(BytesMessageEncoder.JSON.encode(encoded)) + List encoded = List.of(); + assertThat(Encoding.JSON.encode(encoded)) .containsExactly('[', ']'); } @Test void singletonList_json() { - List encoded = Arrays.asList(new byte[] {'{', '}'}); + List encoded = List.of(new byte[] {'{', '}'}); - assertThat(BytesMessageEncoder.JSON.encode(encoded)) + assertThat(Encoding.JSON.encode(encoded)) .containsExactly('[', '{', '}', ']'); } @Test void multiItemList_json() { - List encoded = Arrays.asList( + List encoded = List.of( "{\"k\":\"1\"}".getBytes(), "{\"k\":\"2\"}".getBytes(), "{\"k\":\"3\"}".getBytes() ); - assertThat(new String(BytesMessageEncoder.JSON.encode(encoded))) + assertThat(new String(Encoding.JSON.encode(encoded))) .isEqualTo("[{\"k\":\"1\"},{\"k\":\"2\"},{\"k\":\"3\"}]"); } @Test void emptyList_proto3() { - List encoded = Arrays.asList(); - assertThat(BytesMessageEncoder.PROTO3.encode(encoded)) + List encoded = List.of(); + assertThat(Encoding.PROTO3.encode(encoded)) .isEmpty(); } @Test void singletonList_proto3() { - List encoded = Arrays.asList(new byte[] {1, 1, 'a'}); + List encoded = List.of(new byte[] {1, 1, 'a'}); - assertThat(BytesMessageEncoder.PROTO3.encode(encoded)) + assertThat(Encoding.PROTO3.encode(encoded)) .containsExactly(1, 1, 'a'); } @Test void multiItemList_proto3() { - List encoded = Arrays.asList( + List encoded = List.of( new byte[] {1, 1, 'a'}, new byte[] {1, 1, 'b'}, new byte[] {1, 1, 'c'} ); - assertThat(BytesMessageEncoder.PROTO3.encode(encoded)).containsExactly( + assertThat(Encoding.PROTO3.encode(encoded)).containsExactly( 1, 1, 'a', 1, 1, 'b', 1, 1, 'c' diff --git a/core/src/test/java/zipkin2/reporter/FakeSender.java b/core/src/test/java/zipkin2/reporter/FakeSender.java index 05427fa6..0421c3e0 100644 --- a/core/src/test/java/zipkin2/reporter/FakeSender.java +++ b/core/src/test/java/zipkin2/reporter/FakeSender.java @@ -23,41 +23,38 @@ public final class FakeSender extends BytesMessageSender.Base { public static FakeSender create() { - return new FakeSender(Encoding.JSON, Integer.MAX_VALUE, - BytesMessageEncoder.forEncoding(Encoding.JSON), SpanBytesEncoder.JSON_V2, + return new FakeSender(Encoding.JSON, Integer.MAX_VALUE, SpanBytesEncoder.JSON_V2, SpanBytesDecoder.JSON_V2, spans -> { }); } final int messageMaxBytes; - final BytesMessageEncoder messageEncoder; final BytesEncoder encoder; final BytesDecoder decoder; final Consumer> onSpans; - FakeSender(Encoding encoding, int messageMaxBytes, BytesMessageEncoder messageEncoder, - BytesEncoder encoder, BytesDecoder decoder, Consumer> onSpans) { + FakeSender(Encoding encoding, int messageMaxBytes, BytesEncoder encoder, + BytesDecoder decoder, Consumer> onSpans) { super(encoding); this.messageMaxBytes = messageMaxBytes; - this.messageEncoder = messageEncoder; this.encoder = encoder; this.decoder = decoder; this.onSpans = onSpans; } public FakeSender encoding(Encoding encoding) { - return new FakeSender(encoding, messageMaxBytes, messageEncoder, // invalid but not needed, yet + return new FakeSender(encoding, messageMaxBytes, // invalid but not needed, yet encoder, // invalid but not needed, yet decoder, // invalid but not needed, yet onSpans); } public FakeSender onSpans(Consumer> onSpans) { - return new FakeSender(encoding, messageMaxBytes, messageEncoder, encoder, decoder, onSpans); + return new FakeSender(encoding, messageMaxBytes, encoder, decoder, onSpans); } public FakeSender messageMaxBytes(int messageMaxBytes) { - return new FakeSender(encoding, messageMaxBytes, messageEncoder, encoder, decoder, onSpans); + return new FakeSender(encoding, messageMaxBytes, encoder, decoder, onSpans); } @Override public int messageMaxBytes() { diff --git a/core/src/test/java/zipkin2/reporter/SpanBytesEncoderTest.java b/core/src/test/java/zipkin2/reporter/SpanBytesEncoderTest.java new file mode 100644 index 00000000..96055eca --- /dev/null +++ b/core/src/test/java/zipkin2/reporter/SpanBytesEncoderTest.java @@ -0,0 +1,29 @@ +/* + * Copyright 2016-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class SpanBytesEncoderTest { + @Test void forEncoding() { + assertThat(SpanBytesEncoder.forEncoding(Encoding.JSON)) + .isSameAs(SpanBytesEncoder.JSON_V2); + assertThat(SpanBytesEncoder.forEncoding(Encoding.PROTO3)) + .isSameAs(SpanBytesEncoder.PROTO3); + assertThat(SpanBytesEncoder.forEncoding(Encoding.THRIFT)) + .isSameAs(SpanBytesEncoder.THRIFT); + } +} diff --git a/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java b/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java index 55ac8949..dcd655c6 100644 --- a/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java +++ b/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java @@ -28,7 +28,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import zipkin2.reporter.AsyncReporter; import zipkin2.reporter.AwaitableCallback; -import zipkin2.reporter.BytesMessageEncoder; import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Call; import zipkin2.reporter.Callback; @@ -204,7 +203,6 @@ public KafkaSender build() { final Properties properties; final String topic; final Encoding encoding; - final BytesMessageEncoder encoder; final int messageMaxBytes; KafkaSender(Builder builder) { @@ -212,7 +210,6 @@ public KafkaSender build() { properties.putAll(builder.properties); topic = builder.topic; encoding = builder.encoding; - encoder = BytesMessageEncoder.forEncoding(builder.encoding); messageMaxBytes = builder.messageMaxBytes; } @@ -260,7 +257,7 @@ public Builder toBuilder() { /** {@inheritDoc} */ @Override @Deprecated public Call sendSpans(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); - byte[] message = encoder.encode(encodedSpans); + byte[] message = encoding.encode(encodedSpans); return new KafkaCall(message); } @@ -271,7 +268,7 @@ public Builder toBuilder() { */ @Override public void send(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); - send(encoder.encode(encodedSpans)); + send(encoding.encode(encodedSpans)); } void send(byte[] message) { diff --git a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java index 1a9bcc96..08db19b9 100644 --- a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java +++ b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java @@ -14,6 +14,7 @@ package zipkin2.reporter.okhttp3; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; @@ -387,7 +388,7 @@ public Builder toBuilder() { @Override @Deprecated public CheckResult check() { try { Request request = new Request.Builder().url(urlSupplier.get()) - .post(RequestBody.create(MediaType.parse("application/json"), "[]")).build(); + .post(encoder.encode(Collections.emptyList())).build(); try (Response response = client.newCall(request).execute()) { if (!response.isSuccessful()) { return CheckResult.failed(new RuntimeException("check response failed: " + response)); diff --git a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/RequestBodyMessageEncoder.java b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/RequestBodyMessageEncoder.java index 5df1f27a..133f37d4 100644 --- a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/RequestBodyMessageEncoder.java +++ b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/RequestBodyMessageEncoder.java @@ -43,8 +43,8 @@ static abstract class StreamingRequestBody extends RequestBody { final List values; final long contentLength; - StreamingRequestBody(Encoding encoding, MediaType contentType, List values) { - this.contentType = contentType; + StreamingRequestBody(Encoding encoding, List values) { + this.contentType = MediaType.parse(encoding.mediaType()); this.values = values; this.contentLength = encoding.listSizeInBytes(values); } @@ -59,10 +59,8 @@ static abstract class StreamingRequestBody extends RequestBody { } static final class JsonRequestBody extends StreamingRequestBody { - static final MediaType CONTENT_TYPE = MediaType.parse("application/json"); - JsonRequestBody(List values) { - super(Encoding.JSON, CONTENT_TYPE, values); + super(Encoding.JSON, values); } @Override public void writeTo(BufferedSink sink) throws IOException { @@ -77,10 +75,8 @@ static final class JsonRequestBody extends StreamingRequestBody { } static final class ThriftRequestBody extends StreamingRequestBody { - static final MediaType CONTENT_TYPE = MediaType.parse("application/x-thrift"); - ThriftRequestBody(List values) { - super(Encoding.THRIFT, CONTENT_TYPE, values); + super(Encoding.THRIFT, values); } @Override @@ -102,10 +98,8 @@ public void writeTo(BufferedSink sink) throws IOException { } static final class Protobuf3RequestBody extends StreamingRequestBody { - static final MediaType CONTENT_TYPE = MediaType.parse("application/x-protobuf"); - Protobuf3RequestBody(List values) { - super(Encoding.PROTO3, CONTENT_TYPE, values); + super(Encoding.PROTO3, values); } @Override public void writeTo(BufferedSink sink) throws IOException { diff --git a/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java b/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java index 6c91e1bf..07fb5378 100644 --- a/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java +++ b/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java @@ -19,11 +19,11 @@ import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import java.util.zip.GZIPOutputStream; -import zipkin2.reporter.BytesMessageEncoder; import zipkin2.reporter.Call; import zipkin2.reporter.Callback; import zipkin2.reporter.CheckResult; @@ -217,8 +217,6 @@ static final class DynamicHttpURLConnectionSupplier extends HttpURLConnectionSup final HttpURLConnectionSupplier connectionSupplier; final Encoding encoding; - final String mediaType; - final BytesMessageEncoder encoder; final int messageMaxBytes; final int connectTimeout, readTimeout; final boolean compressionEnabled; @@ -229,22 +227,6 @@ static final class DynamicHttpURLConnectionSupplier extends HttpURLConnectionSup this.connectionSupplier = connectionSupplier; this.encoding = builder.encoding; - switch (builder.encoding) { - case JSON: - this.mediaType = "application/json"; - this.encoder = BytesMessageEncoder.JSON; - break; - case THRIFT: - this.mediaType = "application/x-thrift"; - this.encoder = BytesMessageEncoder.THRIFT; - break; - case PROTO3: - this.mediaType = "application/x-protobuf"; - this.encoder = BytesMessageEncoder.PROTO3; - break; - default: - throw new UnsupportedOperationException("Unsupported encoding: " + encoding.name()); - } this.messageMaxBytes = builder.messageMaxBytes; this.connectTimeout = builder.connectTimeout; this.readTimeout = builder.readTimeout; @@ -277,19 +259,19 @@ public Builder toBuilder() { /** {@inheritDoc} */ @Override @Deprecated public Call sendSpans(List encodedSpans) { if (closeCalled.get()) throw new ClosedSenderException(); - return new HttpPostCall(encoder.encode(encodedSpans)); + return new HttpPostCall(encoding.encode(encodedSpans)); } /** Sends spans as a POST to {@link Builder#endpoint}. */ @Override public void send(List encodedSpans) throws IOException { if (closeCalled.get()) throw new ClosedSenderException(); - send(encoder.encode(encodedSpans), mediaType); + send(encoding.encode(encodedSpans)); } /** {@inheritDoc} */ @Override @Deprecated public CheckResult check() { try { - send(new byte[] {'[', ']'}, "application/json"); + send(encoding.encode(Collections.emptyList())); return CheckResult.OK; } catch (Throwable e) { Call.propagateIfFatal(e); @@ -297,7 +279,7 @@ public Builder toBuilder() { } } - void send(byte[] body, String mediaType) throws IOException { + void send(byte[] body) throws IOException { // intentionally not closing the connection, to use keep-alives HttpURLConnection connection = connectionSupplier.openConnection(); connection.setConnectTimeout(connectTimeout); @@ -306,7 +288,7 @@ void send(byte[] body, String mediaType) throws IOException { // Amplification can occur when the Zipkin endpoint is proxied, and the proxy is instrumented. // This prevents that in proxies, such as Envoy, that understand B3 single format, connection.addRequestProperty("b3", "0"); - connection.addRequestProperty("Content-Type", mediaType); + connection.addRequestProperty("Content-Type", encoding.mediaType()); if (compressionEnabled) { connection.addRequestProperty("Content-Encoding", "gzip"); ByteArrayOutputStream gzipped = new ByteArrayOutputStream(); @@ -366,13 +348,13 @@ class HttpPostCall extends Call.Base { } @Override protected Void doExecute() throws IOException { - send(message, mediaType); + send(message); return null; } @Override protected void doEnqueue(Callback callback) { try { - send(message, mediaType); + send(message); callback.onSuccess(null); } catch (Throwable t) { Call.propagateIfFatal(t);