Skip to content

Commit

Permalink
Delete notion of accumulation
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg committed Jan 29, 2023
1 parent 1e41653 commit efc98fb
Show file tree
Hide file tree
Showing 41 changed files with 882 additions and 1,032 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,25 @@

package io.opentelemetry.sdk.metrics.internal.aggregator;

import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;

abstract class AbstractSumAggregator<T, U extends ExemplarData> implements Aggregator<T, U> {
abstract class AbstractSumAggregator<T extends PointData, U extends ExemplarData>
implements Aggregator<T, U> {
private final boolean isMonotonic;

AbstractSumAggregator(InstrumentDescriptor instrumentDescriptor) {
this.isMonotonic = MetricDataUtils.isMonotonicInstrument(instrumentDescriptor);
this.isMonotonic = isMonotonicInstrument(instrumentDescriptor);
}

/** Returns true if the instrument does not allow negative measurements. */
private static boolean isMonotonicInstrument(InstrumentDescriptor descriptor) {
InstrumentType type = descriptor.getType();
return type == InstrumentType.HISTOGRAM
|| type == InstrumentType.COUNTER
|| type == InstrumentType.OBSERVABLE_COUNTER;
}

final boolean isMonotonic() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,30 @@

package io.opentelemetry.sdk.metrics.internal.aggregator;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.state.Measurement;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Map;
import javax.annotation.Nullable;
import java.util.Collection;
import javax.annotation.concurrent.Immutable;

/**
* Aggregator represents the abstract class for all the available aggregations that can be computed
* during the accumulation phase for all the instrument.
*
* <p>The synchronous instruments will create an {@link AggregatorHandle} to record individual
* measurements synchronously, and for asynchronous the {@link #accumulateDoubleMeasurement} or
* {@link #accumulateLongMeasurement} will be used when reading values from the instrument
* callbacks.
* during the collection phase for all the instruments.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
@Immutable
public interface Aggregator<T, U extends ExemplarData> {
public interface Aggregator<T extends PointData, U extends ExemplarData> {
/** Returns the drop aggregator, an aggregator that drops measurements. */
static Aggregator<Object, DoubleExemplarData> drop() {
static Aggregator<?, DoubleExemplarData> drop() {
return DropAggregator.INSTANCE;
}

Expand All @@ -47,46 +41,25 @@ static Aggregator<Object, DoubleExemplarData> drop() {
AggregatorHandle<T, U> createHandle();

/**
* Returns a new {@code Accumulation} for the given value. This MUST be used by the asynchronous
* instruments to create {@code Accumulation} that are passed to the processor.
* Returns a new DELTA point by computing the difference between two cumulative points.
*
* @param value the given value to be used to create the {@code Accumulation}.
* @return a new {@code Accumulation} for the given value, or {@code null} if there are no
* recordings.
*/
@Nullable
default T accumulateLongMeasurement(long value, Attributes attributes, Context context) {
AggregatorHandle<T, U> handle = createHandle();
handle.recordLong(value, attributes, context);
return handle.accumulateThenMaybeReset(attributes, /* reset= */ true);
}

/**
* Returns a new {@code Accumulation} for the given value. This MUST be used by the asynchronous
* instruments to create {@code Accumulation} that are passed to the processor.
* <p>Aggregators MUST implement diff if it can be used with asynchronous instruments.
*
* @param value the given value to be used to create the {@code Accumulation}.
* @return a new {@code Accumulation} for the given value, or {@code null} if there are no
* recordings.
* @param previousCumulative the previously captured point.
* @param currentCumulative the newly captured (cumulative) point.
* @return The resulting delta point.
*/
@Nullable
default T accumulateDoubleMeasurement(double value, Attributes attributes, Context context) {
AggregatorHandle<T, U> handle = createHandle();
handle.recordDouble(value, attributes, context);
return handle.accumulateThenMaybeReset(attributes, /* reset= */ true);
default T diff(T previousCumulative, T currentCumulative) {
throw new UnsupportedOperationException("This aggregator does not support diff.");
}

/**
* Returns a new DELTA aggregation by comparing two cumulative measurements.
* Return a new point representing the measurement.
*
* <p>Aggregators MUST implement diff if it can be used with asynchronous instruments.
*
* @param previousCumulative the previously captured accumulation.
* @param currentCumulative the newly captured (cumulative) accumulation.
* @return The resulting delta accumulation.
*/
default T diff(T previousCumulative, T currentCumulative) {
throw new UnsupportedOperationException("This aggregator does not support diff.");
default T toPoint(Measurement measurement) {
throw new UnsupportedOperationException("This aggregator does not support toPoint.");
}

/**
Expand All @@ -95,19 +68,14 @@ default T diff(T previousCumulative, T currentCumulative) {
* @param resource the resource producing the metric.
* @param instrumentationScopeInfo the scope that instrumented the metric.
* @param metricDescriptor the name, description and unit of the metric.
* @param accumulationByLabels the map of Labels to Accumulation.
* @param temporality the temporality of the accumulation.
* @param startEpochNanos the startEpochNanos for the {@code Point}.
* @param epochNanos the epochNanos for the {@code Point}.
* @param points list of points
* @param temporality the temporality of the metric.
* @return the {@link MetricDataType} that this {@code Aggregation} will produce.
*/
MetricData toMetricData(
Resource resource,
InstrumentationScopeInfo instrumentationScopeInfo,
MetricDescriptor metricDescriptor,
Map<Attributes, T> accumulationByLabels,
AggregationTemporality temporality,
long startEpochNanos,
long lastCollectionEpoch,
long epochNanos);
Collection<T> points,
AggregationTemporality temporality);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.sdk.metrics.internal.aggregator;

import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;

Expand All @@ -28,7 +29,7 @@ public interface AggregatorFactory {
* @return a new {@link Aggregator}. {@link Aggregator#drop()} indicates no measurements should be
* recorded.
*/
<T, U extends ExemplarData> Aggregator<T, U> createAggregator(
<T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggregator(
InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.state.BoundStorageHandle;
import java.util.List;
Expand All @@ -32,7 +33,8 @@
* at any time.
*/
@ThreadSafe
public abstract class AggregatorHandle<T, U extends ExemplarData> implements BoundStorageHandle {
public abstract class AggregatorHandle<T extends PointData, U extends ExemplarData>
implements BoundStorageHandle {
// Atomically counts the number of references (usages) while also keeping a state of
// mapped/unmapped into a registry map.
private final AtomicLong refCountMapped;
Expand Down Expand Up @@ -89,18 +91,29 @@ public final boolean tryUnmap() {
* current value in this {@code Aggregator}.
*/
@Nullable
public final T accumulateThenMaybeReset(Attributes attributes, boolean reset) {
public final T aggregateThenMaybeReset(
long startEpochNanos, long epochNanos, Attributes attributes, boolean reset) {
if (!hasRecordings) {
return null;
}
if (reset) {
hasRecordings = false;
}
return doAccumulateThenMaybeReset(exemplarReservoir.collectAndReset(attributes), reset);
return doAggregateThenMaybeReset(
startEpochNanos,
epochNanos,
attributes,
exemplarReservoir.collectAndReset(attributes),
reset);
}

/** Implementation of the {@link #accumulateThenMaybeReset(Attributes, boolean)}. */
protected abstract T doAccumulateThenMaybeReset(List<U> exemplars, boolean reset);
/** Implementation of the {@link #aggregateThenMaybeReset(long, long, Attributes, boolean)} . */
protected abstract T doAggregateThenMaybeReset(
long startEpochNanos,
long epochNanos,
Attributes attributes,
List<U> exemplars,
boolean reset);

@Override
public final void recordLong(long value, Attributes attributes, Context context) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,22 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.internal.GuardedBy;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.internal.PrimitiveLongList;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

Expand All @@ -31,7 +34,7 @@
* at any time.
*/
public final class DoubleExplicitBucketHistogramAggregator
implements Aggregator<ExplicitBucketHistogramAccumulation, DoubleExemplarData> {
implements Aggregator<HistogramPointData, DoubleExemplarData> {
private final double[] boundaries;

// a cache for converting to MetricData
Expand All @@ -58,39 +61,29 @@ public DoubleExplicitBucketHistogramAggregator(
}

@Override
public AggregatorHandle<ExplicitBucketHistogramAccumulation, DoubleExemplarData> createHandle() {
return new Handle(this.boundaries, reservoirSupplier.get());
public AggregatorHandle<HistogramPointData, DoubleExemplarData> createHandle() {
return new Handle(this.boundaryList, this.boundaries, reservoirSupplier.get());
}

@Override
public MetricData toMetricData(
Resource resource,
InstrumentationScopeInfo instrumentationScopeInfo,
MetricDescriptor metricDescriptor,
Map<Attributes, ExplicitBucketHistogramAccumulation> accumulationByLabels,
AggregationTemporality temporality,
long startEpochNanos,
long lastCollectionEpoch,
long epochNanos) {
Collection<HistogramPointData> pointData,
AggregationTemporality temporality) {
return ImmutableMetricData.createDoubleHistogram(
resource,
instrumentationScopeInfo,
metricDescriptor.getName(),
metricDescriptor.getDescription(),
metricDescriptor.getSourceInstrument().getUnit(),
ImmutableHistogramData.create(
temporality,
MetricDataUtils.toExplicitBucketHistogramPointList(
accumulationByLabels,
(temporality == AggregationTemporality.CUMULATIVE)
? startEpochNanos
: lastCollectionEpoch,
epochNanos,
boundaryList)));
ImmutableHistogramData.create(temporality, pointData));
}

static final class Handle
extends AggregatorHandle<ExplicitBucketHistogramAccumulation, DoubleExemplarData> {
static final class Handle extends AggregatorHandle<HistogramPointData, DoubleExemplarData> {
// read-only
private final List<Double> boundaryList;
// read-only
private final double[] boundaries;

Expand All @@ -111,8 +104,12 @@ static final class Handle

private final ReentrantLock lock = new ReentrantLock();

Handle(double[] boundaries, ExemplarReservoir<DoubleExemplarData> reservoir) {
Handle(
List<Double> boundaryList,
double[] boundaries,
ExemplarReservoir<DoubleExemplarData> reservoir) {
super(reservoir);
this.boundaryList = boundaryList;
this.boundaries = boundaries;
this.counts = new long[this.boundaries.length + 1];
this.sum = 0;
Expand All @@ -122,17 +119,24 @@ static final class Handle
}

@Override
protected ExplicitBucketHistogramAccumulation doAccumulateThenMaybeReset(
List<DoubleExemplarData> exemplars, boolean reset) {
protected HistogramPointData doAggregateThenMaybeReset(
long startEpochNanos,
long epochNanos,
Attributes attributes,
List<DoubleExemplarData> exemplars,
boolean reset) {
lock.lock();
try {
ExplicitBucketHistogramAccumulation acc =
ExplicitBucketHistogramAccumulation.create(
HistogramPointData pointData =
ImmutableHistogramPointData.create(
startEpochNanos,
epochNanos,
attributes,
sum,
this.count > 0,
this.count > 0 ? this.min : -1,
this.count > 0 ? this.max : -1,
Arrays.copyOf(counts, counts.length),
this.count > 0 ? this.min : null,
this.count > 0 ? this.max : null,
boundaryList,
PrimitiveLongList.wrap(Arrays.copyOf(counts, counts.length)),
exemplars);
if (reset) {
this.sum = 0;
Expand All @@ -141,7 +145,7 @@ protected ExplicitBucketHistogramAccumulation doAccumulateThenMaybeReset(
this.count = 0;
Arrays.fill(this.counts, 0);
}
return acc;
return pointData;
} finally {
lock.unlock();
}
Expand Down
Loading

0 comments on commit efc98fb

Please sign in to comment.