Skip to content

Commit

Permalink
deprecates BytesMessageEncoder for Encoding.encode and adds Encoding.…
Browse files Browse the repository at this point in the history
…mediaType

Signed-off-by: Adrian Cole <adrian@tetrate.io>
  • Loading branch information
Adrian Cole committed Feb 12, 2024
1 parent 0f03cf6 commit efaeceb
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import javax.jms.QueueSender;
import org.apache.activemq.ActiveMQConnectionFactory;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.Call;
import zipkin2.reporter.Callback;
Expand Down Expand Up @@ -111,7 +110,7 @@ public Builder messageMaxBytes(int messageMaxBytes) {
return this;
}

public final ActiveMQSender build() {
public ActiveMQSender build() {
if (connectionFactory == null) throw new NullPointerException("connectionFactory == null");
return new ActiveMQSender(this);
}
Expand All @@ -122,14 +121,12 @@ public final ActiveMQSender build() {

final Encoding encoding;
final int messageMaxBytes;
final BytesMessageEncoder encoder;

final LazyInit lazyInit;

ActiveMQSender(Builder builder) {
this.encoding = builder.encoding;
this.messageMaxBytes = builder.messageMaxBytes;
this.encoder = BytesMessageEncoder.forEncoding(encoding);
this.lazyInit = new LazyInit(builder);
}

Expand All @@ -155,14 +152,14 @@ public final ActiveMQSender build() {
/** {@inheritDoc} */
@Override @Deprecated public Call<Void> sendSpans(List<byte[]> encodedSpans) {
if (closeCalled) throw new ClosedSenderException();
byte[] message = encoder.encode(encodedSpans);
byte[] message = encoding.encode(encodedSpans);
return new ActiveMQCall(message);
}

/** {@inheritDoc} */
@Override public void send(List<byte[]> encodedSpans) throws IOException {
if (closeCalled) throw new ClosedSenderException();
send(encoder.encode(encodedSpans));
send(encoding.encode(encodedSpans));
}

void send(byte[] message) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.List;
import java.util.concurrent.TimeoutException;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.Call;
import zipkin2.reporter.Callback;
Expand Down Expand Up @@ -179,12 +178,10 @@ public final RabbitMQSender build() {
final List<Address> addresses;
final String queue;
final ConnectionFactory connectionFactory;
final BytesMessageEncoder encoder;

RabbitMQSender(Builder builder) {
if (builder.addresses == null) throw new NullPointerException("addresses == null");
encoding = builder.encoding;
encoder = BytesMessageEncoder.forEncoding(encoding);
messageMaxBytes = builder.messageMaxBytes;
addresses = builder.addresses;
queue = builder.queue;
Expand Down Expand Up @@ -218,14 +215,14 @@ public Builder toBuilder() {
/** {@inheritDoc} */
@Override @Deprecated public Call<Void> sendSpans(List<byte[]> encodedSpans) {
if (closeCalled) throw new ClosedSenderException();
byte[] message = encoder.encode(encodedSpans);
byte[] message = encoding.encode(encodedSpans);
return new RabbitMQCall(message);
}

/** {@inheritDoc} */
@Override public void send(List<byte[]> encodedSpans) throws IOException {
if (closeCalled) throw new ClosedSenderException();
publish(encoder.encode(encodedSpans));
publish(encoding.encode(encodedSpans));
}

void publish(byte[] message) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,24 @@
package zipkin2.reporter.internal;

import java.util.List;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.Encoding;

/** Encodes messages on {@link #send(List)}, but doesn't do anything else. */
final class NoopSender extends BytesMessageSender.Base {
final BytesMessageEncoder messageEncoder;

/** close is typically called from a different thread */
volatile boolean closeCalled;

NoopSender(Encoding encoding) {
super(encoding);
this.messageEncoder = BytesMessageEncoder.forEncoding(encoding);
}

@Override public int messageMaxBytes() {
return Integer.MAX_VALUE;
}

@Override public void send(List<byte[]> encodedSpans) {
messageEncoder.encode(encodedSpans);
encoding.encode(encodedSpans);
}

@Override public void close() {
Expand Down
14 changes: 5 additions & 9 deletions brave/src/test/java/zipkin2/reporter/brave/FakeSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import zipkin2.codec.BytesDecoder;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.reporter.BytesEncoder;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.ClosedSenderException;
import zipkin2.reporter.Encoding;
Expand All @@ -29,37 +28,34 @@
public final class FakeSender extends BytesMessageSender.Base {

public static FakeSender create() {
return new FakeSender(Encoding.JSON, Integer.MAX_VALUE,
BytesMessageEncoder.forEncoding(Encoding.JSON), SpanBytesEncoder.JSON_V2,
return new FakeSender(Encoding.JSON, Integer.MAX_VALUE, SpanBytesEncoder.JSON_V2,
SpanBytesDecoder.JSON_V2, spans -> {
});
}

final int messageMaxBytes;
final BytesMessageEncoder messageEncoder;
final BytesEncoder<Span> encoder;
final BytesDecoder<Span> decoder;
final Consumer<List<Span>> onSpans;

FakeSender(Encoding encoding, int messageMaxBytes, BytesMessageEncoder messageEncoder,
BytesEncoder<Span> encoder, BytesDecoder<Span> decoder, Consumer<List<Span>> onSpans) {
FakeSender(Encoding encoding, int messageMaxBytes, BytesEncoder<Span> encoder,
BytesDecoder<Span> decoder, Consumer<List<Span>> onSpans) {
super(encoding);
this.messageMaxBytes = messageMaxBytes;
this.messageEncoder = messageEncoder;
this.encoder = encoder;
this.decoder = decoder;
this.onSpans = onSpans;
}

FakeSender encoding(Encoding encoding) {
return new FakeSender(encoding, messageMaxBytes, messageEncoder, // invalid but not needed, yet
return new FakeSender(encoding, messageMaxBytes, // invalid but not needed, yet
encoder, // invalid but not needed, yet
decoder, // invalid but not needed, yet
onSpans);
}

FakeSender onSpans(Consumer<List<Span>> onSpans) {
return new FakeSender(encoding, messageMaxBytes, messageEncoder, encoder, decoder, onSpans);
return new FakeSender(encoding, messageMaxBytes, encoder, decoder, onSpans);
}

@Override public int messageMaxBytes() {
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/zipkin2/reporter/BytesMessageEncoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
/**
* Senders like Kafka use byte[] message encoding. This provides helpers to concatenate spans into a
* list.
*
* @deprecated As of 3.3, use {@linkplain Encoding#encode(List)}. This will be removed in v4.0.
*/
@Deprecated
public enum BytesMessageEncoder {
JSON {
@Override public byte[] encode(List<byte[]> values) {
Expand Down
48 changes: 48 additions & 0 deletions core/src/main/java/zipkin2/reporter/Encoding.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ public enum Encoding {
}
return sizeInBytes;
}

@SuppressWarnings("deprecation")
@Override public byte[] encode(List<byte[]> encodedSpans) {
return BytesMessageEncoder.JSON.encode(encodedSpans);
}

@Override public String mediaType() {
return "application/json";
}
},
/**
* The first format of Zipkin was TBinaryProtocol, big-endian thrift. It is no longer used, but
Expand All @@ -61,6 +70,15 @@ public enum Encoding {
}
return sizeInBytes;
}

@SuppressWarnings("deprecation")
@Override public byte[] encode(List<byte[]> encodedSpans) {
return BytesMessageEncoder.THRIFT.encode(encodedSpans);
}

@Override public String mediaType() {
return "application/x-thrift";
}
},
/**
* Repeated (type 2) fields are length-prefixed. A list is a concatenation of fields with no
Expand All @@ -82,10 +100,40 @@ public enum Encoding {
}
return sizeInBytes;
}

@SuppressWarnings("deprecation")
@Override public byte[] encode(List<byte[]> encodedSpans) {
return BytesMessageEncoder.PROTO3.encode(encodedSpans);
}

@Override public String mediaType() {
return "application/x-protobuf";
}
};

/**
* Combines a list of encoded spans into an encoded list. For example, in {@linkplain #THRIFT},
* this would be length-prefixed, whereas in {@linkplain #JSON}, this would be comma-separated and
* enclosed by brackets.
*
* <p>The primary use of this is batch reporting spans. For example, spans are {@link
* BytesEncoder#encode(Object) encoded} one-by-one into a queue. This queue is drained up to a byte
* threshold. Then, the list is encoded with this function and reported out-of-process.
*
* @since 3.3
*/
public abstract byte[] encode(List<byte[]> encodedSpans);

/** Like {@link #listSizeInBytes(List)}, except for a single element. */
public abstract int listSizeInBytes(int encodedSizeInBytes);

public abstract int listSizeInBytes(List<byte[]> values);

/**
* Returns this message's "Content-Type" for use in an HTTP {@link BytesMessageSender sender}
* targeting the Zipkin <a href="https://zipkin.io/zipkin-api/#/">POST</a> endpoint.
*
* @since 3.3
*/
public abstract String mediaType();
}
27 changes: 13 additions & 14 deletions core/src/test/java/zipkin2/reporter/BytesMessageEncoderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,56 +13,55 @@
*/
package zipkin2.reporter;

import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class BytesMessageEncoderTest {
class EncodingTest {
@Test void emptyList_json() {
List<byte[]> encoded = Arrays.asList();
assertThat(BytesMessageEncoder.JSON.encode(encoded))
List<byte[]> encoded = List.of();
assertThat(Encoding.JSON.encode(encoded))
.containsExactly('[', ']');
}

@Test void singletonList_json() {
List<byte[]> encoded = Arrays.asList(new byte[] {'{', '}'});
List<byte[]> encoded = List.of(new byte[] {'{', '}'});

assertThat(BytesMessageEncoder.JSON.encode(encoded))
assertThat(Encoding.JSON.encode(encoded))
.containsExactly('[', '{', '}', ']');
}

@Test void multiItemList_json() {
List<byte[]> encoded = Arrays.asList(
List<byte[]> encoded = List.of(
"{\"k\":\"1\"}".getBytes(),
"{\"k\":\"2\"}".getBytes(),
"{\"k\":\"3\"}".getBytes()
);
assertThat(new String(BytesMessageEncoder.JSON.encode(encoded)))
assertThat(new String(Encoding.JSON.encode(encoded)))
.isEqualTo("[{\"k\":\"1\"},{\"k\":\"2\"},{\"k\":\"3\"}]");
}

@Test void emptyList_proto3() {
List<byte[]> encoded = Arrays.asList();
assertThat(BytesMessageEncoder.PROTO3.encode(encoded))
List<byte[]> encoded = List.of();
assertThat(Encoding.PROTO3.encode(encoded))
.isEmpty();
}

@Test void singletonList_proto3() {
List<byte[]> encoded = Arrays.asList(new byte[] {1, 1, 'a'});
List<byte[]> encoded = List.of(new byte[] {1, 1, 'a'});

assertThat(BytesMessageEncoder.PROTO3.encode(encoded))
assertThat(Encoding.PROTO3.encode(encoded))
.containsExactly(1, 1, 'a');
}

@Test void multiItemList_proto3() {
List<byte[]> encoded = Arrays.asList(
List<byte[]> encoded = List.of(
new byte[] {1, 1, 'a'},
new byte[] {1, 1, 'b'},
new byte[] {1, 1, 'c'}
);
assertThat(BytesMessageEncoder.PROTO3.encode(encoded)).containsExactly(
assertThat(Encoding.PROTO3.encode(encoded)).containsExactly(
1, 1, 'a',
1, 1, 'b',
1, 1, 'c'
Expand Down
15 changes: 6 additions & 9 deletions core/src/test/java/zipkin2/reporter/FakeSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,38 @@
public final class FakeSender extends BytesMessageSender.Base {

public static FakeSender create() {
return new FakeSender(Encoding.JSON, Integer.MAX_VALUE,
BytesMessageEncoder.forEncoding(Encoding.JSON), SpanBytesEncoder.JSON_V2,
return new FakeSender(Encoding.JSON, Integer.MAX_VALUE, SpanBytesEncoder.JSON_V2,
SpanBytesDecoder.JSON_V2, spans -> {
});
}

final int messageMaxBytes;
final BytesMessageEncoder messageEncoder;
final BytesEncoder<Span> encoder;
final BytesDecoder<Span> decoder;
final Consumer<List<Span>> onSpans;

FakeSender(Encoding encoding, int messageMaxBytes, BytesMessageEncoder messageEncoder,
BytesEncoder<Span> encoder, BytesDecoder<Span> decoder, Consumer<List<Span>> onSpans) {
FakeSender(Encoding encoding, int messageMaxBytes, BytesEncoder<Span> encoder,
BytesDecoder<Span> decoder, Consumer<List<Span>> onSpans) {
super(encoding);
this.messageMaxBytes = messageMaxBytes;
this.messageEncoder = messageEncoder;
this.encoder = encoder;
this.decoder = decoder;
this.onSpans = onSpans;
}

public FakeSender encoding(Encoding encoding) {
return new FakeSender(encoding, messageMaxBytes, messageEncoder, // invalid but not needed, yet
return new FakeSender(encoding, messageMaxBytes, // invalid but not needed, yet
encoder, // invalid but not needed, yet
decoder, // invalid but not needed, yet
onSpans);
}

public FakeSender onSpans(Consumer<List<Span>> onSpans) {
return new FakeSender(encoding, messageMaxBytes, messageEncoder, encoder, decoder, onSpans);
return new FakeSender(encoding, messageMaxBytes, encoder, decoder, onSpans);
}

public FakeSender messageMaxBytes(int messageMaxBytes) {
return new FakeSender(encoding, messageMaxBytes, messageEncoder, encoder, decoder, onSpans);
return new FakeSender(encoding, messageMaxBytes, encoder, decoder, onSpans);
}

@Override public int messageMaxBytes() {
Expand Down
Loading

0 comments on commit efaeceb

Please sign in to comment.