Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse AggregatorHandle with cumulative temporality #5142

Merged
merged 3 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static Aggregator<Object, DoubleExemplarData> drop() {
default T accumulateLongMeasurement(long value, Attributes attributes, Context context) {
AggregatorHandle<T, U> handle = createHandle();
handle.recordLong(value, attributes, context);
return handle.accumulateThenReset(attributes);
return handle.accumulateThenReset(attributes, /* reset= */ true);
}

/**
Expand All @@ -73,21 +73,9 @@ default T accumulateLongMeasurement(long value, Attributes attributes, Context c
default T accumulateDoubleMeasurement(double value, Attributes attributes, Context context) {
AggregatorHandle<T, U> handle = createHandle();
handle.recordDouble(value, attributes, context);
return handle.accumulateThenReset(attributes);
return handle.accumulateThenReset(attributes, /* reset= */ true);
}

/**
* Returns the result of the merge of the given accumulations.
*
* <p>This should always assume that the accumulations do not overlap and merge together for a new
* cumulative report.
*
* @param previousCumulative the previously captured accumulation
* @param delta the newly captured (delta) accumulation
* @return the result of the merge of the given accumulations.
*/
T merge(T previousCumulative, T delta);

/**
* Returns a new DELTA aggregation by comparing two cumulative measurements.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,18 @@ public final boolean tryUnmap() {
* Aggregator}.
*/
@Nullable
public final T accumulateThenReset(Attributes attributes) {
public final T accumulateThenReset(Attributes attributes, boolean reset) {
if (!hasRecordings) {
return null;
}
hasRecordings = false;
return doAccumulateThenReset(exemplarReservoir.collectAndReset(attributes));
if (reset) {
hasRecordings = false;
}
return doAccumulateThenReset(exemplarReservoir.collectAndReset(attributes), reset);
}

/** Implementation of the {@code accumulateThenReset}. */
protected abstract T doAccumulateThenReset(List<U> exemplars);
protected abstract T doAccumulateThenReset(List<U> exemplars, boolean reset);

@Override
public final void recordLong(long value, Attributes attributes, Context context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,40 +62,6 @@ public AggregatorHandle<ExplicitBucketHistogramAccumulation, DoubleExemplarData>
return new Handle(this.boundaries, reservoirSupplier.get());
}

/**
* Return the result of the merge of two histogram accumulations. As long as one Aggregator
* instance produces all Accumulations with constant boundaries we don't need to worry about
* merging accumulations with different boundaries.
*/
@Override
public ExplicitBucketHistogramAccumulation merge(
ExplicitBucketHistogramAccumulation previous, ExplicitBucketHistogramAccumulation current) {
long[] previousCounts = previous.getCounts();
long[] mergedCounts = new long[previousCounts.length];
for (int i = 0; i < previousCounts.length; ++i) {
mergedCounts[i] = previousCounts[i] + current.getCounts()[i];
}
double min = -1;
double max = -1;
if (previous.hasMinMax() && current.hasMinMax()) {
min = Math.min(previous.getMin(), current.getMin());
max = Math.max(previous.getMax(), current.getMax());
} else if (previous.hasMinMax()) {
min = previous.getMin();
max = previous.getMax();
} else if (current.hasMinMax()) {
min = current.getMin();
max = current.getMax();
}
return ExplicitBucketHistogramAccumulation.create(
previous.getSum() + current.getSum(),
previous.hasMinMax() || current.hasMinMax(),
min,
max,
mergedCounts,
current.getExemplars());
}

@Override
public MetricData toMetricData(
Resource resource,
Expand Down Expand Up @@ -157,7 +123,7 @@ static final class Handle

@Override
protected ExplicitBucketHistogramAccumulation doAccumulateThenReset(
List<DoubleExemplarData> exemplars) {
List<DoubleExemplarData> exemplars, boolean reset) {
lock.lock();
try {
ExplicitBucketHistogramAccumulation acc =
Expand All @@ -168,11 +134,13 @@ protected ExplicitBucketHistogramAccumulation doAccumulateThenReset(
this.count > 0 ? this.max : -1,
Arrays.copyOf(counts, counts.length),
exemplars);
this.sum = 0;
this.min = Double.MAX_VALUE;
this.max = -1;
this.count = 0;
Arrays.fill(this.counts, 0);
if (reset) {
this.sum = 0;
this.min = Double.MAX_VALUE;
this.max = -1;
this.count = 0;
Arrays.fill(this.counts, 0);
}
return acc;
} finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,99 +56,6 @@ public AggregatorHandle<ExponentialHistogramAccumulation, DoubleExemplarData> cr
return new Handle(reservoirSupplier.get(), maxBuckets, maxScale);
}

/**
* Merge the exponential histogram accumulations. Mutates the {@link
* ExponentialHistogramAccumulation#getPositiveBuckets()} and {@link
* ExponentialHistogramAccumulation#getNegativeBuckets()} of {@code previous}. Mutating buckets is
* acceptable because copies are already made in {@link Handle#doAccumulateThenReset(List)}.
*/
@Override
public ExponentialHistogramAccumulation merge(
ExponentialHistogramAccumulation previous, ExponentialHistogramAccumulation current) {

// Create merged buckets
ExponentialHistogramBuckets posBuckets =
merge(previous.getPositiveBuckets(), current.getPositiveBuckets());
ExponentialHistogramBuckets negBuckets =
merge(previous.getNegativeBuckets(), current.getNegativeBuckets());

// resolve possible scale difference due to merge
int commonScale = Math.min(posBuckets.getScale(), negBuckets.getScale());
posBuckets = downscale(posBuckets, commonScale);
negBuckets = downscale(negBuckets, commonScale);
double min = -1;
double max = -1;
if (previous.hasMinMax() && current.hasMinMax()) {
min = Math.min(previous.getMin(), current.getMin());
max = Math.max(previous.getMax(), current.getMax());
} else if (previous.hasMinMax()) {
min = previous.getMin();
max = previous.getMax();
} else if (current.hasMinMax()) {
min = current.getMin();
max = current.getMax();
}
return ExponentialHistogramAccumulation.create(
commonScale,
previous.getSum() + current.getSum(),
previous.hasMinMax() || current.hasMinMax(),
min,
max,
posBuckets,
negBuckets,
previous.getZeroCount() + current.getZeroCount(),
current.getExemplars());
}

/**
* Merge the exponential histogram buckets. If {@code a} is empty, return {@code b}. If {@code b}
* is empty, return {@code a}. Else merge {@code b} into {@code a}.
*
* <p>Assumes {@code a} and {@code b} are either {@link DoubleExponentialHistogramBuckets} or
* {@link EmptyExponentialHistogramBuckets}.
*/
private static ExponentialHistogramBuckets merge(
ExponentialHistogramBuckets a, ExponentialHistogramBuckets b) {
if (a instanceof EmptyExponentialHistogramBuckets || a.getTotalCount() == 0) {
return b;
}
if (b instanceof EmptyExponentialHistogramBuckets || b.getTotalCount() == 0) {
return a;
}
if ((a instanceof DoubleExponentialHistogramBuckets)
&& (b instanceof DoubleExponentialHistogramBuckets)) {
DoubleExponentialHistogramBuckets a1 = (DoubleExponentialHistogramBuckets) a;
DoubleExponentialHistogramBuckets b2 = (DoubleExponentialHistogramBuckets) b;
a1.mergeInto(b2);
return a1;
}
throw new IllegalStateException(
"Unable to merge ExponentialHistogramBuckets. Unrecognized implementation.");
}

/**
* Downscale the {@code buckets} to the {@code targetScale}.
*
* <p>Assumes {@code a} and {@code b} are either {@link DoubleExponentialHistogramBuckets} or
* {@link EmptyExponentialHistogramBuckets}.
*/
private static ExponentialHistogramBuckets downscale(
ExponentialHistogramBuckets buckets, int targetScale) {
if (buckets.getScale() == targetScale) {
return buckets;
}
if (buckets instanceof EmptyExponentialHistogramBuckets) {
return EmptyExponentialHistogramBuckets.get(targetScale);
}
if (buckets instanceof DoubleExponentialHistogramBuckets) {
DoubleExponentialHistogramBuckets buckets1 = (DoubleExponentialHistogramBuckets) buckets;
buckets1.downscale(buckets1.getScale() - targetScale);
return buckets1;
}
throw new IllegalStateException(
"Unable to merge ExponentialHistogramBuckets. Unrecognized implementation");
}

@Override
public MetricData toMetricData(
Resource resource,
Expand Down Expand Up @@ -200,18 +107,22 @@ static final class Handle

@Override
protected synchronized ExponentialHistogramAccumulation doAccumulateThenReset(
List<DoubleExemplarData> exemplars) {
List<DoubleExemplarData> exemplars, boolean reset) {
ExponentialHistogramBuckets positiveBuckets;
ExponentialHistogramBuckets negativeBuckets;
if (this.positiveBuckets != null) {
positiveBuckets = this.positiveBuckets.copy();
this.positiveBuckets.clear();
if (reset) {
this.positiveBuckets.clear();
}
} else {
positiveBuckets = EmptyExponentialHistogramBuckets.get(scale);
}
if (this.negativeBuckets != null) {
negativeBuckets = this.negativeBuckets.copy();
this.negativeBuckets.clear();
if (reset) {
this.negativeBuckets.clear();
}
} else {
negativeBuckets = EmptyExponentialHistogramBuckets.get(scale);
}
Expand All @@ -226,11 +137,13 @@ protected synchronized ExponentialHistogramAccumulation doAccumulateThenReset(
negativeBuckets,
zeroCount,
exemplars);
this.sum = 0;
this.zeroCount = 0;
this.min = Double.MAX_VALUE;
this.max = -1;
this.count = 0;
if (reset) {
this.sum = 0;
this.zeroCount = 0;
this.min = Double.MAX_VALUE;
this.max = -1;
this.count = 0;
}
return acc;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.opentelemetry.sdk.resources.Resource;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -48,11 +49,6 @@ public AggregatorHandle<DoubleAccumulation, DoubleExemplarData> createHandle() {
return new Handle(reservoirSupplier.get());
}

@Override
public DoubleAccumulation merge(DoubleAccumulation previous, DoubleAccumulation current) {
return current;
}

@Override
public DoubleAccumulation diff(DoubleAccumulation previous, DoubleAccumulation current) {
return current;
Expand Down Expand Up @@ -94,8 +90,12 @@ private Handle(ExemplarReservoir<DoubleExemplarData> reservoir) {
}

@Override
protected DoubleAccumulation doAccumulateThenReset(List<DoubleExemplarData> exemplars) {
return DoubleAccumulation.create(this.current.getAndSet(DEFAULT_VALUE), exemplars);
protected DoubleAccumulation doAccumulateThenReset(
List<DoubleExemplarData> exemplars, boolean reset) {
if (reset) {
return DoubleAccumulation.create(this.current.getAndSet(DEFAULT_VALUE), exemplars);
}
return DoubleAccumulation.create(Objects.requireNonNull(this.current.get()), exemplars);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,6 @@ public DoubleAccumulation accumulateDoubleMeasurement(
return DoubleAccumulation.create(value);
}

@Override
public DoubleAccumulation merge(
DoubleAccumulation previousAccumulation, DoubleAccumulation accumulation) {
return DoubleAccumulation.create(
previousAccumulation.getValue() + accumulation.getValue(), accumulation.getExemplars());
}

@Override
public DoubleAccumulation diff(
DoubleAccumulation previousAccumulation, DoubleAccumulation accumulation) {
Expand Down Expand Up @@ -107,8 +100,12 @@ static final class Handle extends AggregatorHandle<DoubleAccumulation, DoubleExe
}

@Override
protected DoubleAccumulation doAccumulateThenReset(List<DoubleExemplarData> exemplars) {
return DoubleAccumulation.create(this.current.sumThenReset(), exemplars);
protected DoubleAccumulation doAccumulateThenReset(
List<DoubleExemplarData> exemplars, boolean reset) {
if (reset) {
return DoubleAccumulation.create(this.current.sumThenReset(), exemplars);
}
return DoubleAccumulation.create(this.current.sum(), exemplars);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected void doRecordLong(long value) {}
protected void doRecordDouble(double value) {}

@Override
protected Object doAccumulateThenReset(List<DoubleExemplarData> exemplars) {
protected Object doAccumulateThenReset(List<DoubleExemplarData> exemplars, boolean reset) {
return ACCUMULATION;
}
};
Expand All @@ -49,11 +49,6 @@ public AggregatorHandle<Object, DoubleExemplarData> createHandle() {
return HANDLE;
}

@Override
public Object merge(Object previousAccumulation, Object accumulation) {
return ACCUMULATION;
}

@Override
public MetricData toMetricData(
Resource resource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.opentelemetry.sdk.resources.Resource;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -45,11 +46,6 @@ public AggregatorHandle<LongAccumulation, LongExemplarData> createHandle() {
return new Handle(reservoirSupplier.get());
}

@Override
public LongAccumulation merge(LongAccumulation previous, LongAccumulation current) {
return current;
}

@Override
public LongAccumulation diff(LongAccumulation previous, LongAccumulation current) {
return current;
Expand Down Expand Up @@ -90,8 +86,12 @@ static final class Handle extends AggregatorHandle<LongAccumulation, LongExempla
}

@Override
protected LongAccumulation doAccumulateThenReset(List<LongExemplarData> exemplars) {
return LongAccumulation.create(this.current.getAndSet(DEFAULT_VALUE), exemplars);
protected LongAccumulation doAccumulateThenReset(
List<LongExemplarData> exemplars, boolean reset) {
if (reset) {
return LongAccumulation.create(this.current.getAndSet(DEFAULT_VALUE), exemplars);
}
return LongAccumulation.create(Objects.requireNonNull(this.current.get()), exemplars);
}

@Override
Expand Down
Loading