Skip to content

Commit

Permalink
AsyncReporter/SpanHandler: make queuedMaxBytes=0 disable pre-flight s…
Browse files Browse the repository at this point in the history
…ize checks

Signed-off-by: Andriy Redko <drreta@gmail.com>
  • Loading branch information
reta committed Apr 7, 2024
1 parent 16686ce commit 2a4de0f
Show file tree
Hide file tree
Showing 6 changed files with 406 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package zipkin2.reporter.internal;

import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 10, time = 1)
@Fork(3)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Group)
public class BoundedQueueBenchmarks {
static final byte ONE = 1;

@Param( {"0", "10000"})
public int maxBytes;

@AuxCounters
@State(Scope.Thread)
public static class OfferCounters {
public int offersFailed;
public int offersMade;

@Setup(Level.Iteration)
public void clean() {
offersFailed = offersMade = 0;
}
}

@AuxCounters
@State(Scope.Thread)
public static class DrainCounters {
public int drained;

@Setup(Level.Iteration)
public void clean() {
drained = 0;
}
}

private static ThreadLocal<Object> marker = new ThreadLocal<>();

@State(Scope.Thread)
public static class ConsumerMarker {
public ConsumerMarker() {
marker.set(this);
}
}

BoundedQueue<Byte> q;

@Setup
public void setup() {
q = BoundedQueue.create(10000, maxBytes);
}

@Benchmark @Group("no_contention") @GroupThreads(1)
public void no_contention_offer(OfferCounters counters) {
if (q.offer(ONE, 1)) {
counters.offersMade++;
} else {
counters.offersFailed++;
}
}

@Benchmark @Group("no_contention") @GroupThreads(1)
public void no_contention_drain(DrainCounters counters, ConsumerMarker cm) {
q.drainTo((s, b) -> {
counters.drained++;
return true;
}, 1000);
}

@Benchmark @Group("mild_contention") @GroupThreads(2)
public void mild_contention_offer(OfferCounters counters) {
if (q.offer(ONE, 1)) {
counters.offersMade++;
} else {
counters.offersFailed++;
}
}

@Benchmark @Group("mild_contention") @GroupThreads(1)
public void mild_contention_drain(DrainCounters counters, ConsumerMarker cm) {
q.drainTo((s, b) -> {
counters.drained++;
return true;
}, 1000);
}

@Benchmark @Group("high_contention") @GroupThreads(8)
public void high_contention_offer(OfferCounters counters) {
if (q.offer(ONE, 1)) {
counters.offersMade++;
} else {
counters.offersFailed++;
}
}

@Benchmark @Group("high_contention") @GroupThreads(1)
public void high_contention_drain(DrainCounters counters, ConsumerMarker cm) {
q.drainTo((s, b) -> {
counters.drained++;
return true;
}, 1000);
}

@TearDown(Level.Iteration)
public void emptyQ() {
// If this thread didn't drain, return
if (marker.get() == null) return;
q.clear();
}

// Convenience main entry-point
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(".*" + BoundedQueueBenchmarks.class.getSimpleName() + ".*")
.build();

new Runner(opt).run();
}
}
12 changes: 6 additions & 6 deletions core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public static final class Builder {
this.messageMaxBytes = asyncReporter.messageMaxBytes;
this.messageTimeoutNanos = asyncReporter.messageTimeoutNanos;
this.closeTimeoutNanos = asyncReporter.closeTimeoutNanos;
this.queuedMaxSpans = asyncReporter.pending.maxSize;
this.queuedMaxBytes = asyncReporter.pending.maxBytes;
this.queuedMaxSpans = asyncReporter.pending.maxSize();
this.queuedMaxBytes = asyncReporter.pending.maxBytes();
}

static int onePercentOfMemory() {
Expand Down Expand Up @@ -181,7 +181,7 @@ static final class BoundedAsyncReporter<S> extends AsyncReporter<S> {
static final Logger logger = Logger.getLogger(BoundedAsyncReporter.class.getName());
final AtomicBoolean started, closed;
final BytesEncoder<S> encoder;
final ByteBoundedQueue<S> pending;
final BoundedQueue<S> pending;
final BytesMessageSender sender;
final int messageMaxBytes;
final long messageTimeoutNanos, closeTimeoutNanos;
Expand All @@ -193,7 +193,7 @@ static final class BoundedAsyncReporter<S> extends AsyncReporter<S> {
private boolean shouldWarnException = true;

BoundedAsyncReporter(Builder builder, BytesEncoder<S> encoder) {
this.pending = new ByteBoundedQueue<S>(builder.queuedMaxSpans, builder.queuedMaxBytes);
this.pending = BoundedQueue.create(builder.queuedMaxSpans, builder.queuedMaxBytes);
this.sender = builder.sender;
this.messageMaxBytes = builder.messageMaxBytes;
this.messageTimeoutNanos = builder.messageTimeoutNanos;
Expand Down Expand Up @@ -241,8 +241,8 @@ void flush(BufferNextMessage<S> bundler) {
pending.drainTo(bundler, bundler.remainingNanos());

// record after flushing reduces the amount of gauge events vs on doing this on report
metrics.updateQueuedSpans(pending.count);
metrics.updateQueuedBytes(pending.sizeInBytes);
metrics.updateQueuedSpans(pending.count());
metrics.updateQueuedBytes(pending.sizeInBytes());

// loop around if we are running, and the bundle isn't full
// if we are closed, try to send what's pending
Expand Down
50 changes: 50 additions & 0 deletions core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/

package zipkin2.reporter.internal;

/**
* Multi-producer, multi-consumer queue that could be bounded by count or/and size.
*/
abstract class BoundedQueue<S> implements SpanWithSizeConsumer<S> {
static <S> BoundedQueue<S> create(int maxSize, int maxBytes) {
if (maxBytes > 0) {
return new ByteBoundedQueue<S>(maxSize, maxBytes);
} else {
return new SizeBoundedQueue<S>(maxSize);
}
}

/**
* Max element's count of this bounded queue
*/
abstract int maxSize();

/**
* Max element'size of this bounded queue
*/
abstract int maxBytes();

/**
* Clear this bounded queue
*/
abstract int clear();

/**
* Element's count of this bounded queue
*/
abstract int count();

/**
* Element's size of this bounded queue
*/
abstract int sizeInBytes();

/**
* Drains this bounded queue. Blocks for up to nanosTimeout for spans to appear.
* Then, consume as many as possible.
*/
abstract int drainTo(SpanWithSizeConsumer<S> bundler, long remainingNanos);
}
23 changes: 20 additions & 3 deletions core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*
* <p>This is similar to {@link java.util.concurrent.ArrayBlockingQueue} in implementation.
*/
final class ByteBoundedQueue<S> implements SpanWithSizeConsumer<S> {
final class ByteBoundedQueue<S> extends BoundedQueue<S> implements SpanWithSizeConsumer<S> {

final ReentrantLock lock = new ReentrantLock(false);
final Condition available = lock.newCondition();
Expand Down Expand Up @@ -60,7 +60,7 @@ final class ByteBoundedQueue<S> implements SpanWithSizeConsumer<S> {
}

/** Blocks for up to nanosTimeout for spans to appear. Then, consume as many as possible. */
int drainTo(SpanWithSizeConsumer<S> consumer, long nanosTimeout) {
@Override int drainTo(SpanWithSizeConsumer<S> consumer, long nanosTimeout) {
try {
// This may be called by multiple threads. If one is holding a lock, another is waiting. We
// use lockInterruptibly to ensure the one waiting can be interrupted.
Expand All @@ -81,7 +81,7 @@ int drainTo(SpanWithSizeConsumer<S> consumer, long nanosTimeout) {
}

/** Clears the queue unconditionally and returns count of spans cleared. */
int clear() {
@Override int clear() {
lock.lock();
try {
int result = count;
Expand Down Expand Up @@ -115,6 +115,23 @@ int doDrain(SpanWithSizeConsumer<S> consumer) {
sizeInBytes -= drainedSizeInBytes;
return drainedCount;
}

@Override int count() {
return count;
}

@Override int maxBytes() {
return maxBytes;
}

@Override int maxSize() {
return maxSize;
}

@Override
public int sizeInBytes() {
return sizeInBytes;
}
}

interface SpanWithSizeConsumer<S> {
Expand Down
Loading

0 comments on commit 2a4de0f

Please sign in to comment.