Skip to content

Commit

Permalink
Remove messageMaxBytes fro AsyncReporter::report in favor of postponi…
Browse files Browse the repository at this point in the history
…ng them till flushing occurs

Signed-off-by: Andriy Redko <drreta@gmail.com>
  • Loading branch information
reta committed Apr 9, 2024
1 parent 2a4de0f commit 4209ba0
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand All @@ -24,6 +26,7 @@
import org.openjdk.jmh.annotations.Warmup;
import zipkin2.Span;
import zipkin2.TestObjects;
import zipkin2.reporter.BytesEncoder;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.InMemoryReporterMetrics;
import zipkin2.reporter.SpanBytesEncoder;
Expand All @@ -42,6 +45,9 @@ public class AsyncReporterBenchmarks {
@Param
public Encoding encoding;

@Param( {"0", "10000000"})
public int maxBytes;

@AuxCounters
@State(Scope.Thread)
public static class InMemoryReporterMetricsAsCounters {
Expand Down Expand Up @@ -77,10 +83,17 @@ public void clean() {

@Setup(Level.Trial)
public void setup() {
final BytesEncoder<Span> encoder = Stream
.of(SpanBytesEncoder.JSON_V2, SpanBytesEncoder.PROTO3, SpanBytesEncoder.THRIFT)
.filter(e -> e.encoding().equals(encoding))
.findAny()
.orElseThrow(() -> new IllegalStateException("Unable to find BytesEncoder<Span> for " + encoding));

reporter = AsyncReporter.newBuilder(new NoopSender(encoding))
.messageMaxBytes(1000000) // example default from Kafka message.max.bytes
.queuedMaxBytes(maxBytes)
.metrics(metrics)
.build(SpanBytesEncoder.JSON_V2);
.build(encoder);
}

@Benchmark @Group("no_contention") @GroupThreads(1)
Expand Down
28 changes: 19 additions & 9 deletions core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,20 +216,30 @@ void startFlusherThread() {
flushThread.start();
}

@SuppressWarnings("unchecked")
@Override public void report(S next) {
if (next == null) throw new NullPointerException("span == null");
// Lazy start so that reporters never used don't spawn threads
if (started.compareAndSet(false, true)) startFlusherThread();
metrics.incrementSpans(1);
int nextSizeInBytes = encoder.sizeInBytes(next);
int messageSizeOfNextSpan = sender.messageSizeInBytes(nextSizeInBytes);
metrics.incrementSpanBytes(nextSizeInBytes);
if (closed.get() ||
// don't enqueue something larger than we can drain
messageSizeOfNextSpan > messageMaxBytes ||
!pending.offer(next, nextSizeInBytes)) {
metrics.incrementSpansDropped(1);
}

if (pending instanceof UnsizedSpanConsumer) {
// enqueue now and filter our when we drain
final UnsizedSpanConsumer<S> consumer = (UnsizedSpanConsumer<S>)pending;
if (closed.get() || !consumer.offer(next)) {
metrics.incrementSpansDropped(1);
}
} else {
int nextSizeInBytes = encoder.sizeInBytes(next);
int messageSizeOfNextSpan = sender.messageSizeInBytes(nextSizeInBytes);
metrics.incrementSpanBytes(nextSizeInBytes);
if (closed.get() ||
// don't enqueue something larger than we can drain
messageSizeOfNextSpan > messageMaxBytes ||
!pending.offer(next, nextSizeInBytes)) {
metrics.incrementSpansDropped(1);
}
}
}

@Override public void flush() {
Expand Down

0 comments on commit 4209ba0

Please sign in to comment.