Skip to content

Commit

Permalink
Decreases switch statements and deprecates BytesMessageEncoder
Browse files Browse the repository at this point in the history
Signed-off-by: Adrian Cole <adrian@tetrate.io>
  • Loading branch information
Adrian Cole committed Feb 12, 2024
1 parent b5fbad8 commit 3d1d869
Show file tree
Hide file tree
Showing 19 changed files with 306 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import javax.jms.QueueSender;
import org.apache.activemq.ActiveMQConnectionFactory;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.Call;
import zipkin2.reporter.Callback;
Expand Down Expand Up @@ -111,7 +110,7 @@ public Builder messageMaxBytes(int messageMaxBytes) {
return this;
}

public final ActiveMQSender build() {
public ActiveMQSender build() {
if (connectionFactory == null) throw new NullPointerException("connectionFactory == null");
return new ActiveMQSender(this);
}
Expand All @@ -122,14 +121,12 @@ public final ActiveMQSender build() {

final Encoding encoding;
final int messageMaxBytes;
final BytesMessageEncoder encoder;

final LazyInit lazyInit;

ActiveMQSender(Builder builder) {
this.encoding = builder.encoding;
this.messageMaxBytes = builder.messageMaxBytes;
this.encoder = BytesMessageEncoder.forEncoding(encoding);
this.lazyInit = new LazyInit(builder);
}

Expand All @@ -155,14 +152,14 @@ public final ActiveMQSender build() {
/** {@inheritDoc} */
@Override @Deprecated public Call<Void> sendSpans(List<byte[]> encodedSpans) {
if (closeCalled) throw new ClosedSenderException();
byte[] message = encoder.encode(encodedSpans);
byte[] message = encoding.encode(encodedSpans);
return new ActiveMQCall(message);
}

/** {@inheritDoc} */
@Override public void send(List<byte[]> encodedSpans) throws IOException {
if (closeCalled) throw new ClosedSenderException();
send(encoder.encode(encodedSpans));
send(encoding.encode(encodedSpans));
}

void send(byte[] message) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.List;
import java.util.concurrent.TimeoutException;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.Call;
import zipkin2.reporter.Callback;
Expand Down Expand Up @@ -179,12 +178,10 @@ public final RabbitMQSender build() {
final List<Address> addresses;
final String queue;
final ConnectionFactory connectionFactory;
final BytesMessageEncoder encoder;

RabbitMQSender(Builder builder) {
if (builder.addresses == null) throw new NullPointerException("addresses == null");
encoding = builder.encoding;
encoder = BytesMessageEncoder.forEncoding(encoding);
messageMaxBytes = builder.messageMaxBytes;
addresses = builder.addresses;
queue = builder.queue;
Expand Down Expand Up @@ -218,14 +215,14 @@ public Builder toBuilder() {
/** {@inheritDoc} */
@Override @Deprecated public Call<Void> sendSpans(List<byte[]> encodedSpans) {
if (closeCalled) throw new ClosedSenderException();
byte[] message = encoder.encode(encodedSpans);
byte[] message = encoding.encode(encodedSpans);
return new RabbitMQCall(message);
}

/** {@inheritDoc} */
@Override public void send(List<byte[]> encodedSpans) throws IOException {
if (closeCalled) throw new ClosedSenderException();
publish(encoder.encode(encodedSpans));
publish(encoding.encode(encodedSpans));
}

void publish(byte[] message) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,24 @@
package zipkin2.reporter.internal;

import java.util.List;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.Encoding;

/** Encodes messages on {@link #send(List)}, but doesn't do anything else. */
final class NoopSender extends BytesMessageSender.Base {
final BytesMessageEncoder messageEncoder;

/** close is typically called from a different thread */
volatile boolean closeCalled;

NoopSender(Encoding encoding) {
super(encoding);
this.messageEncoder = BytesMessageEncoder.forEncoding(encoding);
}

@Override public int messageMaxBytes() {
return Integer.MAX_VALUE;
}

@Override public void send(List<byte[]> encodedSpans) {
messageEncoder.encode(encodedSpans);
encoding.encode(encodedSpans);
}

@Override public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,7 @@ public Builder queuedMaxBytes(int queuedMaxBytes) {
*/
// AsyncZipkinSpanHandler not SpanHandler, so that Flushable and Closeable are accessible
public AsyncZipkinSpanHandler build() {
switch (encoding) {
case JSON:
return build(new JsonV2Encoder(errorTag));
default:
throw new UnsupportedOperationException(encoding.name());
}
return build(MutableSpanBytesEncoder.create(encoding, errorTag));
}

/**
Expand Down
2 changes: 2 additions & 0 deletions brave/src/main/java/zipkin2/reporter/brave/JsonV2Encoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package zipkin2.reporter.brave;

import brave.Tag;
import brave.Tags;
import brave.handler.MutableSpan;
import brave.handler.MutableSpanBytesEncoder;
import zipkin2.reporter.BytesEncoder;
import zipkin2.reporter.Encoding;

final class JsonV2Encoder implements BytesEncoder<MutableSpan> {
static final BytesEncoder<MutableSpan> INSTANCE = new JsonV2Encoder(Tags.ERROR);
final MutableSpanBytesEncoder delegate;

JsonV2Encoder(Tag<Throwable> errorTag) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2016-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.reporter.brave;

import brave.Tag;
import brave.Tags;
import brave.handler.MutableSpan;
import zipkin2.reporter.BytesEncoder;
import zipkin2.reporter.Encoding;

/** Includes built-in formats used in Zipkin. */
public enum MutableSpanBytesEncoder implements BytesEncoder<MutableSpan> {
/** Corresponds to the Zipkin v2 json format */
JSON_V2 {
@Override public Encoding encoding() {
return Encoding.JSON;
}

@Override public int sizeInBytes(MutableSpan input) {
return JsonV2Encoder.INSTANCE.sizeInBytes(input);
}

@Override public byte[] encode(MutableSpan input) {
return JsonV2Encoder.INSTANCE.encode(input);
}
};

/**
* Returns the default {@linkplain MutableSpan} encoder for given encoding.
*
* @throws UnsupportedOperationException if the encoding is not yet supported.
* @since 3.3
*/
public static BytesEncoder<MutableSpan> forEncoding(Encoding encoding) {
if (encoding == null) throw new NullPointerException("encoding == null");
switch (encoding) {
case JSON:
return JSON_V2;
case PROTO3:
throw new UnsupportedOperationException("PROTO3 is not yet a built-in encoder");
case THRIFT:
throw new UnsupportedOperationException("THRIFT is not yet a built-in encoder");
default: // BUG: as encoding is an enum!
throw new UnsupportedOperationException("BUG: " + encoding.name());
}
}

/**
* Like {@linkplain #forEncoding(Encoding)}, except you can override the default throwable parser,
* which is {@linkplain brave.Tags#ERROR}.
*
* @since 3.3
*/
public static BytesEncoder<MutableSpan> create(Encoding encoding, Tag<Throwable> errorTag) {
if (encoding == null) throw new NullPointerException("encoding == null");
if (errorTag == null) throw new NullPointerException("errorTag == null");
if (errorTag == Tags.ERROR) return forEncoding(encoding);
switch (encoding) {
case JSON:
return new JsonV2Encoder(errorTag);
case PROTO3:
throw new UnsupportedOperationException("PROTO3 is not yet a built-in encoder");
case THRIFT:
throw new UnsupportedOperationException("THRIFT is not yet a built-in encoder");
default: // BUG: as encoding is an enum!
throw new UnsupportedOperationException("BUG: " + encoding.name());
}
}
}
14 changes: 5 additions & 9 deletions brave/src/test/java/zipkin2/reporter/brave/FakeSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import zipkin2.codec.BytesDecoder;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.reporter.BytesEncoder;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.ClosedSenderException;
import zipkin2.reporter.Encoding;
Expand All @@ -29,37 +28,34 @@
public final class FakeSender extends BytesMessageSender.Base {

public static FakeSender create() {
return new FakeSender(Encoding.JSON, Integer.MAX_VALUE,
BytesMessageEncoder.forEncoding(Encoding.JSON), SpanBytesEncoder.JSON_V2,
return new FakeSender(Encoding.JSON, Integer.MAX_VALUE, SpanBytesEncoder.JSON_V2,
SpanBytesDecoder.JSON_V2, spans -> {
});
}

final int messageMaxBytes;
final BytesMessageEncoder messageEncoder;
final BytesEncoder<Span> encoder;
final BytesDecoder<Span> decoder;
final Consumer<List<Span>> onSpans;

FakeSender(Encoding encoding, int messageMaxBytes, BytesMessageEncoder messageEncoder,
BytesEncoder<Span> encoder, BytesDecoder<Span> decoder, Consumer<List<Span>> onSpans) {
FakeSender(Encoding encoding, int messageMaxBytes, BytesEncoder<Span> encoder,
BytesDecoder<Span> decoder, Consumer<List<Span>> onSpans) {
super(encoding);
this.messageMaxBytes = messageMaxBytes;
this.messageEncoder = messageEncoder;
this.encoder = encoder;
this.decoder = decoder;
this.onSpans = onSpans;
}

FakeSender encoding(Encoding encoding) {
return new FakeSender(encoding, messageMaxBytes, messageEncoder, // invalid but not needed, yet
return new FakeSender(encoding, messageMaxBytes, // invalid but not needed, yet
encoder, // invalid but not needed, yet
decoder, // invalid but not needed, yet
onSpans);
}

FakeSender onSpans(Consumer<List<Span>> onSpans) {
return new FakeSender(encoding, messageMaxBytes, messageEncoder, encoder, decoder, onSpans);
return new FakeSender(encoding, messageMaxBytes, encoder, decoder, onSpans);
}

@Override public int messageMaxBytes() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2016-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.reporter.brave;

import brave.Tag;
import brave.Tags;
import brave.handler.MutableSpan;
import brave.propagation.TraceContext;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Test;
import zipkin2.reporter.BytesEncoder;
import zipkin2.reporter.Encoding;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class MutableSpanBytesEncoderTest {
@Test void forEncoding() {
assertThat(MutableSpanBytesEncoder.forEncoding(Encoding.JSON))
.isSameAs(MutableSpanBytesEncoder.JSON_V2);
assertThatThrownBy(() -> MutableSpanBytesEncoder.forEncoding(Encoding.PROTO3))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("PROTO3 is not yet a built-in encoder");
assertThatThrownBy(() -> MutableSpanBytesEncoder.forEncoding(Encoding.THRIFT))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("THRIFT is not yet a built-in encoder");
}

Tag<Throwable> iceCream = new Tag<>("exception") {
@Override protected String parseValue(Throwable throwable, TraceContext traceContext) {
return "ice cream";
}
};

@Test void create_json() {
// doesn't allocate on defaults
assertThat(MutableSpanBytesEncoder.create(Encoding.JSON, Tags.ERROR))
.isSameAs(MutableSpanBytesEncoder.JSON_V2);

MutableSpan span = new MutableSpan();
span.traceId("1");
span.id("2");
span.error(new OutOfMemoryError("out of memory"));

// Default makes a tag named error
assertThat(new String(MutableSpanBytesEncoder.JSON_V2.encode(span), StandardCharsets.UTF_8))
.isEqualTo("{\"traceId\":\"0000000000000001\",\"id\":\"0000000000000002\",\"tags\":{\"error\":\"out of memory\"}}");


// but, using create, you can override with something else.
BytesEncoder<MutableSpan> iceCreamEncoder =
MutableSpanBytesEncoder.create(Encoding.JSON, iceCream);
assertThat(new String(iceCreamEncoder.encode(span), StandardCharsets.UTF_8))
.isEqualTo("{\"traceId\":\"0000000000000001\",\"id\":\"0000000000000002\",\"tags\":{\"exception\":\"ice cream\"}}");
}

@Test void create_unsupported() {
assertThatThrownBy(() -> MutableSpanBytesEncoder.create(Encoding.PROTO3, iceCream))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("PROTO3 is not yet a built-in encoder");
assertThatThrownBy(() -> MutableSpanBytesEncoder.create(Encoding.THRIFT, iceCream))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("THRIFT is not yet a built-in encoder");
}
}
11 changes: 1 addition & 10 deletions core/src/main/java/zipkin2/reporter/AsyncReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,7 @@ public Builder queuedMaxBytes(int queuedMaxBytes) {

/** Builds an async reporter that encodes zipkin spans as they are reported. */
public AsyncReporter<zipkin2.Span> build() {
switch (encoding) {
case JSON:
return build(SpanBytesEncoder.JSON_V2);
case PROTO3:
return build(SpanBytesEncoder.PROTO3);
case THRIFT:
return build(SpanBytesEncoder.THRIFT);
default:
throw new UnsupportedOperationException(encoding.name());
}
return build(SpanBytesEncoder.forEncoding(encoding));
}

/** Builds an async reporter that encodes arbitrary spans as they are reported. */
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/zipkin2/reporter/BytesMessageEncoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
/**
* Senders like Kafka use byte[] message encoding. This provides helpers to concatenate spans into a
* list.
*
* @deprecated As of 3.3, use {@linkplain Encoding#encode(List)}. This will be removed in v4.0.
*/
@Deprecated
public enum BytesMessageEncoder {
JSON {
@Override public byte[] encode(List<byte[]> values) {
Expand Down
Loading

0 comments on commit 3d1d869

Please sign in to comment.