diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java index 2097fd144ba..6667aae99b2 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java @@ -58,7 +58,7 @@ static Aggregator drop() { default T accumulateLongMeasurement(long value, Attributes attributes, Context context) { AggregatorHandle handle = createHandle(); handle.recordLong(value, attributes, context); - return handle.accumulateThenReset(attributes); + return handle.accumulateThenMaybeReset(attributes, /* reset= */ true); } /** @@ -73,21 +73,9 @@ default T accumulateLongMeasurement(long value, Attributes attributes, Context c default T accumulateDoubleMeasurement(double value, Attributes attributes, Context context) { AggregatorHandle handle = createHandle(); handle.recordDouble(value, attributes, context); - return handle.accumulateThenReset(attributes); + return handle.accumulateThenMaybeReset(attributes, /* reset= */ true); } - /** - * Returns the result of the merge of the given accumulations. - * - *
<p>
This should always assume that the accumulations do not overlap and merge together for a new - * cumulative report. - * - * @param previousCumulative the previously captured accumulation - * @param delta the newly captured (delta) accumulation - * @return the result of the merge of the given accumulations. - */ - T merge(T previousCumulative, T delta); - /** * Returns a new DELTA aggregation by comparing two cumulative measurements. * diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java index bb1986b9296..07f3b021afc 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java @@ -85,20 +85,22 @@ public final boolean tryUnmap() { } /** - * Returns the current value into as {@link T} and resets the current value in this {@code - * Aggregator}. + * Returns the current value into as {@link T}. If {@code reset} is {@code true}, resets the + * current value in this {@code Aggregator}. */ @Nullable - public final T accumulateThenReset(Attributes attributes) { + public final T accumulateThenMaybeReset(Attributes attributes, boolean reset) { if (!hasRecordings) { return null; } - hasRecordings = false; - return doAccumulateThenReset(exemplarReservoir.collectAndReset(attributes)); + if (reset) { + hasRecordings = false; + } + return doAccumulateThenMaybeReset(exemplarReservoir.collectAndReset(attributes), reset); } - /** Implementation of the {@code accumulateThenReset}. */ - protected abstract T doAccumulateThenReset(List exemplars); + /** Implementation of the {@link #accumulateThenMaybeReset(Attributes, boolean)}. 
*/ + protected abstract T doAccumulateThenMaybeReset(List exemplars, boolean reset); @Override public final void recordLong(long value, Attributes attributes, Context context) { diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java index f638bdc6709..99161f964b8 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java @@ -62,40 +62,6 @@ public AggregatorHandle return new Handle(this.boundaries, reservoirSupplier.get()); } - /** - * Return the result of the merge of two histogram accumulations. As long as one Aggregator - * instance produces all Accumulations with constant boundaries we don't need to worry about - * merging accumulations with different boundaries. 
- */ - @Override - public ExplicitBucketHistogramAccumulation merge( - ExplicitBucketHistogramAccumulation previous, ExplicitBucketHistogramAccumulation current) { - long[] previousCounts = previous.getCounts(); - long[] mergedCounts = new long[previousCounts.length]; - for (int i = 0; i < previousCounts.length; ++i) { - mergedCounts[i] = previousCounts[i] + current.getCounts()[i]; - } - double min = -1; - double max = -1; - if (previous.hasMinMax() && current.hasMinMax()) { - min = Math.min(previous.getMin(), current.getMin()); - max = Math.max(previous.getMax(), current.getMax()); - } else if (previous.hasMinMax()) { - min = previous.getMin(); - max = previous.getMax(); - } else if (current.hasMinMax()) { - min = current.getMin(); - max = current.getMax(); - } - return ExplicitBucketHistogramAccumulation.create( - previous.getSum() + current.getSum(), - previous.hasMinMax() || current.hasMinMax(), - min, - max, - mergedCounts, - current.getExemplars()); - } - @Override public MetricData toMetricData( Resource resource, @@ -156,8 +122,8 @@ static final class Handle } @Override - protected ExplicitBucketHistogramAccumulation doAccumulateThenReset( - List exemplars) { + protected ExplicitBucketHistogramAccumulation doAccumulateThenMaybeReset( + List exemplars, boolean reset) { lock.lock(); try { ExplicitBucketHistogramAccumulation acc = @@ -168,11 +134,13 @@ protected ExplicitBucketHistogramAccumulation doAccumulateThenReset( this.count > 0 ? 
this.max : -1, Arrays.copyOf(counts, counts.length), exemplars); - this.sum = 0; - this.min = Double.MAX_VALUE; - this.max = -1; - this.count = 0; - Arrays.fill(this.counts, 0); + if (reset) { + this.sum = 0; + this.min = Double.MAX_VALUE; + this.max = -1; + this.count = 0; + Arrays.fill(this.counts, 0); + } return acc; } finally { lock.unlock(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExponentialHistogramAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExponentialHistogramAggregator.java index 15afebc439a..b8fd5fd6bb9 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExponentialHistogramAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExponentialHistogramAggregator.java @@ -56,99 +56,6 @@ public AggregatorHandle cr return new Handle(reservoirSupplier.get(), maxBuckets, maxScale); } - /** - * Merge the exponential histogram accumulations. Mutates the {@link - * ExponentialHistogramAccumulation#getPositiveBuckets()} and {@link - * ExponentialHistogramAccumulation#getNegativeBuckets()} of {@code previous}. Mutating buckets is - * acceptable because copies are already made in {@link Handle#doAccumulateThenReset(List)}. 
- */ - @Override - public ExponentialHistogramAccumulation merge( - ExponentialHistogramAccumulation previous, ExponentialHistogramAccumulation current) { - - // Create merged buckets - ExponentialHistogramBuckets posBuckets = - merge(previous.getPositiveBuckets(), current.getPositiveBuckets()); - ExponentialHistogramBuckets negBuckets = - merge(previous.getNegativeBuckets(), current.getNegativeBuckets()); - - // resolve possible scale difference due to merge - int commonScale = Math.min(posBuckets.getScale(), negBuckets.getScale()); - posBuckets = downscale(posBuckets, commonScale); - negBuckets = downscale(negBuckets, commonScale); - double min = -1; - double max = -1; - if (previous.hasMinMax() && current.hasMinMax()) { - min = Math.min(previous.getMin(), current.getMin()); - max = Math.max(previous.getMax(), current.getMax()); - } else if (previous.hasMinMax()) { - min = previous.getMin(); - max = previous.getMax(); - } else if (current.hasMinMax()) { - min = current.getMin(); - max = current.getMax(); - } - return ExponentialHistogramAccumulation.create( - commonScale, - previous.getSum() + current.getSum(), - previous.hasMinMax() || current.hasMinMax(), - min, - max, - posBuckets, - negBuckets, - previous.getZeroCount() + current.getZeroCount(), - current.getExemplars()); - } - - /** - * Merge the exponential histogram buckets. If {@code a} is empty, return {@code b}. If {@code b} - * is empty, return {@code a}. Else merge {@code b} into {@code a}. - * - *
<p>
Assumes {@code a} and {@code b} are either {@link DoubleExponentialHistogramBuckets} or - * {@link EmptyExponentialHistogramBuckets}. - */ - private static ExponentialHistogramBuckets merge( - ExponentialHistogramBuckets a, ExponentialHistogramBuckets b) { - if (a instanceof EmptyExponentialHistogramBuckets || a.getTotalCount() == 0) { - return b; - } - if (b instanceof EmptyExponentialHistogramBuckets || b.getTotalCount() == 0) { - return a; - } - if ((a instanceof DoubleExponentialHistogramBuckets) - && (b instanceof DoubleExponentialHistogramBuckets)) { - DoubleExponentialHistogramBuckets a1 = (DoubleExponentialHistogramBuckets) a; - DoubleExponentialHistogramBuckets b2 = (DoubleExponentialHistogramBuckets) b; - a1.mergeInto(b2); - return a1; - } - throw new IllegalStateException( - "Unable to merge ExponentialHistogramBuckets. Unrecognized implementation."); - } - - /** - * Downscale the {@code buckets} to the {@code targetScale}. - * - *
<p>
Assumes {@code a} and {@code b} are either {@link DoubleExponentialHistogramBuckets} or - * {@link EmptyExponentialHistogramBuckets}. - */ - private static ExponentialHistogramBuckets downscale( - ExponentialHistogramBuckets buckets, int targetScale) { - if (buckets.getScale() == targetScale) { - return buckets; - } - if (buckets instanceof EmptyExponentialHistogramBuckets) { - return EmptyExponentialHistogramBuckets.get(targetScale); - } - if (buckets instanceof DoubleExponentialHistogramBuckets) { - DoubleExponentialHistogramBuckets buckets1 = (DoubleExponentialHistogramBuckets) buckets; - buckets1.downscale(buckets1.getScale() - targetScale); - return buckets1; - } - throw new IllegalStateException( - "Unable to merge ExponentialHistogramBuckets. Unrecognized implementation"); - } - @Override public MetricData toMetricData( Resource resource, @@ -199,22 +106,8 @@ static final class Handle } @Override - protected synchronized ExponentialHistogramAccumulation doAccumulateThenReset( - List exemplars) { - ExponentialHistogramBuckets positiveBuckets; - ExponentialHistogramBuckets negativeBuckets; - if (this.positiveBuckets != null) { - positiveBuckets = this.positiveBuckets.copy(); - this.positiveBuckets.clear(); - } else { - positiveBuckets = EmptyExponentialHistogramBuckets.get(scale); - } - if (this.negativeBuckets != null) { - negativeBuckets = this.negativeBuckets.copy(); - this.negativeBuckets.clear(); - } else { - negativeBuckets = EmptyExponentialHistogramBuckets.get(scale); - } + protected synchronized ExponentialHistogramAccumulation doAccumulateThenMaybeReset( + List exemplars, boolean reset) { ExponentialHistogramAccumulation acc = ExponentialHistogramAccumulation.create( scale, @@ -222,18 +115,32 @@ protected synchronized ExponentialHistogramAccumulation doAccumulateThenReset( this.count > 0, this.count > 0 ? this.min : -1, this.count > 0 ? 
this.max : -1, - positiveBuckets, - negativeBuckets, + resolveBuckets(this.positiveBuckets, scale, reset), + resolveBuckets(this.negativeBuckets, scale, reset), zeroCount, exemplars); - this.sum = 0; - this.zeroCount = 0; - this.min = Double.MAX_VALUE; - this.max = -1; - this.count = 0; + if (reset) { + this.sum = 0; + this.zeroCount = 0; + this.min = Double.MAX_VALUE; + this.max = -1; + this.count = 0; + } return acc; } + private static ExponentialHistogramBuckets resolveBuckets( + @Nullable DoubleExponentialHistogramBuckets buckets, int scale, boolean reset) { + if (buckets == null) { + return EmptyExponentialHistogramBuckets.get(scale); + } + ExponentialHistogramBuckets copy = buckets.copy(); + if (reset) { + buckets.clear(); + } + return copy; + } + @Override protected synchronized void doRecordDouble(double value) { // ignore NaN and infinity diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java index f28109dc0dc..2f63a5aca9f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java @@ -17,6 +17,7 @@ import io.opentelemetry.sdk.resources.Resource; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import javax.annotation.Nullable; @@ -48,11 +49,6 @@ public AggregatorHandle createHandle() { return new Handle(reservoirSupplier.get()); } - @Override - public DoubleAccumulation merge(DoubleAccumulation previous, DoubleAccumulation current) { - return current; - } - @Override public DoubleAccumulation diff(DoubleAccumulation previous, DoubleAccumulation current) { return current; @@ -94,8 +90,12 @@ private 
Handle(ExemplarReservoir reservoir) { } @Override - protected DoubleAccumulation doAccumulateThenReset(List exemplars) { - return DoubleAccumulation.create(this.current.getAndSet(DEFAULT_VALUE), exemplars); + protected DoubleAccumulation doAccumulateThenMaybeReset( + List exemplars, boolean reset) { + if (reset) { + return DoubleAccumulation.create(this.current.getAndSet(DEFAULT_VALUE), exemplars); + } + return DoubleAccumulation.create(Objects.requireNonNull(this.current.get()), exemplars); } @Override diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java index 7fd5324d9fb..d3edf2732c4 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java @@ -58,13 +58,6 @@ public DoubleAccumulation accumulateDoubleMeasurement( return DoubleAccumulation.create(value); } - @Override - public DoubleAccumulation merge( - DoubleAccumulation previousAccumulation, DoubleAccumulation accumulation) { - return DoubleAccumulation.create( - previousAccumulation.getValue() + accumulation.getValue(), accumulation.getExemplars()); - } - @Override public DoubleAccumulation diff( DoubleAccumulation previousAccumulation, DoubleAccumulation accumulation) { @@ -107,8 +100,12 @@ static final class Handle extends AggregatorHandle exemplars) { - return DoubleAccumulation.create(this.current.sumThenReset(), exemplars); + protected DoubleAccumulation doAccumulateThenMaybeReset( + List exemplars, boolean reset) { + if (reset) { + return DoubleAccumulation.create(this.current.sumThenReset(), exemplars); + } + return DoubleAccumulation.create(this.current.sum(), exemplars); } @Override diff --git 
a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java index 43d682ff4bb..0cae6b933fb 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java @@ -37,7 +37,8 @@ protected void doRecordLong(long value) {} protected void doRecordDouble(double value) {} @Override - protected Object doAccumulateThenReset(List exemplars) { + protected Object doAccumulateThenMaybeReset( + List exemplars, boolean reset) { return ACCUMULATION; } }; @@ -49,11 +50,6 @@ public AggregatorHandle createHandle() { return HANDLE; } - @Override - public Object merge(Object previousAccumulation, Object accumulation) { - return ACCUMULATION; - } - @Override public MetricData toMetricData( Resource resource, diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java index e9c780975a5..10717c7022b 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java @@ -17,6 +17,7 @@ import io.opentelemetry.sdk.resources.Resource; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import javax.annotation.Nullable; @@ -45,11 +46,6 @@ public AggregatorHandle createHandle() { return new Handle(reservoirSupplier.get()); } - @Override - public LongAccumulation merge(LongAccumulation previous, LongAccumulation current) { - return current; - } - @Override public LongAccumulation diff(LongAccumulation 
previous, LongAccumulation current) { return current; @@ -90,8 +86,12 @@ static final class Handle extends AggregatorHandle exemplars) { - return LongAccumulation.create(this.current.getAndSet(DEFAULT_VALUE), exemplars); + protected LongAccumulation doAccumulateThenMaybeReset( + List exemplars, boolean reset) { + if (reset) { + return LongAccumulation.create(this.current.getAndSet(DEFAULT_VALUE), exemplars); + } + return LongAccumulation.create(Objects.requireNonNull(this.current.get()), exemplars); } @Override diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java index be1b9fad499..a8943f6afab 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java @@ -45,13 +45,6 @@ public AggregatorHandle createHandle() { return new Handle(reservoirSupplier.get()); } - @Override - public LongAccumulation merge( - LongAccumulation previousAccumulation, LongAccumulation accumulation) { - return LongAccumulation.create( - previousAccumulation.getValue() + accumulation.getValue(), accumulation.getExemplars()); - } - @Override public LongAccumulation diff( LongAccumulation previousAccumulation, LongAccumulation accumulation) { @@ -94,8 +87,12 @@ static final class Handle extends AggregatorHandle exemplars) { - return LongAccumulation.create(this.current.sumThenReset(), exemplars); + protected LongAccumulation doAccumulateThenMaybeReset( + List exemplars, boolean reset) { + if (reset) { + return LongAccumulation.create(this.current.sumThenReset(), exemplars); + } + return LongAccumulation.create(this.current.sum(), exemplars); } @Override diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java 
b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 3449f60687a..71ede6f850c 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -41,10 +41,12 @@ final class AsynchronousMetricStorage implements Metr private final ThrottlingLogger throttlingLogger = new ThrottlingLogger(logger); private final RegisteredReader registeredReader; private final MetricDescriptor metricDescriptor; - private final TemporalMetricStorage metricStorage; + private final AggregationTemporality aggregationTemporality; private final Aggregator aggregator; private final AttributesProcessor attributesProcessor; private Map accumulations = new HashMap<>(); + private Map lastAccumulations = + new HashMap<>(); // Only populated if aggregationTemporality == DELTA private AsynchronousMetricStorage( RegisteredReader registeredReader, @@ -53,17 +55,10 @@ private AsynchronousMetricStorage( AttributesProcessor attributesProcessor) { this.registeredReader = registeredReader; this.metricDescriptor = metricDescriptor; - AggregationTemporality aggregationTemporality = + this.aggregationTemporality = registeredReader .getReader() .getAggregationTemporality(metricDescriptor.getSourceInstrument().getType()); - this.metricStorage = - new TemporalMetricStorage<>( - aggregator, - /* isSynchronous= */ false, - registeredReader, - aggregationTemporality, - metricDescriptor); this.aggregator = aggregator; this.attributesProcessor = attributesProcessor; } @@ -108,13 +103,13 @@ void recordDouble(double value, Attributes attributes) { private void recordAccumulation(T accumulation, Attributes attributes) { Attributes processedAttributes = attributesProcessor.process(attributes, Context.current()); - if (accumulations.size() >= MetricStorageUtils.MAX_ACCUMULATIONS) { + if (accumulations.size() >= 
MetricStorage.MAX_ACCUMULATIONS) { throttlingLogger.log( Level.WARNING, "Instrument " + metricDescriptor.getSourceInstrument().getName() + " has exceeded the maximum allowed accumulations (" - + MetricStorageUtils.MAX_ACCUMULATIONS + + MetricStorage.MAX_ACCUMULATIONS + ")."); return; } @@ -143,15 +138,34 @@ public RegisteredReader getRegisteredReader() { } @Override - public MetricData collectAndReset( + public MetricData collect( Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long startEpochNanos, long epochNanos) { - Map currentAccumulations = accumulations; - accumulations = new HashMap<>(); - return metricStorage.buildMetricFor( - resource, instrumentationScopeInfo, currentAccumulations, startEpochNanos, epochNanos); + Map result; + if (aggregationTemporality == AggregationTemporality.DELTA) { + Map accumulations = this.accumulations; + Map lastAccumulations = this.lastAccumulations; + lastAccumulations.entrySet().removeIf(entry -> !accumulations.containsKey(entry.getKey())); + accumulations.forEach( + (k, v) -> + lastAccumulations.compute(k, (k2, v2) -> v2 == null ? 
v : aggregator.diff(v2, v))); + result = lastAccumulations; + this.lastAccumulations = accumulations; + } else { + result = accumulations; + } + this.accumulations = new HashMap<>(); + return aggregator.toMetricData( + resource, + instrumentationScopeInfo, + metricDescriptor, + result, + aggregationTemporality, + startEpochNanos, + registeredReader.getLastCollectEpochNanos(), + epochNanos); } @Override diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index aa7ced9ee45..a72e244a37a 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -5,8 +5,6 @@ package io.opentelemetry.sdk.metrics.internal.state; -import static io.opentelemetry.sdk.metrics.internal.state.MetricStorageUtils.MAX_ACCUMULATIONS; - import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; @@ -16,6 +14,7 @@ import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle; +import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader; import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor; @@ -36,16 +35,17 @@ public final class DefaultSynchronousMetricStorage implements SynchronousMetricStorage { - private static final ThrottlingLogger logger = - new ThrottlingLogger(Logger.getLogger(DefaultSynchronousMetricStorage.class.getName())); private static final 
BoundStorageHandle NOOP_STORAGE_HANDLE = new NoopBoundHandle(); + private static final Logger internalLogger = + Logger.getLogger(DefaultSynchronousMetricStorage.class.getName()); + private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); private final RegisteredReader registeredReader; private final MetricDescriptor metricDescriptor; + private final AggregationTemporality aggregationTemporality; private final Aggregator aggregator; private final ConcurrentHashMap> activeCollectionStorage = new ConcurrentHashMap<>(); - private final TemporalMetricStorage temporalMetricStorage; private final AttributesProcessor attributesProcessor; DefaultSynchronousMetricStorage( @@ -55,18 +55,11 @@ public final class DefaultSynchronousMetricStorage AttributesProcessor attributesProcessor) { this.registeredReader = registeredReader; this.metricDescriptor = metricDescriptor; - AggregationTemporality aggregationTemporality = + this.aggregationTemporality = registeredReader .getReader() .getAggregationTemporality(metricDescriptor.getSourceInstrument().getType()); this.aggregator = aggregator; - this.temporalMetricStorage = - new TemporalMetricStorage<>( - aggregator, - /* isSynchronous= */ true, - registeredReader, - aggregationTemporality, - metricDescriptor); this.attributesProcessor = attributesProcessor; } @@ -160,29 +153,44 @@ public void recordDouble(double value, Attributes attributes, Context context) { } @Override - public MetricData collectAndReset( + public MetricData collect( Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long startEpochNanos, long epochNanos) { + boolean reset = aggregationTemporality == AggregationTemporality.DELTA; + // Grab accumulated measurements. Map accumulations = new HashMap<>(); for (Map.Entry> entry : activeCollectionStorage.entrySet()) { - boolean unmappedEntry = entry.getValue().tryUnmap(); - if (unmappedEntry) { - // If able to unmap then remove the record from the current Map. 
This can race with the - // acquire but because we requested a specific value only one will succeed. - activeCollectionStorage.remove(entry.getKey(), entry.getValue()); + if (reset) { + boolean unmappedEntry = entry.getValue().tryUnmap(); + if (unmappedEntry) { + // If able to unmap then remove the record from the current Map. This can race with the + // acquire but because we requested a specific value only one will succeed. + activeCollectionStorage.remove(entry.getKey(), entry.getValue()); + } } - T accumulation = entry.getValue().accumulateThenReset(entry.getKey()); + T accumulation = entry.getValue().accumulateThenMaybeReset(entry.getKey(), reset); if (accumulation == null) { continue; } accumulations.put(entry.getKey(), accumulation); } - return temporalMetricStorage.buildMetricFor( - resource, instrumentationScopeInfo, accumulations, startEpochNanos, epochNanos); + if (accumulations.isEmpty()) { + return EmptyMetricData.getInstance(); + } + + return aggregator.toMetricData( + resource, + instrumentationScopeInfo, + metricDescriptor, + accumulations, + aggregationTemporality, + startEpochNanos, + registeredReader.getLastCollectEpochNanos(), + epochNanos); } @Override diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java index d3770bbd137..60172049ef5 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java @@ -84,7 +84,7 @@ public BoundStorageHandle bind(Attributes attributes) { } @Override - public MetricData collectAndReset( + public MetricData collect( Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long startEpochNanos, diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterSharedState.java 
b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterSharedState.java index 82e32969ba3..ccc6fcc481f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterSharedState.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterSharedState.java @@ -101,7 +101,7 @@ public List collectAll( List result = new ArrayList<>(storages.size()); for (MetricStorage storage : storages) { MetricData current = - storage.collectAndReset( + storage.collect( meterProviderSharedState.getResource(), getInstrumentationScopeInfo(), meterProviderSharedState.getStartEpochNanos(), diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java index 86d9aa201b0..3d169740fe1 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java @@ -6,6 +6,7 @@ package io.opentelemetry.sdk.metrics.internal.state; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader; @@ -19,6 +20,9 @@ */ public interface MetricStorage { + /** The max number of metric accumulations for a particular {@link MetricStorage}. */ + int MAX_ACCUMULATIONS = 2000; + /** Returns a description of the metric produced in this storage. */ MetricDescriptor getMetricDescriptor(); @@ -26,7 +30,8 @@ public interface MetricStorage { RegisteredReader getRegisteredReader(); /** - * Collects the metrics from this storage and resets for the next collection period. + * Collects the metrics from this storage. 
If storing {@link AggregationTemporality#DELTA} + * metrics, reset for the next collection period. * *
<p>
Note: This is a stateful operation and will reset any interval-related state for the {@code * collector}. @@ -37,7 +42,7 @@ public interface MetricStorage { * @param epochNanos The timestamp for this collection. * @return The {@link MetricData} from this collection period. */ - MetricData collectAndReset( + MetricData collect( Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long startEpochNanos, diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageUtils.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageUtils.java deleted file mode 100644 index 3c8f1f32e2d..00000000000 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageUtils.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.metrics.internal.state; - -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.sdk.metrics.data.ExemplarData; -import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; -import java.util.Map; -import java.util.function.BiFunction; - -/** Utilities to help deal w/ {@code Map} in metric storage. */ -final class MetricStorageUtils { - /** The max number of metric accumulations for a particular {@link MetricStorage}. */ - static final int MAX_ACCUMULATIONS = 2000; - - private MetricStorageUtils() {} - - /** - * Merges accumulations from {@code toMerge} into {@code result}. Keys from {@code result} which - * don't appear in {@code toMerge} are removed. - * - *
<p>
Note: This mutates the result map. - */ - static void mergeInPlace( - Map result, Map toMerge, Aggregator aggregator) { - blend(result, toMerge, /* preserve= */ false, aggregator::merge); - } - - /** - * Merges accumulations from {@code toMerge} into {@code result}. Keys from {@code result} which - * don't appear in {@code toMerge} are preserved as-is. - * - *

Note: This mutates the result map. - */ - static void mergeAndPreserveInPlace( - Map result, Map toMerge, Aggregator aggregator) { - blend(result, toMerge, /* preserve= */ true, aggregator::merge); - } - - /** - * Diffs accumulations from {@code toMerge} into {@code result}. Keys from {@code result} which - * don't appear in {@code toMerge} are removed. - * - *

If no prior value is found, then the value from {@code toDiff} is used. - * - *

Note: This mutates the result map. - */ - static void diffInPlace( - Map result, Map toDiff, Aggregator aggregator) { - blend(result, toDiff, /* preserve= */ false, aggregator::diff); - } - - private static void blend( - Map result, - Map toMerge, - boolean preserve, - BiFunction blendFunction) { - if (!preserve) { - removeUnseen(result, toMerge); - } - toMerge.forEach( - (k, v) -> result.compute(k, (k2, v2) -> (v2 != null) ? blendFunction.apply(v2, v) : v)); - } - - /** - * Removes all keys in {@code result} that do not exist in {@code latest}. - * - *

Note: This mutates the result map. - */ - public static void removeUnseen(Map result, Map latest) { - result.entrySet().removeIf(entry -> !latest.containsKey(entry.getKey())); - } -} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/TemporalMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/TemporalMetricStorage.java deleted file mode 100644 index 1d5544ec35c..00000000000 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/TemporalMetricStorage.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.metrics.internal.state; - -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.sdk.common.InstrumentationScopeInfo; -import io.opentelemetry.sdk.metrics.data.AggregationTemporality; -import io.opentelemetry.sdk.metrics.data.ExemplarData; -import io.opentelemetry.sdk.metrics.data.MetricData; -import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; -import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData; -import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; -import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader; -import io.opentelemetry.sdk.resources.Resource; -import java.util.HashMap; -import java.util.Map; -import javax.annotation.concurrent.ThreadSafe; - -/** Stores last reported time and (optional) accumulation for metrics. 
*/ -@ThreadSafe -class TemporalMetricStorage { - private final Aggregator aggregator; - private final boolean isSynchronous; - private final RegisteredReader registeredReader; - private Map lastAccumulation = new HashMap<>(); - private final AggregationTemporality temporality; - private final MetricDescriptor metricDescriptor; - - TemporalMetricStorage( - Aggregator aggregator, - boolean isSynchronous, - RegisteredReader registeredReader, - AggregationTemporality aggregationTemporality, - MetricDescriptor metricDescriptor) { - this.aggregator = aggregator; - this.isSynchronous = isSynchronous; - this.registeredReader = registeredReader; - this.temporality = aggregationTemporality; - this.metricDescriptor = metricDescriptor; - } - - /** - * Builds the {@link MetricData} for the {@code currentAccumulation}. - * - * @param resource The resource to attach these metrics against. - * @param instrumentationScopeInfo The instrumentation scope that generated these metrics. - * @param currentAccumulation The current accumulation of metric data from instruments. This might - * be delta (for synchronous) or cumulative (for asynchronous). - * @param startEpochNanos The timestamp when the metrics SDK started. - * @param epochNanos The current collection timestamp. - * @return The {@link MetricData} points. - */ - synchronized MetricData buildMetricFor( - Resource resource, - InstrumentationScopeInfo instrumentationScopeInfo, - Map currentAccumulation, - long startEpochNanos, - long epochNanos) { - - Map result = currentAccumulation; - long lastCollectionEpoch = registeredReader.getLastCollectEpochNanos(); - // Use aggregation temporality + instrument to determine if we do a merge or a diff of - // previous. We have the following four scenarios: - // 1. Delta Aggregation (temporality) + Cumulative recording (async instrument). - // Here we diff with last cumulative to get a delta. - // 2. Cumulative Aggregation + Delta recording (sync instrument). 
- // Here we merge with our last record to get a cumulative aggregation. - // 3. Cumulative Aggregation + Cumulative recording - do nothing - // 4. Delta Aggregation + Delta recording - do nothing. - if (temporality == AggregationTemporality.DELTA && !isSynchronous) { - MetricStorageUtils.diffInPlace(lastAccumulation, currentAccumulation, aggregator); - result = lastAccumulation; - } else if (temporality == AggregationTemporality.CUMULATIVE && isSynchronous) { - // We need to make sure the current delta recording gets merged into the previous cumulative - // for the next cumulative measurement. - MetricStorageUtils.mergeAndPreserveInPlace(lastAccumulation, currentAccumulation, aggregator); - // Note: We allow going over our hard limit on attribute streams when first merging, but - // preserve after this point. - if (lastAccumulation.size() > MetricStorageUtils.MAX_ACCUMULATIONS) { - MetricStorageUtils.removeUnseen(lastAccumulation, currentAccumulation); - } - result = lastAccumulation; - } - - // Update last reported (cumulative) accumulation. - // For synchronous instruments, we need the merge result. - // For asynchronous instruments, we need the recorded value. - // This assumes aggregation remains consistent for the lifetime of a collector, and - // could be optimised to not record results for cases 3+4 listed above. - if (isSynchronous) { - // Sync instruments remember the full recording. - lastAccumulation = result; - } else { - // Async instruments record the raw measurement. 
- lastAccumulation = currentAccumulation; - } - if (result.isEmpty()) { - return EmptyMetricData.getInstance(); - } - return aggregator.toMetricData( - resource, - instrumentationScopeInfo, - metricDescriptor, - result, - temporality, - startEpochNanos, - lastCollectionEpoch, - epochNanos); - } -} diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java index ca1b4ddec63..13e28e75ab5 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java @@ -118,7 +118,7 @@ void staleMetricsDropped_synchronousInstrument() { (Consumer>) sumPointData -> assertThat(sumPointData.getPoints().size()) - .isEqualTo(10)))); + .isEqualTo(2000)))); } /** diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandleTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandleTest.java index 79832c9f408..0e5fb4007a9 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandleTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandleTest.java @@ -98,7 +98,7 @@ void testRecordings() { assertThat(testAggregator.recordedLong.get()).isEqualTo(22); assertThat(testAggregator.recordedDouble.get()).isEqualTo(0); - testAggregator.accumulateThenReset(Attributes.empty()); + testAggregator.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true); assertThat(testAggregator.recordedLong.get()).isEqualTo(0); assertThat(testAggregator.recordedDouble.get()).isEqualTo(0); @@ -106,7 +106,7 @@ void testRecordings() { assertThat(testAggregator.recordedLong.get()).isEqualTo(0); assertThat(testAggregator.recordedDouble.get()).isEqualTo(33.55); - testAggregator.accumulateThenReset(Attributes.empty()); + 
testAggregator.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true); assertThat(testAggregator.recordedLong.get()).isEqualTo(0); assertThat(testAggregator.recordedDouble.get()).isEqualTo(0); } @@ -148,7 +148,7 @@ void testGenerateExemplarsOnCollect() { testAggregator.recordDouble(1.0, Attributes.empty(), Context.root()); Mockito.when(doubleReservoir.collectAndReset(attributes)) .thenReturn(Collections.singletonList(result)); - testAggregator.accumulateThenReset(attributes); + testAggregator.accumulateThenMaybeReset(attributes, /* reset= */ true); assertThat(testAggregator.recordedExemplars.get()).containsExactly(result); } @@ -164,7 +164,7 @@ private static class TestAggregatorHandle @Nullable @Override - protected Void doAccumulateThenReset(List exemplars) { + protected Void doAccumulateThenMaybeReset(List exemplars, boolean reset) { recordedLong.set(0); recordedDouble.set(0); recordedExemplars.set(exemplars); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java index 8ff3b9e9266..5ca17964f61 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java @@ -28,7 +28,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; -import javax.annotation.Nullable; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -63,7 +62,7 @@ void testRecordings() { aggregatorHandle.recordLong(5); aggregatorHandle.recordLong(150); aggregatorHandle.recordLong(2000); - 
assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())) + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) .isEqualTo( ExplicitBucketHistogramAccumulation.create( 2175, /* hasMinMax= */ true, 5d, 2000d, new long[] {1, 1, 1, 1})); @@ -89,7 +88,7 @@ void testExemplarsInAccumulation() { AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(0, attributes, Context.root()); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())) + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) .isEqualTo( ExplicitBucketHistogramAccumulation.create( 0, /* hasMinMax= */ true, 0, 0, new long[] {1, 0, 0, 0}, exemplars)); @@ -99,21 +98,24 @@ void testExemplarsInAccumulation() { void toAccumulationAndReset() { AggregatorHandle aggregatorHandle = aggregator.createHandle(); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); aggregatorHandle.recordLong(100); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())) + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) .isEqualTo( ExplicitBucketHistogramAccumulation.create( 100, /* hasMinMax= */ true, 100d, 100d, new long[] {0, 1, 0, 0})); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); aggregatorHandle.recordLong(0); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())) + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) .isEqualTo( ExplicitBucketHistogramAccumulation.create( 0, /* hasMinMax= */ true, 0d, 0d, new long[] {1, 0, 0, 0})); - 
assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); } @Test @@ -128,87 +130,6 @@ void accumulateData() { 10.0, /* hasMinMax= */ true, 10.0, 10.0, new long[] {1, 0, 0, 0})); } - @Test - void mergeAccumulation() { - Attributes attributes = Attributes.builder().put("test", "value").build(); - DoubleExemplarData exemplar = - ImmutableDoubleExemplarData.create( - attributes, - 2L, - SpanContext.create( - "00000000000000000000000000000001", - "0000000000000002", - TraceFlags.getDefault(), - TraceState.getDefault()), - 1); - List exemplars = Collections.singletonList(exemplar); - List previousExemplars = - Collections.singletonList( - ImmutableDoubleExemplarData.create( - attributes, - 1L, - SpanContext.create( - "00000000000000000000000000000001", - "0000000000000002", - TraceFlags.getDefault(), - TraceState.getDefault()), - 2)); - ExplicitBucketHistogramAccumulation previousAccumulation = - ExplicitBucketHistogramAccumulation.create( - 2, /* hasMinMax= */ true, 1d, 2d, new long[] {1, 1, 0}, previousExemplars); - ExplicitBucketHistogramAccumulation nextAccumulation = - ExplicitBucketHistogramAccumulation.create( - 2, /* hasMinMax= */ true, 2d, 3d, new long[] {0, 0, 2}, exemplars); - // Assure most recent exemplars are kept. 
- assertThat(aggregator.merge(previousAccumulation, nextAccumulation)) - .isEqualTo( - ExplicitBucketHistogramAccumulation.create( - 4, /* hasMinMax= */ true, 1d, 3d, new long[] {1, 1, 2}, exemplars)); - } - - @Test - void mergeAccumulation_MinAndMax() { - // If min / max is null for both accumulations set min / max to null - assertThat( - aggregator.merge( - ExplicitBucketHistogramAccumulation.create( - 0, /* hasMinMax= */ false, 0, 0, new long[] {}, Collections.emptyList()), - ExplicitBucketHistogramAccumulation.create( - 0, /* hasMinMax= */ false, 0, 0, new long[] {}, Collections.emptyList()))) - .isEqualTo( - ExplicitBucketHistogramAccumulation.create( - 0, /* hasMinMax= */ false, -1, -1, new long[] {}, Collections.emptyList())); - // If min / max is non-null for only one accumulation set min / max to it - assertThat( - aggregator.merge( - ExplicitBucketHistogramAccumulation.create( - 0, /* hasMinMax= */ true, 1d, 2d, new long[] {}, Collections.emptyList()), - ExplicitBucketHistogramAccumulation.create( - 0, /* hasMinMax= */ false, 0, 0, new long[] {}, Collections.emptyList()))) - .isEqualTo( - ExplicitBucketHistogramAccumulation.create( - 0, /* hasMinMax= */ true, 1d, 2d, new long[] {}, Collections.emptyList())); - assertThat( - aggregator.merge( - ExplicitBucketHistogramAccumulation.create( - 0, /* hasMinMax= */ false, 0, 0, new long[] {}, Collections.emptyList()), - ExplicitBucketHistogramAccumulation.create( - 0, /* hasMinMax= */ true, 1d, 2d, new long[] {}, Collections.emptyList()))) - .isEqualTo( - ExplicitBucketHistogramAccumulation.create( - 0, /* hasMinMax= */ true, 1d, 2d, new long[] {}, Collections.emptyList())); - // If both accumulations have min / max compute the min / max - assertThat( - aggregator.merge( - ExplicitBucketHistogramAccumulation.create( - 0, /* hasMinMax= */ true, 1d, 1d, new long[] {}, Collections.emptyList()), - ExplicitBucketHistogramAccumulation.create( - 0, /* hasMinMax= */ true, 2d, 2d, new long[] {}, 
Collections.emptyList()))) - .isEqualTo( - ExplicitBucketHistogramAccumulation.create( - 0, /* hasMinMax= */ true, 1d, 2d, new long[] {}, Collections.emptyList())); - } - @Test void toMetricData() { AggregatorHandle aggregatorHandle = @@ -221,7 +142,8 @@ void toMetricData() { INSTRUMENTATION_SCOPE_INFO, METRIC_DESCRIPTOR, Collections.singletonMap( - Attributes.empty(), aggregatorHandle.accumulateThenReset(Attributes.empty())), + Attributes.empty(), + aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)), AggregationTemporality.DELTA, 0, 10, @@ -295,7 +217,7 @@ void testHistogramCounts() { aggregator.createHandle(); aggregatorHandle.recordDouble(1.1); ExplicitBucketHistogramAccumulation explicitBucketHistogramAccumulation = - aggregatorHandle.accumulateThenReset(Attributes.empty()); + aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true); assertThat(explicitBucketHistogramAccumulation).isNotNull(); assertThat(explicitBucketHistogramAccumulation.getCounts().length) .isEqualTo(boundaries.length + 1); @@ -305,7 +227,6 @@ void testHistogramCounts() { void testMultithreadedUpdates() throws InterruptedException { AggregatorHandle aggregatorHandle = aggregator.createHandle(); - Histogram summarizer = new Histogram(); ImmutableList updates = ImmutableList.of(1L, 2L, 3L, 5L, 7L, 11L, 13L, 17L, 19L, 23L); int numberOfThreads = updates.size(); int numberOfUpdates = 10000; @@ -321,39 +242,16 @@ void testMultithreadedUpdates() throws InterruptedException { for (int j = 0; j < numberOfUpdates; j++) { aggregatorHandle.recordLong(v); if (ThreadLocalRandom.current().nextInt(10) == 0) { - summarizer.process( - aggregatorHandle.accumulateThenReset(Attributes.empty())); + aggregatorHandle.accumulateThenMaybeReset( + Attributes.empty(), /* reset= */ false); } } })) .collect(Collectors.toList())); - // make sure everything gets merged when all the aggregation is done. 
- summarizer.process(aggregatorHandle.accumulateThenReset(Attributes.empty())); - - assertThat(summarizer.accumulation) + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ false)) .isEqualTo( ExplicitBucketHistogramAccumulation.create( 1010000, /* hasMinMax= */ true, 1d, 23d, new long[] {50000, 50000, 0, 0})); } - - private static final class Histogram { - private final Object mutex = new Object(); - - @Nullable private ExplicitBucketHistogramAccumulation accumulation; - - void process(@Nullable ExplicitBucketHistogramAccumulation other) { - if (other == null) { - return; - } - - synchronized (mutex) { - if (accumulation == null) { - accumulation = other; - return; - } - accumulation = aggregator.merge(accumulation, other); - } - } - } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExponentialHistogramAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExponentialHistogramAggregatorTest.java index 713fda8a93a..e792ee8eedd 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExponentialHistogramAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExponentialHistogramAggregatorTest.java @@ -34,7 +34,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; -import javax.annotation.Nullable; import org.assertj.core.data.Offset; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -77,7 +76,7 @@ private static ExponentialHistogramAccumulation getTestAccumulation( for (double r : recordings) { aggregatorHandle.recordDouble(r); } - return aggregatorHandle.doAccumulateThenReset(exemplars); + return aggregatorHandle.doAccumulateThenMaybeReset(exemplars, /* reset= */ true); } @Test @@ -86,7 +85,7 @@ void createHandle() { 
assertThat(handle).isInstanceOf(DoubleExponentialHistogramAggregator.Handle.class); ExponentialHistogramAccumulation accumulation = ((DoubleExponentialHistogramAggregator.Handle) handle) - .doAccumulateThenReset(Collections.emptyList()); + .doAccumulateThenMaybeReset(Collections.emptyList(), /* reset= */ true); assertThat(accumulation.getPositiveBuckets()) .isInstanceOf(DoubleExponentialHistogramAggregator.EmptyExponentialHistogramBuckets.class); assertThat(accumulation.getPositiveBuckets().getScale()).isEqualTo(MAX_SCALE); @@ -110,7 +109,8 @@ void testRecordings() { aggregatorHandle.recordDouble(0.0); aggregatorHandle.recordLong(0); - ExponentialHistogramAccumulation acc = aggregatorHandle.accumulateThenReset(Attributes.empty()); + ExponentialHistogramAccumulation acc = + aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true); List positiveCounts = Objects.requireNonNull(acc).getPositiveBuckets().getBucketCounts(); List negativeCounts = acc.getNegativeBuckets().getBucketCounts(); int expectedScale = 5; // should be downscaled from 20 to 5 after recordings @@ -145,7 +145,8 @@ void testInvalidRecording() { aggregatorHandle.recordDouble(Double.NEGATIVE_INFINITY); aggregatorHandle.recordDouble(Double.NaN); - ExponentialHistogramAccumulation acc = aggregatorHandle.accumulateThenReset(Attributes.empty()); + ExponentialHistogramAccumulation acc = + aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true); assertThat(Objects.requireNonNull(acc).getSum()).isEqualTo(0); assertThat(acc.getPositiveBuckets().getTotalCount()).isEqualTo(0); assertThat(acc.getNegativeBuckets().getTotalCount()).isEqualTo(0); @@ -161,7 +162,8 @@ void testRecordingsAtLimits(DoubleExponentialHistogramAggregator aggregator) { aggregatorHandle.recordDouble(Double.MIN_VALUE); aggregatorHandle.recordDouble(Double.MAX_VALUE); - ExponentialHistogramAccumulation acc = aggregatorHandle.accumulateThenReset(Attributes.empty()); + 
ExponentialHistogramAccumulation acc = + aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true); List bucketCounts = Objects.requireNonNull(acc).getPositiveBuckets().getBucketCounts(); // assert buckets == [1 0 0 0 ... 1] @@ -220,7 +222,9 @@ void testExemplarsInAccumulation() { aggregatorHandle.recordDouble(0, attributes, Context.root()); assertThat( - Objects.requireNonNull(aggregatorHandle.accumulateThenReset(Attributes.empty())) + Objects.requireNonNull( + aggregatorHandle.accumulateThenMaybeReset( + Attributes.empty(), /* reset= */ true)) .getExemplars()) .isEqualTo(exemplars); } @@ -229,15 +233,19 @@ void testExemplarsInAccumulation() { void testAccumulationAndReset() { AggregatorHandle aggregatorHandle = aggregator.createHandle(); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); aggregatorHandle.recordDouble(5.0); assertThat( - Objects.requireNonNull(aggregatorHandle.accumulateThenReset(Attributes.empty())) + Objects.requireNonNull( + aggregatorHandle.accumulateThenMaybeReset( + Attributes.empty(), /* reset= */ true)) .getPositiveBuckets() .getBucketCounts()) .isEqualTo(Collections.singletonList(1L)); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); } @Test @@ -248,125 +256,6 @@ void testAccumulateData() { assertThat(acc).isEqualTo(expected); } - @Test - void testMergeAccumulation() { - Attributes attributes = Attributes.builder().put("test", "value").build(); - DoubleExemplarData exemplar = - ImmutableDoubleExemplarData.create( - attributes, - 2L, - SpanContext.create( - "00000000000000000000000000000001", - "0000000000000002", - TraceFlags.getDefault(), - TraceState.getDefault()), - 1); - List exemplars = Collections.singletonList(exemplar); - List 
previousExemplars = - Collections.singletonList( - ImmutableDoubleExemplarData.create( - attributes, - 1L, - SpanContext.create( - "00000000000000000000000000000001", - "0000000000000002", - TraceFlags.getDefault(), - TraceState.getDefault()), - 2)); - ExponentialHistogramAccumulation previousAccumulation = - getTestAccumulation(previousExemplars, 0, 4.1, 100, 100, 10000, 1000000); - ExponentialHistogramAccumulation nextAccumulation = - getTestAccumulation(exemplars, -1000, -2000000, -8.2, 2.3); - - // Merged accumulations should equal accumulation with equivalent recordings and latest - // exemplars. - assertThat(aggregator.merge(previousAccumulation, nextAccumulation)) - .isEqualTo( - getTestAccumulation( - exemplars, 0, 4.1, 100, 100, 10000, 1000000, -1000, -2000000, -8.2, 2.3)); - } - - @Test - void testMergeAccumulationMinAndMax() { - // If min / max is null for both accumulations set min / max to null - assertThat( - aggregator.merge( - createAccumulation(/* hasMinMax= */ false, 0, 0), - createAccumulation(/* hasMinMax= */ false, 0, 0))) - .isEqualTo(createAccumulation(/* hasMinMax= */ false, -1, -1)); - // If min / max is non-null for only one accumulation set min / max to it - assertThat( - aggregator.merge( - createAccumulation(/* hasMinMax= */ true, 1d, 2d), - createAccumulation(/* hasMinMax= */ false, 0, 0))) - .isEqualTo(createAccumulation(/* hasMinMax= */ true, 1d, 2d)); - assertThat( - aggregator.merge( - createAccumulation(/* hasMinMax= */ false, 0, 0), - createAccumulation(/* hasMinMax= */ true, 1d, 2d))) - .isEqualTo(createAccumulation(/* hasMinMax= */ true, 1d, 2d)); - // If both accumulations have min / max compute the min / max - assertThat( - aggregator.merge( - createAccumulation(/* hasMinMax= */ true, 1d, 1d), - createAccumulation(/* hasMinMax= */ true, 2d, 2d))) - .isEqualTo(createAccumulation(/* hasMinMax= */ true, 1d, 2d)); - } - - private static ExponentialHistogramAccumulation createAccumulation( - boolean hasMinMax, double min, double 
max) { - DoubleExponentialHistogramBuckets buckets = new DoubleExponentialHistogramBuckets(0, 1); - return ExponentialHistogramAccumulation.create( - 0, 0, hasMinMax, min, max, buckets, buckets, 0, Collections.emptyList()); - } - - @Test - void testMergeNonOverlap() { - ExponentialHistogramAccumulation previousAccumulation = - getTestAccumulation(Collections.emptyList(), 10, 100, 100, 10000, 100000); - ExponentialHistogramAccumulation nextAccumulation = - getTestAccumulation(Collections.emptyList(), 0.001, 0.01, 0.1, 1); - - assertThat(aggregator.merge(previousAccumulation, nextAccumulation)) - .isEqualTo( - getTestAccumulation( - Collections.emptyList(), 0.001, 0.01, 0.1, 1, 10, 100, 100, 10000, 100000)); - } - - @Test - void testMergeWithEmptyBuckets() { - assertThat( - aggregator.merge( - getTestAccumulation(Collections.emptyList()), - getTestAccumulation(Collections.emptyList(), 1))) - .isEqualTo(getTestAccumulation(Collections.emptyList(), 1)); - - assertThat( - aggregator.merge( - getTestAccumulation(Collections.emptyList(), 1), - getTestAccumulation(Collections.emptyList()))) - .isEqualTo(getTestAccumulation(Collections.emptyList(), 1)); - - assertThat( - aggregator.merge( - getTestAccumulation(Collections.emptyList()), - getTestAccumulation(Collections.emptyList()))) - .isEqualTo(getTestAccumulation(Collections.emptyList())); - } - - @Test - void testMergeOverlap() { - ExponentialHistogramAccumulation previousAccumulation = - getTestAccumulation(Collections.emptyList(), 0, 10, 100, 10000, 100000); - ExponentialHistogramAccumulation nextAccumulation = - getTestAccumulation(Collections.emptyList(), 100000, 10000, 100, 10, 0); - - assertThat(aggregator.merge(previousAccumulation, nextAccumulation)) - .isEqualTo( - getTestAccumulation( - Collections.emptyList(), 0, 0, 10, 10, 100, 100, 10000, 10000, 100000, 100000)); - } - @Test void testInsert1M() { AggregatorHandle handle = @@ -381,7 +270,8 @@ void testInsert1M() { } ExponentialHistogramAccumulation acc = - 
Objects.requireNonNull(handle.accumulateThenReset(Attributes.empty())); + Objects.requireNonNull( + handle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)); assertThat(acc.getScale()).isEqualTo(3); assertThat(acc.getPositiveBuckets().getScale()).isEqualTo(3); assertThat(acc.getNegativeBuckets().getScale()).isEqualTo(3); @@ -405,7 +295,8 @@ void testDownScale() { handle.recordDouble(16.0); ExponentialHistogramAccumulation acc = - Objects.requireNonNull(handle.accumulateThenReset(Attributes.empty())); + Objects.requireNonNull( + handle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)); assertThat(acc.getScale()).isEqualTo(0); assertThat(acc.getPositiveBuckets().getScale()).isEqualTo(0); assertThat(acc.getNegativeBuckets().getScale()).isEqualTo(0); @@ -444,7 +335,8 @@ void testToMetricData() { aggregatorHandle.recordDouble(0); aggregatorHandle.recordDouble(0); aggregatorHandle.recordDouble(123.456); - ExponentialHistogramAccumulation acc = aggregatorHandle.accumulateThenReset(Attributes.empty()); + ExponentialHistogramAccumulation acc = + aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true); MetricData metricDataCumulative = cumulativeAggregator.toMetricData( @@ -504,7 +396,6 @@ void testToMetricData() { void testMultithreadedUpdates() throws InterruptedException { AggregatorHandle aggregatorHandle = aggregator.createHandle(); - ExponentialHistogram summarizer = new ExponentialHistogram(); ImmutableList updates = ImmutableList.of(0D, 0.1D, -0.1D, 1D, -1D, 100D); int numberOfThreads = updates.size(); int numberOfUpdates = 10000; @@ -520,17 +411,16 @@ void testMultithreadedUpdates() throws InterruptedException { for (int j = 0; j < numberOfUpdates; j++) { aggregatorHandle.recordDouble(v); if (ThreadLocalRandom.current().nextInt(10) == 0) { - summarizer.process( - aggregatorHandle.accumulateThenReset(Attributes.empty())); + aggregatorHandle.accumulateThenMaybeReset( + Attributes.empty(), /* reset= */ false); } } 
})) .collect(Collectors.toList())); - // make sure everything gets merged when all the aggregation is done. - summarizer.process(aggregatorHandle.accumulateThenReset(Attributes.empty())); - - ExponentialHistogramAccumulation acc = Objects.requireNonNull(summarizer.accumulation); + ExponentialHistogramAccumulation acc = + Objects.requireNonNull( + aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ false)); assertThat(acc.getZeroCount()).isEqualTo(numberOfUpdates); assertThat(acc.getSum()).isCloseTo(100.0D * 10000, Offset.offset(0.0001)); // float error assertThat(acc.getScale()).isEqualTo(3); @@ -564,24 +454,4 @@ void testMultithreadedUpdates() throws InterruptedException { negCounts.get(valueToIndex(acc.getScale(), 1) - acc.getPositiveBuckets().getOffset())) .isEqualTo(numberOfUpdates); } - - private static final class ExponentialHistogram { - private final Object mutex = new Object(); - - @Nullable private ExponentialHistogramAccumulation accumulation; - - void process(@Nullable ExponentialHistogramAccumulation other) { - if (other == null) { - return; - } - - synchronized (mutex) { - if (accumulation == null) { - accumulation = other; - return; - } - accumulation = aggregator.merge(accumulation, other); - } - } - } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java index 1b5df5b6fd0..61e4a0c8356 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java @@ -43,58 +43,44 @@ void multipleRecords() { AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(12.1); - 
assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty()).getValue()).isEqualTo(12.1); + assertThat( + aggregatorHandle + .accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true) + .getValue()) + .isEqualTo(12.1); aggregatorHandle.recordDouble(13.1); aggregatorHandle.recordDouble(14.1); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty()).getValue()).isEqualTo(14.1); + assertThat( + aggregatorHandle + .accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true) + .getValue()) + .isEqualTo(14.1); } @Test void toAccumulationAndReset() { AggregatorHandle aggregatorHandle = aggregator.createHandle(); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); aggregatorHandle.recordDouble(13.1); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty()).getValue()).isEqualTo(13.1); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat( + aggregatorHandle + .accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true) + .getValue()) + .isEqualTo(13.1); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); aggregatorHandle.recordDouble(12.1); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty()).getValue()).isEqualTo(12.1); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); - } - - @Test - void mergeAccumulation() { - Attributes attributes = Attributes.builder().put("test", "value").build(); - DoubleExemplarData exemplar = - ImmutableDoubleExemplarData.create( - attributes, - 2L, - SpanContext.create( - "00000000000000000000000000000001", - "0000000000000002", - TraceFlags.getDefault(), - TraceState.getDefault()), - 1); - List exemplars = Collections.singletonList(exemplar); - List previousExemplars = - Collections.singletonList( - 
ImmutableDoubleExemplarData.create( - attributes, - 1L, - SpanContext.create( - "00000000000000000000000000000001", - "0000000000000002", - TraceFlags.getDefault(), - TraceState.getDefault()), - 2)); - DoubleAccumulation result = - aggregator.merge( - DoubleAccumulation.create(1, previousExemplars), - DoubleAccumulation.create(2, exemplars)); - // Assert that latest measurement is kept. - assertThat(result).isEqualTo(DoubleAccumulation.create(2, exemplars)); + assertThat( + aggregatorHandle + .accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true) + .getValue()) + .isEqualTo(12.1); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); } @Test @@ -131,7 +117,6 @@ void diffAccumulation() { } @Test - @SuppressWarnings("unchecked") void toMetricData() { AggregatorHandle aggregatorHandle = aggregator.createHandle(); @@ -143,7 +128,8 @@ void toMetricData() { INSTRUMENTATION_SCOPE_INFO, METRIC_DESCRIPTOR, Collections.singletonMap( - Attributes.empty(), aggregatorHandle.accumulateThenReset(Attributes.empty())), + Attributes.empty(), + aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)), AggregationTemporality.DELTA, 0, 10, diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java index 5f287e310ca..096d0095d0b 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java @@ -66,7 +66,10 @@ void multipleRecords() { aggregatorHandle.recordDouble(12.1); aggregatorHandle.recordDouble(12.1); aggregatorHandle.recordDouble(12.1); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty()).getValue()) + assertThat( + aggregatorHandle + 
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true) + .getValue()) .isEqualTo(12.1 * 5); } @@ -80,24 +83,39 @@ void multipleRecords_WithNegatives() { aggregatorHandle.recordDouble(12); aggregatorHandle.recordDouble(12); aggregatorHandle.recordDouble(-11); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty()).getValue()).isEqualTo(14); + assertThat( + aggregatorHandle + .accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true) + .getValue()) + .isEqualTo(14); } @Test void toAccumulationAndReset() { AggregatorHandle aggregatorHandle = aggregator.createHandle(); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); aggregatorHandle.recordDouble(13); aggregatorHandle.recordDouble(12); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty()).getValue()).isEqualTo(25); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat( + aggregatorHandle + .accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true) + .getValue()) + .isEqualTo(25); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); aggregatorHandle.recordDouble(12); aggregatorHandle.recordDouble(-25); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty()).getValue()).isEqualTo(-13); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat( + aggregatorHandle + .accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true) + .getValue()) + .isEqualTo(-13); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); } @Test @@ -127,7 +145,7 @@ void testExemplarsInAccumulation() { AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(0, attributes, Context.root()); - 
assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())) + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) .isEqualTo(DoubleAccumulation.create(0, exemplars)); } @@ -145,17 +163,6 @@ void mergeAndDiff() { TraceState.getDefault()), 1); List exemplars = Collections.singletonList(exemplar); - List previousExemplars = - Collections.singletonList( - ImmutableDoubleExemplarData.create( - attributes, - 1L, - SpanContext.create( - "00000000000000000000000000000001", - "0000000000000002", - TraceFlags.getDefault(), - TraceState.getDefault()), - 2)); for (InstrumentType instrumentType : InstrumentType.values()) { for (AggregationTemporality temporality : AggregationTemporality.values()) { DoubleSumAggregator aggregator = @@ -163,16 +170,6 @@ void mergeAndDiff() { InstrumentDescriptor.create( "name", "description", "unit", instrumentType, InstrumentValueType.LONG), ExemplarReservoir::doubleNoSamples); - DoubleAccumulation merged = - aggregator.merge( - DoubleAccumulation.create(1.0d, previousExemplars), - DoubleAccumulation.create(2.0d, exemplars)); - assertThat(merged.getValue()) - .withFailMessage( - "Invalid merge result for instrumentType %s, temporality %s: %s", - instrumentType, temporality, merged) - .isEqualTo(3.0d); - assertThat(merged.getExemplars()).containsExactly(exemplar); DoubleAccumulation diffed = aggregator.diff( @@ -180,7 +177,7 @@ void mergeAndDiff() { assertThat(diffed.getValue()) .withFailMessage( "Invalid diff result for instrumentType %s, temporality %s: %s", - instrumentType, temporality, merged) + instrumentType, temporality, diffed) .isEqualTo(1d); assertThat(diffed.getExemplars()).containsExactly(exemplar); } @@ -188,7 +185,6 @@ void mergeAndDiff() { } @Test - @SuppressWarnings("unchecked") void toMetricData() { AggregatorHandle aggregatorHandle = aggregator.createHandle(); @@ -200,7 +196,8 @@ void toMetricData() { scope, metricDescriptor, Collections.singletonMap( - Attributes.empty(), 
aggregatorHandle.accumulateThenReset(Attributes.empty())), + Attributes.empty(), + aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)), AggregationTemporality.CUMULATIVE, 0, 10, diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java index c64c6f0fb4c..e7fca0148c9 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java @@ -8,22 +8,17 @@ import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.trace.SpanContext; -import io.opentelemetry.api.trace.TraceFlags; -import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.LongExemplarData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; -import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongExemplarData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; import io.opentelemetry.sdk.resources.Resource; import java.util.Collections; -import java.util.List; import org.junit.jupiter.api.Test; /** Unit tests for {@link LongLastValueAggregator}. 
*/ @@ -46,57 +41,44 @@ void multipleRecords() { AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordLong(12); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty()).getValue()).isEqualTo(12L); + assertThat( + aggregatorHandle + .accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true) + .getValue()) + .isEqualTo(12L); aggregatorHandle.recordLong(13); aggregatorHandle.recordLong(14); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty()).getValue()).isEqualTo(14L); + assertThat( + aggregatorHandle + .accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true) + .getValue()) + .isEqualTo(14L); } @Test void toAccumulationAndReset() { AggregatorHandle aggregatorHandle = aggregator.createHandle(); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); aggregatorHandle.recordLong(13); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty()).getValue()).isEqualTo(13L); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat( + aggregatorHandle + .accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true) + .getValue()) + .isEqualTo(13L); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); aggregatorHandle.recordLong(12); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty()).getValue()).isEqualTo(12L); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); - } - - @Test - void mergeAccumulation() { - Attributes attributes = Attributes.builder().put("test", "value").build(); - LongExemplarData exemplar = - ImmutableLongExemplarData.create( - attributes, - 2L, - SpanContext.create( - "00000000000000000000000000000001", - "0000000000000002", - TraceFlags.getDefault(), - TraceState.getDefault()), - 1); - List exemplars = 
Collections.singletonList(exemplar); - List previousExemplars = - Collections.singletonList( - ImmutableLongExemplarData.create( - attributes, - 1L, - SpanContext.create( - "00000000000000000000000000000001", - "0000000000000002", - TraceFlags.getDefault(), - TraceState.getDefault()), - 2)); - LongAccumulation result = - aggregator.merge( - LongAccumulation.create(1, previousExemplars), LongAccumulation.create(2, exemplars)); - // Assert that latest measurement is kept. - assertThat(result).isEqualTo(LongAccumulation.create(2, exemplars)); + assertThat( + aggregatorHandle + .accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true) + .getValue()) + .isEqualTo(12L); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); } @Test @@ -111,7 +93,8 @@ void toMetricData() { INSTRUMENTATION_SCOPE_INFO, METRIC_DESCRIPTOR, Collections.singletonMap( - Attributes.empty(), aggregatorHandle.accumulateThenReset(Attributes.empty())), + Attributes.empty(), + aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)), AggregationTemporality.CUMULATIVE, 2, 10, diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java index 5598287f8d2..9bc5614ca06 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java @@ -65,9 +65,13 @@ void multipleRecords() { aggregatorHandle.recordLong(12); aggregatorHandle.recordLong(12); aggregatorHandle.recordLong(12); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty()).getValue()) + assertThat( + aggregatorHandle + .accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true) + .getValue()) .isEqualTo(12 * 5); - 
assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); } @Test @@ -80,25 +84,41 @@ void multipleRecords_WithNegatives() { aggregatorHandle.recordLong(12); aggregatorHandle.recordLong(12); aggregatorHandle.recordLong(-11); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty()).getValue()).isEqualTo(14); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat( + aggregatorHandle + .accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true) + .getValue()) + .isEqualTo(14); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); } @Test void toAccumulationAndReset() { AggregatorHandle aggregatorHandle = aggregator.createHandle(); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); aggregatorHandle.recordLong(13); aggregatorHandle.recordLong(12); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty()).getValue()).isEqualTo(25); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat( + aggregatorHandle + .accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true) + .getValue()) + .isEqualTo(25); + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); aggregatorHandle.recordLong(12); aggregatorHandle.recordLong(-25); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty()).getValue()).isEqualTo(-13); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())).isNull(); + assertThat( + aggregatorHandle + .accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true) + .getValue()) + .isEqualTo(-13); + 
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) + .isNull(); } @Test @@ -128,7 +148,7 @@ void testExemplarsInAccumulation() { AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordLong(0, attributes, Context.root()); - assertThat(aggregatorHandle.accumulateThenReset(Attributes.empty())) + assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)) .isEqualTo(LongAccumulation.create(0, exemplars)); } @@ -152,21 +172,13 @@ void mergeAndDiff() { InstrumentDescriptor.create( "name", "description", "unit", instrumentType, InstrumentValueType.LONG), ExemplarReservoir::longNoSamples); - LongAccumulation merged = - aggregator.merge(LongAccumulation.create(1L), LongAccumulation.create(2L, exemplars)); - assertThat(merged.getValue()) - .withFailMessage( - "Invalid merge result for instrumentType %s, temporality %s: %s", - instrumentType, temporality, merged) - .isEqualTo(3); - assertThat(merged.getExemplars()).containsExactly(exemplar); LongAccumulation diffed = aggregator.diff(LongAccumulation.create(1L), LongAccumulation.create(2L, exemplars)); assertThat(diffed.getValue()) .withFailMessage( "Invalid diff result for instrumentType %s, temporality %s: %s", - instrumentType, temporality, merged) + instrumentType, temporality, diffed) .isEqualTo(1); assertThat(diffed.getExemplars()).containsExactly(exemplar); } @@ -186,7 +198,8 @@ void toMetricData() { library, metricDescriptor, Collections.singletonMap( - Attributes.empty(), aggregatorHandle.accumulateThenReset(Attributes.empty())), + Attributes.empty(), + aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)), AggregationTemporality.CUMULATIVE, 0, 10, diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java index 
e546b5cb23b..4c153cc3841 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java @@ -89,7 +89,7 @@ void recordLong() { longCounterStorage.recordLong(2, Attributes.builder().put("key", "b").build()); longCounterStorage.recordLong(3, Attributes.builder().put("key", "c").build()); - assertThat(longCounterStorage.collectAndReset(resource, scope, 0, testClock.nanoTime())) + assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( metricData -> assertThat(metricData) @@ -111,7 +111,7 @@ void recordDouble() { doubleCounterStorage.recordDouble(2.2, Attributes.builder().put("key", "b").build()); doubleCounterStorage.recordDouble(3.3, Attributes.builder().put("key", "c").build()); - assertThat(doubleCounterStorage.collectAndReset(resource, scope, 0, testClock.nanoTime())) + assertThat(doubleCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( metricData -> assertThat(metricData) @@ -144,7 +144,7 @@ void record_ProcessesAttributes() { storage.recordLong(1, Attributes.builder().put("key1", "a").put("key2", "b").build()); - assertThat(storage.collectAndReset(resource, scope, 0, testClock.nanoTime())) + assertThat(storage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( metricData -> assertThat(metricData) @@ -158,15 +158,15 @@ void record_ProcessesAttributes() { @Test void record_MaxAccumulations() { - for (int i = 0; i <= MetricStorageUtils.MAX_ACCUMULATIONS + 1; i++) { + for (int i = 0; i <= MetricStorage.MAX_ACCUMULATIONS + 1; i++) { longCounterStorage.recordLong(1, Attributes.builder().put("key" + i, "val").build()); } - assertThat(longCounterStorage.collectAndReset(resource, scope, 0, testClock.nanoTime())) + assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( metricData -> 
assertThat(metricData.getLongSumData().getPoints()) - .hasSize(MetricStorageUtils.MAX_ACCUMULATIONS)); + .hasSize(MetricStorage.MAX_ACCUMULATIONS)); logs.assertContains("Instrument long-counter has exceeded the maximum allowed accumulations"); } @@ -175,7 +175,7 @@ void record_DuplicateAttributes() { longCounterStorage.recordLong(1, Attributes.builder().put("key1", "a").build()); longCounterStorage.recordLong(2, Attributes.builder().put("key1", "a").build()); - assertThat(longCounterStorage.collectAndReset(resource, scope, 0, testClock.nanoTime())) + assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( metricData -> assertThat(metricData) @@ -187,4 +187,138 @@ void record_DuplicateAttributes() { logs.assertContains( "Instrument long-counter has recorded multiple values for the same attributes"); } + + @Test + void collect_CumulativeReportsCumulativeObservations() { + // Record measurement and collect at time 10 + longCounterStorage.recordLong(3, Attributes.empty()); + assertThat(longCounterStorage.collect(resource, scope, 0, 10)) + .hasLongSumSatisfying( + sum -> + sum.isCumulative() + .hasPointsSatisfying( + point -> + point + .hasStartEpochNanos(0) + .hasEpochNanos(10) + .hasValue(3) + .hasAttributes(Attributes.empty()))); + registeredReader.setLastCollectEpochNanos(10); + + // Record measurements and collect at time 30 + longCounterStorage.recordLong(3, Attributes.empty()); + longCounterStorage.recordLong(6, Attributes.builder().put("key", "value1").build()); + assertThat(longCounterStorage.collect(resource, scope, 0, 30)) + .hasLongSumSatisfying( + sum -> + sum.isCumulative() + .hasPointsSatisfying( + point -> + point + .hasStartEpochNanos(0) + .hasEpochNanos(30) + .hasValue(3) + .hasAttributes(Attributes.empty()), + point -> + point + .hasStartEpochNanos(0) + .hasEpochNanos(30) + .hasValue(6) + .hasAttributes(Attributes.builder().put("key", "value1").build()))); + registeredReader.setLastCollectEpochNanos(30); + + // 
Record measurement and collect at time 35 + longCounterStorage.recordLong(4, Attributes.empty()); + longCounterStorage.recordLong(5, Attributes.builder().put("key", "value2").build()); + assertThat(longCounterStorage.collect(resource, scope, 0, 35)) + .hasLongSumSatisfying( + sum -> + sum.isCumulative() + .hasPointsSatisfying( + point -> + point + .hasStartEpochNanos(0) + .hasEpochNanos(35) + .hasValue(4) + .hasAttributes(Attributes.empty()), + point -> + point + .hasStartEpochNanos(0) + .hasEpochNanos(35) + .hasValue(5) + .hasAttributes(Attributes.builder().put("key", "value2").build()))); + } + + @Test + void collect_DeltaComputesDiff() { + when(reader.getAggregationTemporality(any())).thenReturn(AggregationTemporality.DELTA); + longCounterStorage = + AsynchronousMetricStorage.create( + registeredReader, + registeredView, + InstrumentDescriptor.create( + "long-counter", + "description", + "unit", + InstrumentType.COUNTER, + InstrumentValueType.LONG)); + + // Record measurement and collect at time 10 + longCounterStorage.recordLong(3, Attributes.empty()); + assertThat(longCounterStorage.collect(resource, scope, 0, 10)) + .hasLongSumSatisfying( + sum -> + sum.isDelta() + .hasPointsSatisfying( + point -> + point + .hasStartEpochNanos(0) + .hasEpochNanos(10) + .hasValue(3) + .hasAttributes(Attributes.empty()))); + registeredReader.setLastCollectEpochNanos(10); + + // Record measurement and collect at time 30 + longCounterStorage.recordLong(3, Attributes.empty()); + longCounterStorage.recordLong(6, Attributes.builder().put("key", "value1").build()); + assertThat(longCounterStorage.collect(resource, scope, 0, 30)) + .hasLongSumSatisfying( + sum -> + sum.isDelta() + .hasPointsSatisfying( + point -> + point + .hasStartEpochNanos(10) + .hasEpochNanos(30) + .hasValue(0) + .hasAttributes(Attributes.empty()), + point -> + point + .hasStartEpochNanos(10) + .hasEpochNanos(30) + .hasValue(6) + .hasAttributes(Attributes.builder().put("key", "value1").build()))); + 
registeredReader.setLastCollectEpochNanos(30); + + // Record measurement and collect at time 35 + longCounterStorage.recordLong(4, Attributes.empty()); + longCounterStorage.recordLong(5, Attributes.builder().put("key", "value2").build()); + assertThat(longCounterStorage.collect(resource, scope, 0, 35)) + .hasLongSumSatisfying( + sum -> + sum.isDelta() + .hasPointsSatisfying( + point -> + point + .hasStartEpochNanos(30) + .hasEpochNanos(35) + .hasValue(1) + .hasAttributes(Attributes.empty()), + point -> + point + .hasStartEpochNanos(30) + .hasEpochNanos(35) + .hasValue(5) + .hasAttributes(Attributes.builder().put("key", "value2").build()))); + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java index bb2523fd1b4..d69a358ab98 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java @@ -130,7 +130,7 @@ public RegisteredReader getRegisteredReader() { } @Override - public MetricData collectAndReset( + public MetricData collect( Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long startEpochNanos, diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageUtilsTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageUtilsTest.java deleted file mode 100644 index 08c166039c2..00000000000 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageUtilsTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.metrics.internal.state; - -import static org.assertj.core.api.Assertions.assertThat; -import static 
org.assertj.core.api.AssertionsForClassTypes.entry; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; - -import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; -import java.util.HashMap; -import java.util.Map; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -class MetricStorageUtilsTest { - - private static final Attributes a = Attributes.of(AttributeKey.stringKey("a"), "a"); - private static final Attributes b = Attributes.of(AttributeKey.stringKey("b"), "b"); - private static final Attributes c = Attributes.of(AttributeKey.stringKey("c"), "c"); - private static final Attributes d = Attributes.of(AttributeKey.stringKey("d"), "d"); - private static final Attributes e = Attributes.of(AttributeKey.stringKey("e"), "e"); - private Map result; - private Map toMerge; - - @BeforeEach - public void setup() { - result = new HashMap<>(); - result.put(a, "A"); - result.put(b, "B"); - result.put(d, null); - toMerge = new HashMap<>(); - toMerge.put(b, "B'"); - toMerge.put(c, "C"); - toMerge.put(e, null); - } - - @Test - void mergeInPlace() { - Aggregator agg = buildConcatAggregator(); - MetricStorageUtils.mergeInPlace(result, toMerge, agg); - - assertThat(result).containsOnly(entry(b, "merge(B,B')"), entry(c, "C")); - } - - @Test - void diffInPlace() { - Aggregator agg = buildConcatAggregator(); - MetricStorageUtils.diffInPlace(result, toMerge, agg); - - assertThat(result).containsOnly(entry(b, "diff(B,B')"), entry(c, "C")); - } - - @SuppressWarnings("unchecked") - private static Aggregator buildConcatAggregator() { - Aggregator agg = mock(Aggregator.class); - doAnswer( - invocation -> { - String previousCumulative = invocation.getArgument(0); - String delta = invocation.getArgument(1); - return "merge(" + previousCumulative + "," + delta + ")"; - 
}) - .when(agg) - .merge(anyString(), anyString()); - doAnswer( - invocation -> { - String previousCumulative = invocation.getArgument(0); - String delta = invocation.getArgument(1); - return "diff(" + previousCumulative + "," + delta + ")"; - }) - .when(agg) - .diff(anyString(), anyString()); - return agg; - } -} diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index ceded3f948f..795d0119445 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -8,15 +8,17 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry; +import io.github.netmikey.logunit.api.LogCapturer; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; +import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.Aggregation; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.InstrumentValueType; import io.opentelemetry.sdk.metrics.data.LongExemplarData; import io.opentelemetry.sdk.metrics.data.MetricData; -import io.opentelemetry.sdk.metrics.export.MetricReader; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData; @@ -27,15 +29,13 @@ import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor; import 
io.opentelemetry.sdk.metrics.internal.view.ViewRegistry; import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.testing.time.TestClock; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; +import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; -@ExtendWith(MockitoExtension.class) +@SuppressLogger(DefaultSynchronousMetricStorage.class) public class SynchronousMetricStorageTest { private static final Resource RESOURCE = Resource.empty(); private static final InstrumentationScopeInfo INSTRUMENTATION_SCOPE_INFO = @@ -45,26 +45,26 @@ public class SynchronousMetricStorageTest { "name", "description", "unit", InstrumentType.COUNTER, InstrumentValueType.DOUBLE); private static final MetricDescriptor METRIC_DESCRIPTOR = MetricDescriptor.create("name", "description", "unit"); + + @RegisterExtension + LogCapturer logs = LogCapturer.create().captureForType(DefaultSynchronousMetricStorage.class); + + private final RegisteredReader deltaReader = + RegisteredReader.create(InMemoryMetricReader.createDelta(), ViewRegistry.create()); + private final RegisteredReader cumulativeReader = + RegisteredReader.create(InMemoryMetricReader.create(), ViewRegistry.create()); private final TestClock testClock = TestClock.create(); private final Aggregator aggregator = - ((AggregatorFactory) Aggregation.lastValue()) + ((AggregatorFactory) Aggregation.sum()) .createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff()); private final AttributesProcessor attributesProcessor = AttributesProcessor.noop(); - @Mock private MetricReader reader; - private RegisteredReader registeredReader; - - @BeforeEach - void setup() { - registeredReader = RegisteredReader.create(reader, ViewRegistry.create()); - } - @Test void 
attributesProcessor_used() { AttributesProcessor spyAttributesProcessor = Mockito.spy(this.attributesProcessor); SynchronousMetricStorage accumulator = new DefaultSynchronousMetricStorage<>( - registeredReader, METRIC_DESCRIPTOR, aggregator, spyAttributesProcessor); + cumulativeReader, METRIC_DESCRIPTOR, aggregator, spyAttributesProcessor); accumulator.bind(Attributes.empty()); Mockito.verify(spyAttributesProcessor).process(Attributes.empty(), Context.current()); } @@ -77,15 +77,14 @@ void attributesProcessor_applied() { AttributesProcessor spyLabelsProcessor = Mockito.spy(attributesProcessor); SynchronousMetricStorage accumulator = new DefaultSynchronousMetricStorage<>( - registeredReader, METRIC_DESCRIPTOR, aggregator, spyLabelsProcessor); + cumulativeReader, METRIC_DESCRIPTOR, aggregator, spyLabelsProcessor); BoundStorageHandle handle = accumulator.bind(labels); handle.recordDouble(1, labels, Context.root()); - MetricData md = - accumulator.collectAndReset(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, testClock.now()); + MetricData md = accumulator.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, testClock.now()); assertThat(md) - .hasDoubleGaugeSatisfying( - gauge -> - gauge.hasPointsSatisfying( + .hasDoubleSumSatisfying( + sum -> + sum.hasPointsSatisfying( point -> point.hasAttributes( attributeEntry("K", "V"), attributeEntry("modifiedK", "modifiedV")))); @@ -93,17 +92,16 @@ void attributesProcessor_applied() { @Test void sameAggregator_ForSameAttributes() { - SynchronousMetricStorage accumulator = + SynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( - registeredReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor); - BoundStorageHandle handle = accumulator.bind(Attributes.builder().put("K", "V").build()); - BoundStorageHandle duplicateHandle = - accumulator.bind(Attributes.builder().put("K", "V").build()); + cumulativeReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor); + BoundStorageHandle handle = 
storage.bind(Attributes.builder().put("K", "V").build()); + BoundStorageHandle duplicateHandle = storage.bind(Attributes.builder().put("K", "V").build()); try { assertThat(duplicateHandle).isSameAs(handle); - accumulator.collectAndReset(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, testClock.now()); + storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, testClock.now()); BoundStorageHandle anotherDuplicateAggregatorHandle = - accumulator.bind(Attributes.builder().put("K", "V").build()); + storage.bind(Attributes.builder().put("K", "V").build()); try { assertThat(anotherDuplicateAggregatorHandle).isSameAs(handle); } finally { @@ -116,8 +114,209 @@ void sameAggregator_ForSameAttributes() { // If we try to collect once all bound references are gone AND no recordings have occurred, we // should not see any labels (or metric). - assertThat( - accumulator.collectAndReset(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, testClock.now())) + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, testClock.now())) .isEqualTo(EmptyMetricData.getInstance()); } + + @Test + void recordAndcollect_CumulativeDoesNotReset() { + SynchronousMetricStorage storage = + new DefaultSynchronousMetricStorage<>( + cumulativeReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor); + + // Record measurement and collect at time 10 + storage.recordDouble(3, Attributes.empty(), Context.current()); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) + .hasDoubleSumSatisfying( + sum -> + sum.isCumulative() + .hasPointsSatisfying( + point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3))); + cumulativeReader.setLastCollectEpochNanos(10); + + // Record measurement and collect at time 30 + storage.recordDouble(3, Attributes.empty(), Context.current()); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30)) + .hasDoubleSumSatisfying( + sum -> + sum.isCumulative() + .hasPointsSatisfying( + point -> 
point.hasStartEpochNanos(0).hasEpochNanos(30).hasValue(6))); + cumulativeReader.setLastCollectEpochNanos(30); + + // Record measurement and collect at time 35 + storage.recordDouble(2, Attributes.empty(), Context.current()); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35)) + .hasDoubleSumSatisfying( + sum -> + sum.isCumulative() + .hasPointsSatisfying( + point -> point.hasStartEpochNanos(0).hasEpochNanos(35).hasValue(8))); + } + + @Test + void recordAndCollect_DeltaResets() { + SynchronousMetricStorage storage = + new DefaultSynchronousMetricStorage<>( + deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor); + + // Record measurement and collect at time 10 + storage.recordDouble(3, Attributes.empty(), Context.current()); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) + .hasDoubleSumSatisfying( + sum -> + sum.isDelta() + .hasPointsSatisfying( + point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3))); + deltaReader.setLastCollectEpochNanos(10); + + // Record measurement and collect at time 30 + storage.recordDouble(3, Attributes.empty(), Context.current()); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30)) + .hasDoubleSumSatisfying( + sum -> + sum.isDelta() + .hasPointsSatisfying( + point -> point.hasStartEpochNanos(10).hasEpochNanos(30).hasValue(3))); + deltaReader.setLastCollectEpochNanos(30); + + // Record measurement and collect at time 35 + storage.recordDouble(2, Attributes.empty(), Context.current()); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35)) + .hasDoubleSumSatisfying( + sum -> + sum.isDelta() + .hasPointsSatisfying( + point -> point.hasStartEpochNanos(30).hasEpochNanos(35).hasValue(2))); + } + + @Test + void recordAndCollect_CumulativeAtLimit() { + SynchronousMetricStorage storage = + new DefaultSynchronousMetricStorage<>( + cumulativeReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor); + + // Record measurements for
max number of attributes + for (int i = 0; i < MetricStorage.MAX_ACCUMULATIONS; i++) { + storage.recordDouble( + 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); + } + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) + .hasDoubleSumSatisfying( + sum -> + sum.satisfies( + sumData -> + assertThat(sumData.getPoints()) + .hasSize(MetricStorage.MAX_ACCUMULATIONS) + .allSatisfy( + point -> { + assertThat(point.getStartEpochNanos()).isEqualTo(0); + assertThat(point.getEpochNanos()).isEqualTo(10); + assertThat(point.getValue()).isEqualTo(3); + }))); + assertThat(logs.getEvents()).isEmpty(); + cumulativeReader.setLastCollectEpochNanos(10); + + // Record measurement for additional attribute, exceeding limit + storage.recordDouble( + 3, + Attributes.builder().put("key", "value" + MetricStorage.MAX_ACCUMULATIONS + 1).build(), + Context.current()); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20)) + .hasDoubleSumSatisfying( + sum -> + sum.satisfies( + sumData -> + assertThat(sumData.getPoints()) + .hasSize(MetricStorage.MAX_ACCUMULATIONS) + .allSatisfy( + point -> { + assertThat(point.getStartEpochNanos()).isEqualTo(0); + assertThat(point.getEpochNanos()).isEqualTo(20); + assertThat(point.getValue()).isEqualTo(3); + }) + .noneMatch( + point -> + point + .getAttributes() + .get(AttributeKey.stringKey("key")) + .equals("value" + MetricStorage.MAX_ACCUMULATIONS + 1)))); + logs.assertContains("Instrument name has exceeded the maximum allowed accumulations"); + } + + @Test + void recordAndCollect_DeltaAtLimit() { + SynchronousMetricStorage storage = + new DefaultSynchronousMetricStorage<>( + deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor); + + // Record measurements for max number of attributes + for (int i = 0; i < MetricStorage.MAX_ACCUMULATIONS; i++) { + storage.recordDouble( + 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); + } + 
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) + .hasDoubleSumSatisfying( + sum -> + sum.satisfies( + sumData -> + assertThat(sumData.getPoints()) + .hasSize(MetricStorage.MAX_ACCUMULATIONS) + .allSatisfy( + point -> { + assertThat(point.getStartEpochNanos()).isEqualTo(0); + assertThat(point.getEpochNanos()).isEqualTo(10); + assertThat(point.getValue()).isEqualTo(3); + }))); + assertThat(logs.getEvents()).isEmpty(); + deltaReader.setLastCollectEpochNanos(10); + + // Record measurement for additional attribute, should not exceed limit due to reset + storage.recordDouble( + 3, + Attributes.builder().put("key", "value" + MetricStorage.MAX_ACCUMULATIONS + 1).build(), + Context.current()); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20)) + .hasDoubleSumSatisfying( + sum -> + sum.isDelta() + .hasPointsSatisfying( + point -> + point + .hasStartEpochNanos(10) + .hasEpochNanos(20) + .hasValue(3) + .hasAttributes( + Attributes.builder() + .put("key", "value" + MetricStorage.MAX_ACCUMULATIONS + 1) + .build()))); + assertThat(logs.getEvents()).isEmpty(); + deltaReader.setLastCollectEpochNanos(20); + + // Record measurements exceeding max number of attributes. 
Last measurement should be dropped + for (int i = 0; i < MetricStorage.MAX_ACCUMULATIONS + 1; i++) { + storage.recordDouble( + 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); + } + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30)) + .hasDoubleSumSatisfying( + sum -> + sum.satisfies( + sumData -> + assertThat(sumData.getPoints()) + .hasSize(MetricStorage.MAX_ACCUMULATIONS) + .allSatisfy( + point -> { + assertThat(point.getStartEpochNanos()).isEqualTo(20); + assertThat(point.getEpochNanos()).isEqualTo(30); + assertThat(point.getValue()).isEqualTo(3); + }) + .noneMatch( + point -> + point + .getAttributes() + .get(AttributeKey.stringKey("key")) + .equals("value" + MetricStorage.MAX_ACCUMULATIONS + 1)))); + logs.assertContains("Instrument name has exceeded the maximum allowed accumulations"); + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/TemporalMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/TemporalMetricStorageTest.java deleted file mode 100644 index 0e0827d3472..00000000000 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/TemporalMetricStorageTest.java +++ /dev/null @@ -1,431 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.metrics.internal.state; - -import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; - -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.sdk.common.InstrumentationScopeInfo; -import io.opentelemetry.sdk.metrics.Aggregation; -import io.opentelemetry.sdk.metrics.InstrumentType; -import io.opentelemetry.sdk.metrics.InstrumentValueType; -import io.opentelemetry.sdk.metrics.data.AggregationTemporality; -import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; -import io.opentelemetry.sdk.metrics.export.MetricReader; -import 
io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; -import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; -import io.opentelemetry.sdk.metrics.internal.aggregator.DoubleAccumulation; -import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; -import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; -import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter; -import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader; -import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry; -import io.opentelemetry.sdk.resources.Resource; -import java.util.HashMap; -import java.util.Map; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class TemporalMetricStorageTest { - private static final InstrumentDescriptor DESCRIPTOR = - InstrumentDescriptor.create( - "name", "description", "unit", InstrumentType.COUNTER, InstrumentValueType.DOUBLE); - private static final InstrumentDescriptor ASYNC_DESCRIPTOR = - InstrumentDescriptor.create( - "name", - "description", - "unit", - InstrumentType.OBSERVABLE_COUNTER, - InstrumentValueType.DOUBLE); - private static final MetricDescriptor METRIC_DESCRIPTOR = - MetricDescriptor.create("name", "description", "unit"); - private static final Aggregator SUM = - ((AggregatorFactory) Aggregation.sum()) - .createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff()); - - private static final Aggregator ASYNC_SUM = - ((AggregatorFactory) Aggregation.sum()) - .createAggregator(ASYNC_DESCRIPTOR, ExemplarFilter.alwaysOff()); - - @Mock private MetricReader reader; - private RegisteredReader registeredReader; - - @BeforeEach - void setup() { - registeredReader = RegisteredReader.create(reader, ViewRegistry.create()); - registeredReader.setLastCollectEpochNanos(0); - } - - 
private static Map createMeasurement(double value) { - Map measurement = new HashMap<>(); - measurement.put(Attributes.empty(), DoubleAccumulation.create(value)); - return measurement; - } - - @Test - void synchronousCumulative_joinsWithLastMeasurementForCumulative() { - AggregationTemporality temporality = AggregationTemporality.CUMULATIVE; - TemporalMetricStorage storage = - new TemporalMetricStorage<>( - SUM, /* isSynchronous= */ true, registeredReader, temporality, METRIC_DESCRIPTOR); - // Send in new measurement at time 10 - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), createMeasurement(3), 0, 10)) - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .hasPointsSatisfying( - point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3))); - registeredReader.setLastCollectEpochNanos(10); - - // Send in new measurement at time 30 - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), createMeasurement(3), 0, 30)) - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .hasPointsSatisfying( - point -> point.hasStartEpochNanos(0).hasEpochNanos(30).hasValue(6))); - registeredReader.setLastCollectEpochNanos(30); - - // Send in new measurement at time 35 - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), createMeasurement(2), 0, 35)) - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .hasPointsSatisfying( - point -> point.hasStartEpochNanos(0).hasEpochNanos(35).hasValue(8))); - } - - @Test - void synchronousCumulative_dropsStaleAtLimit() { - TemporalMetricStorage storage = - new TemporalMetricStorage<>( - SUM, - /* isSynchronous= */ true, - registeredReader, - AggregationTemporality.CUMULATIVE, - METRIC_DESCRIPTOR); - - // Send in new measurement at time 10, with attr1 - Map measurement1 = new HashMap<>(); - for (int i = 0; i < MetricStorageUtils.MAX_ACCUMULATIONS; i++) { - Attributes attr1 = 
Attributes.builder().put("key", "value" + i).build(); - measurement1.put(attr1, DoubleAccumulation.create(3)); - } - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), measurement1, 0, 10)) - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .satisfies( - sumData -> - assertThat(sumData.getPoints()) - .hasSize(MetricStorageUtils.MAX_ACCUMULATIONS) - .allSatisfy( - point -> { - assertThat(point.getStartEpochNanos()).isEqualTo(0); - assertThat(point.getEpochNanos()).isEqualTo(10); - assertThat(point.getValue()).isEqualTo(3); - }))); - registeredReader.setLastCollectEpochNanos(10); - - // Send in new measurement at time 20, with attr2 - // Result should drop accumulation for attr1, only reporting accumulation for attr2 - Map measurement2 = new HashMap<>(); - Attributes attr2 = - Attributes.builder() - .put("key", "value" + (MetricStorageUtils.MAX_ACCUMULATIONS + 1)) - .build(); - measurement2.put(attr2, DoubleAccumulation.create(3)); - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), measurement2, 0, 20)) - .hasDoubleSumSatisfying( - sum -> sum.isCumulative().hasPointsSatisfying(point -> point.hasAttributes(attr2))); - } - - @Test - void synchronousDelta_dropsStale() { - TemporalMetricStorage storage = - new TemporalMetricStorage<>( - SUM, - /* isSynchronous= */ true, - registeredReader, - AggregationTemporality.DELTA, - METRIC_DESCRIPTOR); - - // Send in new measurement at time 10, with attr1 - Map measurement1 = new HashMap<>(); - Attributes attr1 = Attributes.builder().put("key", "value1").build(); - measurement1.put(attr1, DoubleAccumulation.create(3)); - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), measurement1, 0, 10)) - .hasDoubleSumSatisfying( - sum -> - sum.isDelta() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(0) - .hasEpochNanos(10) - .hasAttributes(attr1) - .hasValue(3))); - 
registeredReader.setLastCollectEpochNanos(10); - - // Send in new measurement at time 20, with attr2 - // Result should drop accumulation for attr1, only reporting accumulation for attr2 - Map measurement2 = new HashMap<>(); - Attributes attr2 = Attributes.builder().put("key", "value2").build(); - measurement2.put(attr2, DoubleAccumulation.create(7)); - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), measurement2, 0, 20)) - .hasDoubleSumSatisfying( - sum -> - sum.isDelta() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(10) - .hasEpochNanos(20) - .hasAttributes(attr2) - .hasValue(7))); - } - - @Test - void synchronousDelta_useLastTimestamp() { - AggregationTemporality temporality = AggregationTemporality.DELTA; - TemporalMetricStorage storage = - new TemporalMetricStorage<>( - SUM, /* isSynchronous= */ true, registeredReader, temporality, METRIC_DESCRIPTOR); - // Send in new measurement at time 10 - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), createMeasurement(3), 0, 10)) - .hasDoubleSumSatisfying( - sum -> - sum.isDelta() - .hasPointsSatisfying( - point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3))); - registeredReader.setLastCollectEpochNanos(10); - - // Send in new measurement at time 30 - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), createMeasurement(3), 0, 30)) - .hasDoubleSumSatisfying( - sum -> - sum.isDelta() - .hasPointsSatisfying( - point -> point.hasStartEpochNanos(10).hasEpochNanos(30).hasValue(3))); - registeredReader.setLastCollectEpochNanos(30); - - // Send in new measurement at time 35 - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), createMeasurement(2), 0, 35)) - .hasDoubleSumSatisfying( - sum -> - sum.isDelta() - .hasPointsSatisfying( - point -> point.hasStartEpochNanos(30).hasEpochNanos(35).hasValue(2))); - } - - @Test - void 
asynchronousCumulative_doesNotJoin() { - AggregationTemporality temporality = AggregationTemporality.CUMULATIVE; - TemporalMetricStorage storage = - new TemporalMetricStorage<>( - ASYNC_SUM, - /* isSynchronous= */ false, - registeredReader, - temporality, - METRIC_DESCRIPTOR); - // Send in new measurement at time 10 - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), createMeasurement(3), 0, 10)) - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .hasPointsSatisfying( - point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3))); - registeredReader.setLastCollectEpochNanos(10); - - // Send in new measurement at time 30 - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), createMeasurement(3), 0, 30)) - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .hasPointsSatisfying( - point -> point.hasStartEpochNanos(0).hasEpochNanos(30).hasValue(3))); - registeredReader.setLastCollectEpochNanos(30); - - // Send in new measurement at time 35 - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), createMeasurement(2), 0, 35)) - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .hasPointsSatisfying( - point -> point.hasStartEpochNanos(0).hasEpochNanos(35).hasValue(2))); - } - - @Test - void asynchronousCumulative_dropsStale() { - TemporalMetricStorage storage = - new TemporalMetricStorage<>( - ASYNC_SUM, - /* isSynchronous= */ false, - registeredReader, - AggregationTemporality.CUMULATIVE, - METRIC_DESCRIPTOR); - - // Send in new measurement at time 10, with attr1 - Map measurement1 = new HashMap<>(); - Attributes attr1 = Attributes.builder().put("key", "value1").build(); - measurement1.put(attr1, DoubleAccumulation.create(3)); - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), measurement1, 0, 10)) - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .hasPointsSatisfying( - 
point -> - point - .hasStartEpochNanos(0) - .hasEpochNanos(10) - .hasAttributes(attr1) - .hasValue(3))); - registeredReader.setLastCollectEpochNanos(10); - - // Send in new measurement at time 20, with attr2 - // Result should drop accumulation for attr1, only reporting accumulation for attr2 - Map measurement2 = new HashMap<>(); - Attributes attr2 = Attributes.builder().put("key", "value2").build(); - measurement2.put(attr2, DoubleAccumulation.create(7)); - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), measurement2, 0, 20)) - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(0) - .hasEpochNanos(20) - .hasAttributes(attr2) - .hasValue(7))); - } - - @Test - void asynchronousDelta_dropsStale() { - TemporalMetricStorage storage = - new TemporalMetricStorage<>( - ASYNC_SUM, - /* isSynchronous= */ false, - registeredReader, - AggregationTemporality.DELTA, - METRIC_DESCRIPTOR); - - // Send in new measurement at time 10, with attr1 - Map measurement1 = new HashMap<>(); - Attributes attr1 = Attributes.builder().put("key", "value1").build(); - measurement1.put(attr1, DoubleAccumulation.create(3)); - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), measurement1, 0, 10)) - .hasDoubleSumSatisfying( - sum -> - sum.isDelta() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(0) - .hasEpochNanos(10) - .hasAttributes(attr1) - .hasValue(3))); - registeredReader.setLastCollectEpochNanos(10); - - // Send in new measurement at time 20, with attr2 - // Result should drop accumulation for attr1, only reporting accumulation for attr2 - Map measurement2 = new HashMap<>(); - Attributes attr2 = Attributes.builder().put("key", "value2").build(); - measurement2.put(attr2, DoubleAccumulation.create(7)); - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), measurement2, 0, 20)) - 
.hasDoubleSumSatisfying( - sum -> - sum.isDelta() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(10) - .hasEpochNanos(20) - .hasAttributes(attr2) - .hasValue(7))); - } - - @Test - void asynchronousDelta_diffsLastTimestamp() { - AggregationTemporality temporality = AggregationTemporality.DELTA; - TemporalMetricStorage storage = - new TemporalMetricStorage<>( - ASYNC_SUM, - /* isSynchronous= */ false, - registeredReader, - temporality, - METRIC_DESCRIPTOR); - // Send in new measurement at time 10 - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), createMeasurement(3), 0, 10)) - .hasDoubleSumSatisfying( - sum -> - sum.isDelta() - .hasPointsSatisfying( - point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3))); - registeredReader.setLastCollectEpochNanos(10); - - // Send in new measurement at time 30 - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), createMeasurement(3), 0, 30)) - .hasDoubleSumSatisfying( - sum -> - sum.isDelta() - .hasPointsSatisfying( - point -> point.hasStartEpochNanos(10).hasEpochNanos(30).hasValue(0))); - registeredReader.setLastCollectEpochNanos(30); - - // Send in new measurement at time 35 - assertThat( - storage.buildMetricFor( - Resource.empty(), InstrumentationScopeInfo.empty(), createMeasurement(2), 0, 35)) - .hasDoubleSumSatisfying( - sum -> - sum.isDelta() - .hasPointsSatisfying( - point -> point.hasStartEpochNanos(30).hasEpochNanos(35).hasValue(-1))); - } -}