diff --git a/.github/component_owners.yml b/.github/component_owners.yml index 85a68bc74..6c6ef2214 100644 --- a/.github/component_owners.yml +++ b/.github/component_owners.yml @@ -3,6 +3,8 @@ components: aws-xray: - anuraaga - willarmiros + consistent-sampling: + - oertl samplers: - anuraaga - iNikem diff --git a/consistent-sampling/build.gradle.kts b/consistent-sampling/build.gradle.kts new file mode 100644 index 000000000..1ad1faad1 --- /dev/null +++ b/consistent-sampling/build.gradle.kts @@ -0,0 +1,12 @@ +plugins { + id("otel.java-conventions") + id("otel.publish-conventions") +} + +description = "Sampler and exporter implementations for consistent sampling" + +dependencies { + api("io.opentelemetry:opentelemetry-sdk-trace") + testImplementation("org.hipparchus:hipparchus-core:2.0") + testImplementation("org.hipparchus:hipparchus-stat:2.0") +} diff --git a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentAlwaysOffSampler.java b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentAlwaysOffSampler.java new file mode 100644 index 000000000..37759d670 --- /dev/null +++ b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentAlwaysOffSampler.java @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import javax.annotation.concurrent.Immutable; + +@Immutable +final class ConsistentAlwaysOffSampler extends ConsistentSampler { + + private ConsistentAlwaysOffSampler() {} + + private static final ConsistentSampler INSTANCE = new ConsistentAlwaysOffSampler(); + + static ConsistentSampler getInstance() { + return INSTANCE; + } + + @Override + protected int getP(int parentP, boolean isRoot) { + return OtelTraceState.getMaxP(); + } + + @Override + public String getDescription() { + return "ConsistentAlwaysOffSampler"; + } +} diff --git a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentAlwaysOnSampler.java b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentAlwaysOnSampler.java new file mode 100644 index 000000000..9ca49bd0d --- /dev/null +++ b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentAlwaysOnSampler.java @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import javax.annotation.concurrent.Immutable; + +@Immutable +final class ConsistentAlwaysOnSampler extends ConsistentSampler { + + private ConsistentAlwaysOnSampler() {} + + private static final ConsistentSampler INSTANCE = new ConsistentAlwaysOnSampler(); + + static ConsistentSampler getInstance() { + return INSTANCE; + } + + @Override + protected int getP(int parentP, boolean isRoot) { + return 0; + } + + @Override + public String getDescription() { + return "ConsistentAlwaysOnSampler"; + } +} diff --git a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentComposedAndSampler.java b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentComposedAndSampler.java new file mode 100644 index 000000000..17f758b46 --- /dev/null +++ b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentComposedAndSampler.java @@ -0,0 +1,51 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.concurrent.Immutable; + +/** + * A consistent sampler composed of two consistent samplers. + * + *

This sampler samples if both samplers would sample. + */ +@Immutable +final class ConsistentComposedAndSampler extends ConsistentSampler { + + private final ConsistentSampler sampler1; + private final ConsistentSampler sampler2; + private final String description; + + ConsistentComposedAndSampler(ConsistentSampler sampler1, ConsistentSampler sampler2) { + this.sampler1 = requireNonNull(sampler1); + this.sampler2 = requireNonNull(sampler2); + this.description = + "ConsistentComposedAndSampler{" + + "sampler1=" + + sampler1.getDescription() + + ",sampler2=" + + sampler2.getDescription() + + '}'; + } + + @Override + protected int getP(int parentP, boolean isRoot) { + int p1 = sampler1.getP(parentP, isRoot); + int p2 = sampler2.getP(parentP, isRoot); + if (OtelTraceState.isValidP(p1) && OtelTraceState.isValidP(p2)) { + return Math.max(p1, p2); + } else { + return OtelTraceState.getInvalidP(); + } + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentComposedOrSampler.java b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentComposedOrSampler.java new file mode 100644 index 000000000..b090dd399 --- /dev/null +++ b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentComposedOrSampler.java @@ -0,0 +1,57 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.concurrent.Immutable; + +/** + * A consistent sampler composed of two consistent samplers. + * + *

This sampler samples if any of the two samplers would sample. + */ +@Immutable +final class ConsistentComposedOrSampler extends ConsistentSampler { + + private final ConsistentSampler sampler1; + private final ConsistentSampler sampler2; + private final String description; + + ConsistentComposedOrSampler(ConsistentSampler sampler1, ConsistentSampler sampler2) { + this.sampler1 = requireNonNull(sampler1); + this.sampler2 = requireNonNull(sampler2); + this.description = + "ConsistentComposedOrSampler{" + + "sampler1=" + + sampler1.getDescription() + + ",sampler2=" + + sampler2.getDescription() + + '}'; + } + + @Override + protected int getP(int parentP, boolean isRoot) { + int p1 = sampler1.getP(parentP, isRoot); + int p2 = sampler2.getP(parentP, isRoot); + if (OtelTraceState.isValidP(p1)) { + if (OtelTraceState.isValidP(p2)) { + return Math.min(p1, p2); + } + return p1; + } else { + if (OtelTraceState.isValidP(p2)) { + return p2; + } + return OtelTraceState.getInvalidP(); + } + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentParentBasedSampler.java b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentParentBasedSampler.java new file mode 100644 index 000000000..4db1ad196 --- /dev/null +++ b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentParentBasedSampler.java @@ -0,0 +1,61 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.concurrent.Immutable; + +/** + * A consistent sampler that makes the same sampling decision as the parent and optionally falls + * back to an alternative consistent sampler, if the parent p-value is invalid (like for root + * spans). + */ +@Immutable +final class ConsistentParentBasedSampler extends ConsistentSampler { + + private final ConsistentSampler rootSampler; + + private final String description; + + /** + * Constructs a new consistent parent based sampler using the given root sampler. + * + * @param rootSampler the root sampler + */ + ConsistentParentBasedSampler(ConsistentSampler rootSampler) { + this(rootSampler, RandomGenerator.getDefault()); + } + + /** + * Constructs a new consistent parent based sampler using the given root sampler and the given + * thread-safe random generator. + * + * @param rootSampler the root sampler + * @param threadSafeRandomGenerator a thread-safe random generator + */ + ConsistentParentBasedSampler( + ConsistentSampler rootSampler, RandomGenerator threadSafeRandomGenerator) { + super(threadSafeRandomGenerator); + this.rootSampler = requireNonNull(rootSampler); + this.description = + "ConsistentParentBasedSampler{rootSampler=" + rootSampler.getDescription() + '}'; + } + + @Override + protected int getP(int parentP, boolean isRoot) { + if (isRoot) { + return rootSampler.getP(parentP, isRoot); + } else { + return parentP; + } + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentProbabilityBasedSampler.java b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentProbabilityBasedSampler.java new file mode 100644 index 000000000..2e55d7409 --- /dev/null +++ b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentProbabilityBasedSampler.java @@ -0,0 +1,69 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import javax.annotation.concurrent.Immutable; + +/** A consistent sampler that samples with a fixed probability. */ +@Immutable +final class ConsistentProbabilityBasedSampler extends ConsistentSampler { + + private final int lowerPValue; + private final int upperPValue; + private final double probabilityToUseLowerPValue; + private final String description; + + /** + * Constructor. + * + * @param samplingProbability the sampling probability + */ + ConsistentProbabilityBasedSampler(double samplingProbability) { + this(samplingProbability, RandomGenerator.getDefault()); + } + + /** + * Constructor. + * + * @param samplingProbability the sampling probability + * @param randomGenerator a random generator + */ + ConsistentProbabilityBasedSampler(double samplingProbability, RandomGenerator randomGenerator) { + super(randomGenerator); + if (samplingProbability < 0.0 || samplingProbability > 1.0) { + throw new IllegalArgumentException("Sampling probability must be in range [0.0, 1.0]!"); + } + this.description = + String.format("ConsistentProbabilityBasedSampler{%.6f}", samplingProbability); + + lowerPValue = getLowerBoundP(samplingProbability); + upperPValue = getUpperBoundP(samplingProbability); + + if (lowerPValue == upperPValue) { + probabilityToUseLowerPValue = 1; + } else { + double upperSamplingProbability = getSamplingProbability(lowerPValue); + double lowerSamplingProbability = getSamplingProbability(upperPValue); + probabilityToUseLowerPValue = + (samplingProbability - lowerSamplingProbability) + / (upperSamplingProbability - lowerSamplingProbability); + } + } + + @Override + protected int getP(int parentP, boolean isRoot) { + if (randomGenerator.nextBoolean(probabilityToUseLowerPValue)) { + return lowerPValue; + } else { + return upperPValue; + } + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentRateLimitingSampler.java b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentRateLimitingSampler.java new file mode 100644 index 000000000..9f1edbbd1 --- /dev/null +++ b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentRateLimitingSampler.java @@ -0,0 +1,187 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import static java.util.Objects.requireNonNull; + +import io.opentelemetry.sdk.trace.samplers.Sampler; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongSupplier; +import javax.annotation.concurrent.Immutable; + +/** + * This consistent {@link Sampler} adjusts the sampling probability dynamically to limit the rate of + * sampled spans. + * + *

This sampler uses exponential smoothing to estimate on irregular data (compare Wright, David + * J. "Forecasting data published at irregular time intervals using an extension of Holt's method." + * Management science 32.4 (1986): 499-510.) to estimate the average waiting time between spans + * which further allows to estimate the current rate of spans. In the paper, Eq. 2 defines the + * weighted average of a sequence of data + * + *

{@code ..., X(n-2), X(n-1), X(n)} + * + *

at irregular times + * + *

{@code ..., t(n-2), t(n-1), t(n)} + * + *

as + * + *

{@code E(X(n)) := A(n) * V(n)}. + * + *

{@code A(n)} and {@code V(n)} are computed recursively using Eq. 5 and Eq. 6 given by + * + *

{@code A(n) = b(n) * A(n-1) + X(n)} and {@code V(n) = V(n-1) / (b(n) + V(n-1))} + * + *

where + * + *

{@code b(n) := (1 - a)^(t(n) - t(n-1)) = exp((t(n) - t(n-1)) * ln(1 - a))}. + * + *

Introducing + * + *

{@code C(n) := 1 / V(n)} + * + *

the recursion can be rewritten as + * + *

{@code A(n) = b(n) * A(n-1) + X(n)} and {@code C(n) = b(n) * C(n-1) + 1}. + * + *

+ * + *

Since we want to estimate the average waiting time, our data is given by + * + *

{@code X(n) := t(n) - t(n-1)}. + * + *

+ * + *

The following correspondence is used for the implementation: + * + *

+ */ +final class ConsistentRateLimitingSampler extends ConsistentSampler { + + @Immutable + private static final class State { + private final double effectiveWindowCount; + private final double effectiveWindowNanos; + private final long lastNanoTime; + + public State(double effectiveWindowCount, double effectiveWindowNanos, long lastNanoTime) { + this.effectiveWindowCount = effectiveWindowCount; + this.effectiveWindowNanos = effectiveWindowNanos; + this.lastNanoTime = lastNanoTime; + } + } + + private final String description; + private final LongSupplier nanoTimeSupplier; + private final double inverseAdaptationTimeNanos; + private final double targetSpansPerNanosecondLimit; + private final AtomicReference state; + + /** + * Constructor. + * + * @param targetSpansPerSecondLimit the desired spans per second limit + * @param adaptationTimeSeconds the typical time to adapt to a new load (time constant used for + * exponential smoothing) + */ + ConsistentRateLimitingSampler(double targetSpansPerSecondLimit, double adaptationTimeSeconds) { + this( + targetSpansPerSecondLimit, + adaptationTimeSeconds, + RandomGenerator.getDefault(), + System::nanoTime); + } + + /** + * Constructor. + * + * @param targetSpansPerSecondLimit the desired spans per second limit + * @param adaptationTimeSeconds the typical time to adapt to a new load (time constant used for + * exponential smoothing) + * @param randomGenerator a random generator + * @param nanoTimeSupplier a supplier for the current nano time + */ + ConsistentRateLimitingSampler( + double targetSpansPerSecondLimit, + double adaptationTimeSeconds, + RandomGenerator randomGenerator, + LongSupplier nanoTimeSupplier) { + super(randomGenerator); + + if (targetSpansPerSecondLimit < 0.0) { + throw new IllegalArgumentException("Limit for sampled spans per second must be nonnegative!"); + } + if (adaptationTimeSeconds < 0.0) { + throw new IllegalArgumentException("Adaptation rate must be nonnegative!"); + } + this.description = + String.format( + "ConsistentRateLimitingSampler{%.6f, %.6f}", + targetSpansPerSecondLimit, adaptationTimeSeconds); + this.nanoTimeSupplier = requireNonNull(nanoTimeSupplier); + + this.inverseAdaptationTimeNanos = 1e-9 / adaptationTimeSeconds; + this.targetSpansPerNanosecondLimit = 1e-9 * targetSpansPerSecondLimit; + + this.state = new AtomicReference<>(new State(0, 0, nanoTimeSupplier.getAsLong())); + } + + private State updateState(State oldState, long currentNanoTime) { + if (currentNanoTime <= oldState.lastNanoTime) { + return new State( + oldState.effectiveWindowCount + 1, oldState.effectiveWindowNanos, oldState.lastNanoTime); + } + long nanoTimeDelta = currentNanoTime - oldState.lastNanoTime; + double decayFactor = Math.exp(-nanoTimeDelta * inverseAdaptationTimeNanos); + double currentEffectiveWindowCount = oldState.effectiveWindowCount * decayFactor + 1; + double currentEffectiveWindowNanos = + oldState.effectiveWindowNanos * decayFactor + nanoTimeDelta; + return new State(currentEffectiveWindowCount, currentEffectiveWindowNanos, currentNanoTime); + } + + @Override + protected int getP(int parentP, boolean isRoot) { + long currentNanoTime = nanoTimeSupplier.getAsLong(); + State currentState = state.updateAndGet(s -> updateState(s, currentNanoTime)); + + double samplingProbability = + (currentState.effectiveWindowNanos * targetSpansPerNanosecondLimit) + / currentState.effectiveWindowCount; + + if (samplingProbability >= 1.) { + return 0; + } + + int lowerPValue = getLowerBoundP(samplingProbability); + int upperPValue = getUpperBoundP(samplingProbability); + + if (lowerPValue == upperPValue) { + return lowerPValue; + } + + double upperSamplingRate = getSamplingProbability(lowerPValue); + double lowerSamplingRate = getSamplingProbability(upperPValue); + double probabilityToUseLowerPValue = + (samplingProbability - lowerSamplingRate) / (upperSamplingRate - lowerSamplingRate); + + if (randomGenerator.nextBoolean(probabilityToUseLowerPValue)) { + return lowerPValue; + } else { + return upperPValue; + } + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentSampler.java b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentSampler.java new file mode 100644 index 000000000..6f9cd76e2 --- /dev/null +++ b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentSampler.java @@ -0,0 +1,360 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import static java.util.Objects.requireNonNull; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.sdk.trace.samplers.SamplingDecision; +import io.opentelemetry.sdk.trace.samplers.SamplingResult; +import java.util.List; +import java.util.function.LongSupplier; + +/** Abstract base class for consistent samplers. */ +public abstract class ConsistentSampler implements Sampler { + + /** + * Returns a {@link ConsistentSampler} that samples all spans. + * + * @return a sampler + */ + public static final ConsistentSampler alwaysOn() { + return ConsistentAlwaysOnSampler.getInstance(); + } + + /** + * Returns a {@link ConsistentSampler} that does not sample any span. + * + * @return a sampler + */ + public static final ConsistentSampler alwaysOff() { + return ConsistentAlwaysOffSampler.getInstance(); + } + + /** + * Returns a {@link ConsistentSampler} that samples each span with a fixed probability. + * + * @param samplingProbability the sampling probability + * @return a sampler + */ + public static final ConsistentSampler probabilityBased(double samplingProbability) { + return new ConsistentProbabilityBasedSampler(samplingProbability); + } + + /** + * Returns a {@link ConsistentSampler} that samples each span with a fixed probability. + * + * @param samplingProbability the sampling probability + * @param randomGenerator a random generator + * @return a sampler + */ + static final ConsistentSampler probabilityBased( + double samplingProbability, RandomGenerator randomGenerator) { + return new ConsistentProbabilityBasedSampler(samplingProbability, randomGenerator); + } + + /** + * Returns a new {@link ConsistentSampler} that respects the sampling decision of the parent span + * or falls-back to the given sampler if it is a root span. + * + * @param rootSampler the root sampler + */ + public static final ConsistentSampler parentBased(ConsistentSampler rootSampler) { + return new ConsistentParentBasedSampler(rootSampler); + } + + /** + * Returns a new {@link ConsistentSampler} that respects the sampling decision of the parent span + * or falls-back to the given sampler if it is a root span. + * + * @param rootSampler the root sampler + * @param randomGenerator a random generator + */ + static final ConsistentSampler parentBased( + ConsistentSampler rootSampler, RandomGenerator randomGenerator) { + return new ConsistentParentBasedSampler(rootSampler, randomGenerator); + } + + /** + * Returns a new {@link ConsistentSampler} that attempts to adjust the sampling probability + * dynamically to meet the target span rate. + * + * @param targetSpansPerSecondLimit the desired spans per second limit + * @param adaptationTimeSeconds the typical time to adapt to a new load (time constant used for + * exponential smoothing) + */ + public static final ConsistentSampler rateLimited( + double targetSpansPerSecondLimit, double adaptationTimeSeconds) { + return new ConsistentRateLimitingSampler(targetSpansPerSecondLimit, adaptationTimeSeconds); + } + + /** + * Returns a new {@link ConsistentSampler} that attempts to adjust the sampling probability + * dynamically to meet the target span rate. + * + * @param targetSpansPerSecondLimit the desired spans per second limit + * @param adaptationTimeSeconds the typical time to adapt to a new load (time constant used for + * exponential smoothing) + * @param randomGenerator a random generator + * @param nanoTimeSupplier a supplier for the current nano time + */ + static final ConsistentSampler rateLimited( + double targetSpansPerSecondLimit, + double adaptationTimeSeconds, + RandomGenerator randomGenerator, + LongSupplier nanoTimeSupplier) { + return new ConsistentRateLimitingSampler( + targetSpansPerSecondLimit, adaptationTimeSeconds, randomGenerator, nanoTimeSupplier); + } + + /** + * Returns a {@link ConsistentSampler} that samples a span if both this and the other given + * consistent sampler would sample the span. + * + *

If the other consistent sampler is the same as this, this consistent sampler will be + * returned. + * + *

The returned sampler takes care of setting the trace state correctly, which would not happen + * if the {@link #shouldSample(Context, String, String, SpanKind, Attributes, List)} method was + * called for each sampler individually. Also, the combined sampler is more efficient than + * evaluating the two samplers individually and combining both results afterwards. + * + * @param otherConsistentSampler the other consistent sampler + * @return the composed consistent sampler + */ + public ConsistentSampler and(ConsistentSampler otherConsistentSampler) { + if (otherConsistentSampler == this) { + return this; + } + return new ConsistentComposedAndSampler(this, otherConsistentSampler); + } + + /** + * Returns a {@link ConsistentSampler} that samples a span if either this or the other given + * consistent sampler would sample the span. + * + *

If the other consistent sampler is the same as this, this consistent sampler will be + * returned. + * + *

The returned sampler takes care of setting the trace state correctly, which would not happen + * if the {@link #shouldSample(Context, String, String, SpanKind, Attributes, List)} method was + * called for each sampler individually. Also, the combined sampler is more efficient than + * evaluating the two samplers individually and combining both results afterwards. + * + * @param otherConsistentSampler the other consistent sampler + * @return the composed consistent sampler + */ + public ConsistentSampler or(ConsistentSampler otherConsistentSampler) { + if (otherConsistentSampler == this) { + return this; + } + return new ConsistentComposedOrSampler(this, otherConsistentSampler); + } + + protected final RandomGenerator randomGenerator; + + protected ConsistentSampler(RandomGenerator randomGenerator) { + this.randomGenerator = requireNonNull(randomGenerator); + } + + protected ConsistentSampler() { + this(RandomGenerator.getDefault()); + } + + private static final boolean isInvariantViolated( + OtelTraceState otelTraceState, boolean isParentSampled) { + if (otelTraceState.hasValidR() && otelTraceState.hasValidP()) { + // if valid p- and r-values are given, they must be consistent with the isParentSampled flag + // see + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/tracestate-probability-sampling.md#sampled-flag + int p = otelTraceState.getP(); + int r = otelTraceState.getR(); + int maxP = OtelTraceState.getMaxP(); + boolean isInvariantTrue = ((p <= r) == isParentSampled) || (isParentSampled && (p == maxP)); + return !isInvariantTrue; + } else { + return false; + } + } + + @Override + public final SamplingResult shouldSample( + Context parentContext, + String traceId, + String name, + SpanKind spanKind, + Attributes attributes, + List parentLinks) { + + Span parentSpan = Span.fromContext(parentContext); + SpanContext parentSpanContext = parentSpan.getSpanContext(); + boolean isRoot = !parentSpanContext.isValid(); + boolean isParentSampled = parentSpanContext.isSampled(); + + TraceState parentTraceState = parentSpanContext.getTraceState(); + String otelTraceStateString = parentTraceState.get(OtelTraceState.TRACE_STATE_KEY); + OtelTraceState otelTraceState = OtelTraceState.parse(otelTraceStateString); + + if (!otelTraceState.hasValidR() || isInvariantViolated(otelTraceState, isParentSampled)) { + // unset p-value in case of an invalid r-value or in case of any invariant violation + otelTraceState.invalidateP(); + } + + // generate new r-value if not available + if (!otelTraceState.hasValidR()) { + otelTraceState.setR( + Math.min(randomGenerator.numberOfLeadingZerosOfRandomLong(), OtelTraceState.getMaxR())); + } + + // determine and set new p-value that is used for the sampling decision + int newP = getP(otelTraceState.getP(), isRoot); + otelTraceState.setP(newP); + + // determine sampling decision + boolean isSampled; + if (otelTraceState.hasValidP()) { + isSampled = (otelTraceState.getP() <= otelTraceState.getR()); + } else { + // if new p-value is invalid, respect sampling decision of parent + isSampled = isParentSampled; + } + SamplingDecision samplingDecision = + isSampled ? SamplingDecision.RECORD_AND_SAMPLE : SamplingDecision.DROP; + + String newOtTraceState = otelTraceState.serialize(); + + return new SamplingResult() { + + @Override + public SamplingDecision getDecision() { + return samplingDecision; + } + + @Override + public Attributes getAttributes() { + return Attributes.empty(); + } + + @Override + public TraceState getUpdatedTraceState(TraceState parentTraceState) { + return parentTraceState.toBuilder() + .put(OtelTraceState.TRACE_STATE_KEY, newOtTraceState) + .build(); + } + }; + } + + /** + * Returns the p-value that is used for the sampling decision. + * + *

The returned p-value is translated into corresponding sampling probabilities as given in the + * following: + * + *

p-value = 0 => sampling probability = 1 + * + *

p-value = 1 => sampling probability = 1/2 + * + *

p-value = 2 => sampling probability = 1/4 + * + *

... + * + *

p-value = (z-2) => sampling probability = 1/2^(z-2) + * + *

p-value = (z-1) => sampling probability = 1/2^(z-1) + * + *

p-value = z => sampling probability = 0 + * + *

Here z denotes OtelTraceState.getMaxP(). + * + *

Any other p-values have no meaning and will lead to inconsistent sampling decisions. The + * parent sampled flag will define the sampling decision in this case. + * + *

NOTE: In future, further information like span attributes could be also added as arguments + * such that the sampling probability could be made dependent on those extra arguments. However, + * in any case the returned p-value must not depend directly or indirectly on the r-value. In + * particular this means that the parent sampled flag must not be used for the calculation of the + * p-value as the sampled flag depends itself on the r-value. + * + * @param parentP is the p-value (if known) that was used for a consistent sampling decision by + * the parent + * @param isRoot is true for the root span + * @return this Builder + */ + protected abstract int getP(int parentP, boolean isRoot); + + /** + * Returns the sampling probability for a given p-value. + * + * @param p the p-value + * @return the sampling probability in the range [0,1] + * @throws IllegalArgumentException if the given p-value is invalid + */ + protected static double getSamplingProbability(int p) { + if (OtelTraceState.isValidP(p)) { + if (p == OtelTraceState.getMaxP()) { + return 0.0; + } else { + return Double.longBitsToDouble((0x3FFL - p) << 52); + } + } else { + throw new IllegalArgumentException("Invalid p-value!"); + } + } + + private static final double SMALLEST_POSITIVE_SAMPLING_PROBABILITY = + getSamplingProbability(OtelTraceState.getMaxP() - 1); + + /** + * Returns the largest p-value for which {@code getSamplingProbability(p) >= samplingProbability}. + * + * @param samplingProbability the sampling probability + * @return the p-value + */ + protected static int getLowerBoundP(double samplingProbability) { + if (!(samplingProbability >= 0.0 && samplingProbability <= 1.0)) { + throw new IllegalArgumentException(); + } + if (samplingProbability == 0.) { + return OtelTraceState.getMaxP(); + } else if (samplingProbability <= SMALLEST_POSITIVE_SAMPLING_PROBABILITY) { + return OtelTraceState.getMaxP() - 1; + } else { + long longSamplingProbability = Double.doubleToRawLongBits(samplingProbability); + long mantissa = longSamplingProbability & 0x000FFFFFFFFFFFFFL; + long exponent = longSamplingProbability >>> 52; // compare + // https://en.wikipedia.org/wiki/Double-precision_floating-point_format#Exponent_encoding + return (int) (0x3FFL - exponent) - (mantissa != 0 ? 1 : 0); + } + } + + /** + * Returns the smallest p-value for which {@code getSamplingProbability(p) <= + * samplingProbability}. + * + * @param samplingProbability the sampling probability + * @return the p-value + */ + protected static int getUpperBoundP(double samplingProbability) { + if (!(samplingProbability >= 0.0 && samplingProbability <= 1.0)) { + throw new IllegalArgumentException(); + } + if (samplingProbability <= SMALLEST_POSITIVE_SAMPLING_PROBABILITY) { + return OtelTraceState.getMaxP(); + } else { + long longSamplingProbability = Double.doubleToRawLongBits(samplingProbability); + long exponent = longSamplingProbability >>> 52; // compare + // https://en.wikipedia.org/wiki/Double-precision_floating-point_format#Exponent_encoding + return (int) (0x3FFL - exponent); + } + } +} diff --git a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/OtelTraceState.java b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/OtelTraceState.java new file mode 100644 index 000000000..f29d40b6d --- /dev/null +++ b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/OtelTraceState.java @@ -0,0 +1,275 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; + +final class OtelTraceState { + + public static final String TRACE_STATE_KEY = "ot"; + + private static final char P_SUBKEY = 'p'; + private static final char R_SUBKEY = 'r'; + private static final int MAX_P = 63; + private static final int MAX_R = 62; + private static final int INVALID_P = -1; + private static final int INVALID_R = -1; + private static final int TRACE_STATE_SIZE_LIMIT = 256; + + private int rval; // valid in the interval [0, MAX_R] + private int pval; // valid in the interval [0, MAX_P] + + private final List otherKeyValuePairs; + + private OtelTraceState(int rvalue, int pvalue, List otherKeyValuePairs) { + this.rval = rvalue; + this.pval = pvalue; + this.otherKeyValuePairs = otherKeyValuePairs; + } + + private OtelTraceState() { + this(INVALID_R, INVALID_P, Collections.emptyList()); + } + + public boolean hasValidR() { + return isValidR(rval); + } + + public boolean hasValidP() { + return isValidP(pval); + } + + public void invalidateP() { + pval = INVALID_P; + } + + public void invalidateR() { + rval = INVALID_R; + } + + /** + * Sets a new p-value. + * + *

If the given p-value is invalid, the current p-value is invalidated. + * + * @param pval the new p-value + */ + public void setP(int pval) { + if (isValidP(pval)) { + this.pval = pval; + } else { + invalidateP(); + } + } + + /** + * Sets a new r-value. + * + *

If the given r-value is invalid, the current r-value is invalidated. + * + * @param rval the new r-value + */ + public void setR(int rval) { + if (isValidR(rval)) { + this.rval = rval; + } else { + invalidateR(); + } + } + + /** + * Returns a string representing this state. + * + * @return a string + */ + public String serialize() { + StringBuilder sb = new StringBuilder(); + if (hasValidP()) { + sb.append("p:").append(pval); + } + if (hasValidR()) { + if (sb.length() > 0) { + sb.append(';'); + } + sb.append("r:").append(rval); + } + for (String pair : otherKeyValuePairs) { + int ex = sb.length(); + if (ex != 0) { + ex += 1; + } + if (ex + pair.length() > TRACE_STATE_SIZE_LIMIT) { + break; + } + if (sb.length() > 0) { + sb.append(';'); + } + sb.append(pair); + } + return sb.toString(); + } + + private static boolean isValueByte(char c) { + return isLowerCaseAlphaNum(c) || isUpperCaseAlpha(c) || c == '.' || c == '_' || c == '-'; + } + + private static boolean isLowerCaseAlphaNum(char c) { + return isLowerCaseAlpha(c) || isDigit(c); + } + + private static boolean isDigit(char c) { + return c >= '0' && c <= '9'; + } + + private static boolean isLowerCaseAlpha(char c) { + return c >= 'a' && c <= 'z'; + } + + private static boolean isUpperCaseAlpha(char c) { + return c >= 'A' && c <= 'Z'; + } + + private static int parseOneOrTwoDigitNumber( + String ts, int from, int to, int twoDigitMaxValue, int invalidValue) { + if (to - from == 1) { + char c = ts.charAt(from); + if (isDigit(c)) { + return c - '0'; + } + } else if (to - from == 2) { + char c1 = ts.charAt(from); + char c2 = ts.charAt(from + 1); + if (isDigit(c1) && isDigit(c2)) { + int v = (c1 - '0') * 10 + (c2 - '0'); + if (v <= twoDigitMaxValue) { + return v; + } + } + } + return invalidValue; + } + + public static boolean isValidR(int v) { + return 0 <= v && v <= MAX_R; + } + + public static boolean isValidP(int v) { + return 0 <= v && v <= MAX_P; + } + + /** + * Parses the OtelTraceState from a given string. + * + *

If the string cannot be successfully parsed, a new empty OtelTraceState is returned. + * + * @param ts the string + * @return the parsed OtelTraceState or a new empty OtelTraceState in case of parsing errors + */ + public static OtelTraceState parse(@Nullable String ts) { + List otherKeyValuePairs = null; + int p = INVALID_P; + int r = INVALID_R; + + if (ts == null || ts.isEmpty()) { + return new OtelTraceState(); + } + + if (ts.length() > TRACE_STATE_SIZE_LIMIT) { + return new OtelTraceState(); + } + + int startPos = 0; + int len = ts.length(); + + while (true) { + int colonPos = startPos; + for (; colonPos < len; colonPos++) { + char c = ts.charAt(colonPos); + if (!isLowerCaseAlpha(c) && (!isDigit(c) || colonPos == startPos)) { + break; + } + } + if (colonPos == startPos || colonPos == len || ts.charAt(colonPos) != ':') { + return new OtelTraceState(); + } + + int separatorPos = colonPos + 1; + while (separatorPos < len && isValueByte(ts.charAt(separatorPos))) { + separatorPos++; + } + + if (colonPos - startPos == 1 && ts.charAt(startPos) == P_SUBKEY) { + p = parseOneOrTwoDigitNumber(ts, colonPos + 1, separatorPos, MAX_P, INVALID_P); + } else if (colonPos - startPos == 1 && ts.charAt(startPos) == R_SUBKEY) { + r = parseOneOrTwoDigitNumber(ts, colonPos + 1, separatorPos, MAX_R, INVALID_R); + } else { + if (otherKeyValuePairs == null) { + otherKeyValuePairs = new ArrayList<>(); + } + otherKeyValuePairs.add(ts.substring(startPos, separatorPos)); + } + + if (separatorPos < len && ts.charAt(separatorPos) != ';') { + return new OtelTraceState(); + } + + if (separatorPos == len) { + break; + } + + startPos = separatorPos + 1; + + // test for a trailing ; + if (startPos == len) { + return new OtelTraceState(); + } + } + + return new OtelTraceState( + r, p, (otherKeyValuePairs != null) ? otherKeyValuePairs : Collections.emptyList()); + } + + public int getR() { + return rval; + } + + public int getP() { + return pval; + } + + public static int getMaxP() { + return MAX_P; + } + + public static int getMaxR() { + return MAX_R; + } + + /** + * Returns an r-value that is guaranteed to be invalid. + * + *

{@code isValidR(getInvalidR())} will always return true. + * + * @return an invalid r-value + */ + public static int getInvalidR() { + return INVALID_R; + } + + /** + * Returns a p-value that is guaranteed to be invalid. + * + *

{@code isValidP(getInvalidP())} will always return true. + * + * @return an invalid p-value + */ + public static int getInvalidP() { + return INVALID_P; + } +} diff --git a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/RandomGenerator.java b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/RandomGenerator.java new file mode 100644 index 000000000..3de2260a2 --- /dev/null +++ b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/RandomGenerator.java @@ -0,0 +1,134 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.LongSupplier; + +final class RandomGenerator { + + private final LongSupplier threadSafeRandomLongSupplier; + + private static final class ThreadLocalData { + private long randomBits = 0; + private int bitCount = 0; + + private boolean nextRandomBit(LongSupplier threadSafeRandomLongSupplier) { + if ((bitCount & 0x3F) == 0) { + randomBits = threadSafeRandomLongSupplier.getAsLong(); + } + boolean randomBit = ((randomBits >>> bitCount) & 1L) != 0L; + bitCount += 1; + return randomBit; + } + + /** + * Returns a pseudorandomly chosen {@code boolean} value where the probability of returning + * {@code true} is predefined. + * + *

{@code true} needs to be returned with a success probability of {@code probability}. If + * the success probability is greater than 50% ({@code probability > 0.5}), the same can be + * achieved by returning {@code true} with a probability of 50%, and returning the result of a + * Bernoulli trial with a probability of {@code 2 * probability - 1}. The resulting success + * probability will be the same as {@code 0.5 + 0.5 * (2 * probability - 1) = probability}. + * Similarly, if the success probability is smaller than 50% ({@code probability <= 0.5}), + * {@code false} is returned with a probability of 50%. Otherwise, the result of a Bernoulli + * trial with success probability of {@code 2 * probability} is returned. Again, the resulting + * success probability is exactly as desired because {@code 0.5 * (2 * probability) = + * probability}. Recursive continuation of this approach allows realizing Bernoulli trials with + * arbitrary success probabilities using just few random bits. + * + * @param threadSafeRandomLongSupplier a thread-safe random long supplier + * @param probability the probability of returning {@code true} + * @return a random {@code boolean} + */ + private boolean generateRandomBoolean( + LongSupplier threadSafeRandomLongSupplier, double probability) { + while (true) { + if (probability <= 0) { + return false; + } + if (probability >= 1) { + return true; + } + boolean b = probability > 0.5; + if (nextRandomBit(threadSafeRandomLongSupplier)) { + return b; + } + probability += probability; + if (b) { + probability -= 1; + } + } + } + + /** + * Returns the number of leading zeros of a uniform random 64-bit integer. + * + * @param threadSafeRandomLongSupplier a thread-safe random long supplier + * @return the number of leading zeros + */ + private int numberOfLeadingZerosOfRandomLong(LongSupplier threadSafeRandomLongSupplier) { + int count = 0; + while (count < Long.SIZE && nextRandomBit(threadSafeRandomLongSupplier)) { + count += 1; + } + return count; + } + } + + private static final ThreadLocal THREAD_LOCAL_DATA = + ThreadLocal.withInitial(ThreadLocalData::new); + + private static final RandomGenerator INSTANCE = + new RandomGenerator(() -> ThreadLocalRandom.current().nextLong()); + + private RandomGenerator(LongSupplier threadSafeRandomLongSupplier) { + this.threadSafeRandomLongSupplier = requireNonNull(threadSafeRandomLongSupplier); + } + + /** + * Creates a new random generator using the given thread-safe random long supplier as random + * source. + * + * @param threadSafeRandomLongSupplier a thread-safe random long supplier + * @return a random generator + */ + public static RandomGenerator create(LongSupplier threadSafeRandomLongSupplier) { + return new RandomGenerator(threadSafeRandomLongSupplier); + } + + /** + * Returns a default random generator. + * + * @return a random generator + */ + public static RandomGenerator getDefault() { + return INSTANCE; + } + + /** + * Returns a pseudorandomly chosen {@code boolean} value where the probability of returning {@code + * true} is predefined. + * + * @param probability the probability of returning {@code true} + * @return a random {@code boolean} + */ + public boolean nextBoolean(double probability) { + return THREAD_LOCAL_DATA.get().generateRandomBoolean(threadSafeRandomLongSupplier, probability); + } + + /** + * Returns the number of leading zeros of a uniform random 64-bit integer. + * + * @return the number of leading zeros + */ + public int numberOfLeadingZerosOfRandomLong() { + return THREAD_LOCAL_DATA.get().numberOfLeadingZerosOfRandomLong(threadSafeRandomLongSupplier); + } +} diff --git a/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/ConsistentProbabilityBasedSamplerTest.java b/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/ConsistentProbabilityBasedSamplerTest.java new file mode 100644 index 000000000..e37861d24 --- /dev/null +++ b/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/ConsistentProbabilityBasedSamplerTest.java @@ -0,0 +1,149 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.sdk.trace.samplers.SamplingDecision; +import io.opentelemetry.sdk.trace.samplers.SamplingResult; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SplittableRandom; +import org.hipparchus.stat.inference.GTest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class ConsistentProbabilityBasedSamplerTest { + + private Context parentContext; + private String traceId; + private String name; + private SpanKind spanKind; + private Attributes attributes; + private List parentLinks; + + @BeforeEach + public void init() { + + parentContext = Context.root(); + traceId = "0123456789abcdef0123456789abcdef"; + name = "name"; + spanKind = SpanKind.SERVER; + attributes = Attributes.empty(); + parentLinks = Collections.emptyList(); + } + + private void test(SplittableRandom rng, double samplingProbability) { + int numSpans = 1000000; + + Sampler sampler = + ConsistentSampler.probabilityBased( + samplingProbability, RandomGenerator.create(rng::nextLong)); + + Map observedPvalues = new HashMap<>(); + for (long i = 0; i < numSpans; ++i) { + SamplingResult samplingResult = + sampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks); + if (samplingResult.getDecision() == SamplingDecision.RECORD_AND_SAMPLE) { + String traceStateString = + samplingResult + .getUpdatedTraceState(TraceState.getDefault()) + .get(OtelTraceState.TRACE_STATE_KEY); + OtelTraceState traceState = OtelTraceState.parse(traceStateString); + assertTrue(traceState.hasValidR()); + assertTrue(traceState.hasValidP()); + observedPvalues.merge(traceState.getP(), 1L, Long::sum); + } + } + verifyObservedPvaluesUsingGtest(numSpans, observedPvalues, samplingProbability); + } + + @Test + public void test() { + + // fix seed to get reproducible results + SplittableRandom random = new SplittableRandom(0); + + test(random, 1.); + test(random, 0.5); + test(random, 0.25); + test(random, 0.125); + test(random, 0.0); + test(random, 0.45); + test(random, 0.2); + test(random, 0.13); + test(random, 0.05); + } + + private static void verifyObservedPvaluesUsingGtest( + long originalNumberOfSpans, Map observedPvalues, double samplingProbability) { + + Object notSampled = + new Object() { + @Override + public String toString() { + return "NOT SAMPLED"; + } + }; + + Map expectedProbabilities = new HashMap<>(); + if (samplingProbability >= 1.) { + expectedProbabilities.put(0, 1.); + } else if (samplingProbability <= 0.) { + expectedProbabilities.put(notSampled, 1.); + } else { + int exponent = 0; + while (true) { + if (Math.pow(0.5, exponent + 1) < samplingProbability + && Math.pow(0.5, exponent) >= samplingProbability) { + break; + } + exponent += 1; + } + if (samplingProbability == Math.pow(0.5, exponent)) { + expectedProbabilities.put(notSampled, 1 - samplingProbability); + expectedProbabilities.put(exponent, samplingProbability); + } else { + expectedProbabilities.put(notSampled, 1 - samplingProbability); + expectedProbabilities.put(exponent, 2 * samplingProbability - Math.pow(0.5, exponent)); + expectedProbabilities.put(exponent + 1, Math.pow(0.5, exponent) - samplingProbability); + } + } + + Map extendedObservedAdjustedCounts = new HashMap<>(observedPvalues); + long numberOfSpansNotSampled = + originalNumberOfSpans - observedPvalues.values().stream().mapToLong(i -> i).sum(); + if (numberOfSpansNotSampled > 0) { + extendedObservedAdjustedCounts.put(notSampled, numberOfSpansNotSampled); + } + + double[] expectedValues = new double[expectedProbabilities.size()]; + long[] observedValues = new long[expectedProbabilities.size()]; + + int counter = 0; + for (Object key : expectedProbabilities.keySet()) { + observedValues[counter] = extendedObservedAdjustedCounts.getOrDefault(key, 0L); + double p = expectedProbabilities.get(key); + expectedValues[counter] = p * originalNumberOfSpans; + counter += 1; + } + + if (expectedProbabilities.size() > 1) { + assertThat(new GTest().gTest(expectedValues, observedValues)).isGreaterThan(0.01); + } else { + assertThat((double) observedValues[0]).isEqualTo(expectedValues[0]); + } + } +} diff --git a/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/ConsistentRateLimitingSamplerTest.java b/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/ConsistentRateLimitingSamplerTest.java new file mode 100644 index 000000000..c54d22d2f --- /dev/null +++ b/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/ConsistentRateLimitingSamplerTest.java @@ -0,0 +1,211 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.samplers.SamplingDecision; +import io.opentelemetry.sdk.trace.samplers.SamplingResult; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.SplittableRandom; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; +import org.assertj.core.data.Percentage; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class ConsistentRateLimitingSamplerTest { + + private long[] nanoTime; + private LongSupplier nanoTimeSupplier; + private Context parentContext; + private String traceId; + private String name; + private SpanKind spanKind; + private Attributes attributes; + private List parentLinks; + + @BeforeEach + void init() { + nanoTime = new long[] {0L}; + nanoTimeSupplier = () -> nanoTime[0]; + parentContext = Context.root(); + traceId = "0123456789abcdef0123456789abcdef"; + name = "name"; + spanKind = SpanKind.SERVER; + attributes = Attributes.empty(); + parentLinks = Collections.emptyList(); + } + + private void advanceTime(long nanosIncrement) { + nanoTime[0] += nanosIncrement; + } + + private long getCurrentTimeNanos() { + return nanoTime[0]; + } + + @Test + void testConstantRate() { + + double targetSpansPerSecondLimit = 1000; + double adaptationTimeSeconds = 5; + SplittableRandom random = new SplittableRandom(0L); + + ConsistentSampler sampler = + ConsistentSampler.rateLimited( + targetSpansPerSecondLimit, + adaptationTimeSeconds, + RandomGenerator.create(random::nextLong), + nanoTimeSupplier); + + long nanosBetweenSpans = TimeUnit.MICROSECONDS.toNanos(100); + int numSpans = 1000000; + + List spanSampledNanos = new ArrayList<>(); + + for (int i = 0; i < numSpans; ++i) { + advanceTime(nanosBetweenSpans); + SamplingResult samplingResult = + sampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks); + if (SamplingDecision.RECORD_AND_SAMPLE.equals(samplingResult.getDecision())) { + spanSampledNanos.add(getCurrentTimeNanos()); + } + } + + long numSampledSpansInLast5Seconds = + spanSampledNanos.stream() + .filter(x -> x > TimeUnit.SECONDS.toNanos(95) && x <= TimeUnit.SECONDS.toNanos(100)) + .count(); + + assertThat(numSampledSpansInLast5Seconds / 5.) + .isCloseTo(targetSpansPerSecondLimit, Percentage.withPercentage(5)); + } + + @Test + void testRateIncrease() { + + double targetSpansPerSecondLimit = 1000; + double adaptationTimeSeconds = 5; + SplittableRandom random = new SplittableRandom(0L); + + ConsistentSampler sampler = + ConsistentSampler.rateLimited( + targetSpansPerSecondLimit, + adaptationTimeSeconds, + RandomGenerator.create(random::nextLong), + nanoTimeSupplier); + + long nanosBetweenSpans1 = TimeUnit.MICROSECONDS.toNanos(100); + long nanosBetweenSpans2 = TimeUnit.MICROSECONDS.toNanos(10); + int numSpans1 = 500000; + int numSpans2 = 5000000; + + List spanSampledNanos = new ArrayList<>(); + + for (int i = 0; i < numSpans1; ++i) { + advanceTime(nanosBetweenSpans1); + SamplingResult samplingResult = + sampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks); + if (SamplingDecision.RECORD_AND_SAMPLE.equals(samplingResult.getDecision())) { + spanSampledNanos.add(getCurrentTimeNanos()); + } + } + for (int i = 0; i < numSpans2; ++i) { + advanceTime(nanosBetweenSpans2); + SamplingResult samplingResult = + sampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks); + if (SamplingDecision.RECORD_AND_SAMPLE.equals(samplingResult.getDecision())) { + spanSampledNanos.add(getCurrentTimeNanos()); + } + } + + long numSampledSpansWithin5SecondsBeforeChange = + spanSampledNanos.stream() + .filter(x -> x > TimeUnit.SECONDS.toNanos(45) && x <= TimeUnit.SECONDS.toNanos(50)) + .count(); + long numSampledSpansWithin5SecondsAfterChange = + spanSampledNanos.stream() + .filter(x -> x > TimeUnit.SECONDS.toNanos(50) && x <= TimeUnit.SECONDS.toNanos(55)) + .count(); + long numSampledSpansInLast5Seconds = + spanSampledNanos.stream() + .filter(x -> x > TimeUnit.SECONDS.toNanos(95) && x <= TimeUnit.SECONDS.toNanos(100)) + .count(); + + assertThat(numSampledSpansWithin5SecondsBeforeChange / 5.) + .isCloseTo(targetSpansPerSecondLimit, Percentage.withPercentage(5)); + assertThat(numSampledSpansWithin5SecondsAfterChange / 5.) + .isGreaterThan(2. * targetSpansPerSecondLimit); + assertThat(numSampledSpansInLast5Seconds / 5.) + .isCloseTo(targetSpansPerSecondLimit, Percentage.withPercentage(5)); + } + + @Test + void testRateDecrease() { + + double targetSpansPerSecondLimit = 1000; + double adaptationTimeSeconds = 5; + SplittableRandom random = new SplittableRandom(0L); + + ConsistentSampler sampler = + ConsistentSampler.rateLimited( + targetSpansPerSecondLimit, + adaptationTimeSeconds, + RandomGenerator.create(random::nextLong), + nanoTimeSupplier); + + long nanosBetweenSpans1 = TimeUnit.MICROSECONDS.toNanos(10); + long nanosBetweenSpans2 = TimeUnit.MICROSECONDS.toNanos(100); + int numSpans1 = 5000000; + int numSpans2 = 500000; + + List spanSampledNanos = new ArrayList<>(); + + for (int i = 0; i < numSpans1; ++i) { + advanceTime(nanosBetweenSpans1); + SamplingResult samplingResult = + sampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks); + if (SamplingDecision.RECORD_AND_SAMPLE.equals(samplingResult.getDecision())) { + spanSampledNanos.add(getCurrentTimeNanos()); + } + } + for (int i = 0; i < numSpans2; ++i) { + advanceTime(nanosBetweenSpans2); + SamplingResult samplingResult = + sampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks); + if (SamplingDecision.RECORD_AND_SAMPLE.equals(samplingResult.getDecision())) { + spanSampledNanos.add(getCurrentTimeNanos()); + } + } + + long numSampledSpansWithin5SecondsBeforeChange = + spanSampledNanos.stream() + .filter(x -> x > TimeUnit.SECONDS.toNanos(45) && x <= TimeUnit.SECONDS.toNanos(50)) + .count(); + long numSampledSpansWithin5SecondsAfterChange = + spanSampledNanos.stream() + .filter(x -> x > TimeUnit.SECONDS.toNanos(50) && x <= TimeUnit.SECONDS.toNanos(55)) + .count(); + long numSampledSpansInLast5Seconds = + spanSampledNanos.stream() + .filter(x -> x > TimeUnit.SECONDS.toNanos(95) && x <= TimeUnit.SECONDS.toNanos(100)) + .count(); + + assertThat(numSampledSpansWithin5SecondsBeforeChange / 5.) + .isCloseTo(targetSpansPerSecondLimit, Percentage.withPercentage(5)); + assertThat(numSampledSpansWithin5SecondsAfterChange / 5.) + .isLessThan(0.5 * targetSpansPerSecondLimit); + assertThat(numSampledSpansInLast5Seconds / 5.) + .isCloseTo(targetSpansPerSecondLimit, Percentage.withPercentage(5)); + } +} diff --git a/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/ConsistentSamplerTest.java b/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/ConsistentSamplerTest.java new file mode 100644 index 000000000..d2c5128be --- /dev/null +++ b/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/ConsistentSamplerTest.java @@ -0,0 +1,74 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.SplittableRandom; +import org.junit.jupiter.api.Test; + +class ConsistentSamplerTest { + + @Test + void testGetSamplingRate() { + assertThrows( + IllegalArgumentException.class, () -> ConsistentSampler.getSamplingProbability(-1)); + for (int i = 0; i < OtelTraceState.getMaxP() - 1; i += 1) { + assertEquals(Math.pow(0.5, i), ConsistentSampler.getSamplingProbability(i)); + } + assertEquals(0., ConsistentSampler.getSamplingProbability(OtelTraceState.getMaxP())); + assertThrows( + IllegalArgumentException.class, + () -> ConsistentSampler.getSamplingProbability(OtelTraceState.getMaxP() + 1)); + } + + @Test + void testGetLowerBoundP() { + assertEquals(0, ConsistentSampler.getLowerBoundP(1.0)); + assertEquals(0, ConsistentSampler.getLowerBoundP(Math.nextDown(1.0))); + for (int i = 1; i < OtelTraceState.getMaxP() - 1; i += 1) { + double samplingProbability = Math.pow(0.5, i); + assertEquals(i, ConsistentSampler.getLowerBoundP(samplingProbability)); + assertEquals(i - 1, ConsistentSampler.getLowerBoundP(Math.nextUp(samplingProbability))); + assertEquals(i, ConsistentSampler.getLowerBoundP(Math.nextDown(samplingProbability))); + } + assertEquals(OtelTraceState.getMaxP() - 1, ConsistentSampler.getLowerBoundP(Double.MIN_NORMAL)); + assertEquals(OtelTraceState.getMaxP() - 1, ConsistentSampler.getLowerBoundP(Double.MIN_VALUE)); + assertEquals(OtelTraceState.getMaxP(), ConsistentSampler.getLowerBoundP(0.0)); + } + + @Test + void testGetUpperBoundP() { + assertEquals(0, ConsistentSampler.getUpperBoundP(1.0)); + assertEquals(1, ConsistentSampler.getUpperBoundP(Math.nextDown(1.0))); + for (int i = 1; i < OtelTraceState.getMaxP() - 1; i += 1) { + double samplingProbability = Math.pow(0.5, i); + assertEquals(i, ConsistentSampler.getUpperBoundP(samplingProbability)); + assertEquals(i, ConsistentSampler.getUpperBoundP(Math.nextUp(samplingProbability))); + assertEquals(i + 1, ConsistentSampler.getUpperBoundP(Math.nextDown(samplingProbability))); + } + assertEquals(OtelTraceState.getMaxP(), ConsistentSampler.getUpperBoundP(Double.MIN_NORMAL)); + assertEquals(OtelTraceState.getMaxP(), ConsistentSampler.getUpperBoundP(Double.MIN_VALUE)); + assertEquals(OtelTraceState.getMaxP(), ConsistentSampler.getUpperBoundP(0.0)); + } + + @Test + void testRandomValues() { + int numCycles = 1000; + SplittableRandom random = new SplittableRandom(0L); + for (int i = 0; i < numCycles; ++i) { + double samplingProbability = Math.exp(-1. / random.nextDouble()); + int pmin = ConsistentSampler.getLowerBoundP(samplingProbability); + int pmax = ConsistentSampler.getUpperBoundP(samplingProbability); + assertThat(ConsistentSampler.getSamplingProbability(pmin)) + .isGreaterThanOrEqualTo(samplingProbability); + assertThat(ConsistentSampler.getSamplingProbability(pmax)) + .isLessThanOrEqualTo(samplingProbability); + } + } +} diff --git a/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/OtelTraceStateTest.java b/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/OtelTraceStateTest.java new file mode 100644 index 000000000..716321495 --- /dev/null +++ b/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/OtelTraceStateTest.java @@ -0,0 +1,79 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class OtelTraceStateTest { + + private static String getXString(int len) { + return Stream.generate(() -> "X").limit(len).collect(Collectors.joining()); + } + + @Test + public void test() { + + Assertions.assertEquals("", OtelTraceState.parse("").serialize()); + assertEquals("", OtelTraceState.parse("").serialize()); + + assertEquals("", OtelTraceState.parse("a").serialize()); + assertEquals("", OtelTraceState.parse("#").serialize()); + assertEquals("", OtelTraceState.parse(" ").serialize()); + + assertEquals("p:5", OtelTraceState.parse("p:5").serialize()); + assertEquals("p:63", OtelTraceState.parse("p:63").serialize()); + assertEquals("", OtelTraceState.parse("p:64").serialize()); + assertEquals("", OtelTraceState.parse("p:5;").serialize()); + assertEquals("", OtelTraceState.parse("p:99").serialize()); + assertEquals("", OtelTraceState.parse("p:").serialize()); + assertEquals("", OtelTraceState.parse("p:232").serialize()); + assertEquals("", OtelTraceState.parse("x;p:5").serialize()); + assertEquals("", OtelTraceState.parse("p:5;x").serialize()); + assertEquals("p:5;x:3", OtelTraceState.parse("x:3;p:5").serialize()); + assertEquals("p:5;x:3", OtelTraceState.parse("p:5;x:3").serialize()); + assertEquals("", OtelTraceState.parse("p:5;x:3;").serialize()); + assertEquals( + "p:5;a:" + getXString(246) + ";x:3", + OtelTraceState.parse("a:" + getXString(246) + ";p:5;x:3").serialize()); + assertEquals("", OtelTraceState.parse("a:" + getXString(247) + ";p:5;x:3").serialize()); + + assertEquals("r:5", OtelTraceState.parse("r:5").serialize()); + assertEquals("r:62", OtelTraceState.parse("r:62").serialize()); + assertEquals("", OtelTraceState.parse("r:63").serialize()); + assertEquals("", OtelTraceState.parse("r:5;").serialize()); + assertEquals("", OtelTraceState.parse("r:99").serialize()); + assertEquals("", OtelTraceState.parse("r:").serialize()); + assertEquals("", OtelTraceState.parse("r:232").serialize()); + assertEquals("", OtelTraceState.parse("x;r:5").serialize()); + assertEquals("", OtelTraceState.parse("r:5;x").serialize()); + assertEquals("r:5;x:3", OtelTraceState.parse("x:3;r:5").serialize()); + assertEquals("r:5;x:3", OtelTraceState.parse("r:5;x:3").serialize()); + assertEquals("", OtelTraceState.parse("r:5;x:3;").serialize()); + assertEquals( + "r:5;a:" + getXString(246) + ";x:3", + OtelTraceState.parse("a:" + getXString(246) + ";r:5;x:3").serialize()); + assertEquals("", OtelTraceState.parse("a:" + getXString(247) + ";r:5;x:3").serialize()); + + assertEquals("p:7;r:5", OtelTraceState.parse("r:5;p:7").serialize()); + assertEquals("p:4;r:5", OtelTraceState.parse("r:5;p:4").serialize()); + assertEquals("p:7;r:5", OtelTraceState.parse("r:5;p:7").serialize()); + assertEquals("p:4;r:5", OtelTraceState.parse("r:5;p:4").serialize()); + + assertEquals("r:6", OtelTraceState.parse("r:5;r:6").serialize()); + assertEquals("p:6;r:10", OtelTraceState.parse("p:5;p:6;r:10").serialize()); + assertEquals("", OtelTraceState.parse("p5;p:6;r:10").serialize()); + assertEquals("p:6;r:10;p5:3", OtelTraceState.parse("p5:3;p:6;r:10").serialize()); + assertEquals("", OtelTraceState.parse(":p:6;r:10").serialize()); + assertEquals("", OtelTraceState.parse(";p:6;r:10").serialize()); + assertEquals("", OtelTraceState.parse("_;p:6;r:10").serialize()); + assertEquals("", OtelTraceState.parse("5;p:6;r:10").serialize()); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 644fa0974..26f8e9fb3 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -37,6 +37,7 @@ rootProject.name = "opentelemetry-java-contrib" include(":all") include(":aws-xray") +include(":consistent-sampling") include(":dependencyManagement") include(":example") include(":jfr-streaming")