Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow exporters to influence Aggregation #3762

Merged
merged 23 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3bb5625
Major refactoring to Aggregator
jsuereth Oct 16, 2021
eecddb7
Remove extraneous bits of information from AggregatorFactory interface
jsuereth Oct 16, 2021
9a195c3
Spotless + remove extraneous pieces of data from Aggregation interface.
jsuereth Oct 16, 2021
6a07707
Added supported + preferred temprality to export interfaces.
jsuereth Oct 16, 2021
9f12d78
Finish wiring configured temporality through Views.
jsuereth Oct 16, 2021
9d120b2
Fix remaining gauge tests
jsuereth Oct 16, 2021
e227722
Update exporters to specify desired temporality
jsuereth Oct 17, 2021
e2a8865
Other build fixes
jsuereth Oct 17, 2021
bf8e339
Fix javadoc
jsuereth Oct 17, 2021
123f729
add tests for cumulative+delta in temporal storage
jsuereth Oct 17, 2021
fbc1c12
Move diffInPlace and megeInPlace into utility class
jsuereth Oct 17, 2021
a1c598a
Add explicit test for delta vs. cumulative exporters.
jsuereth Oct 17, 2021
619ec2c
Spotless fixes
jsuereth Oct 17, 2021
09137e1
Remove AggregatorFactory
jsuereth Oct 17, 2021
8a7100e
Fixes from review
jsuereth Oct 20, 2021
c94719c
Add javadoc for new parameters in temporal metric storage.
jsuereth Oct 20, 2021
44ecfb8
Merge latest.
jsuereth Oct 23, 2021
cbb9750
Fixes from review.
jsuereth Oct 23, 2021
d99b832
Add some unit tests for the aggregation config
jsuereth Oct 24, 2021
0d0e0db
Add "diff" unit test for histogram + lastvalue
jsuereth Oct 24, 2021
58ba395
Apply suggestions from code review
jsuereth Oct 25, 2021
547abf7
Merge main.
jsuereth Oct 26, 2021
b4152b8
Fixes from code review.
jsuereth Oct 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import io.opentelemetry.exporter.otlp.internal.metrics.MetricsRequestMarshaler;
import io.opentelemetry.exporter.otlp.internal.okhttp.OkHttpExporter;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.util.Collection;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

/** Exports metrics using OTLP via HTTP, using OpenTelemetry's protobuf model. */
Expand Down Expand Up @@ -41,6 +43,14 @@ public static OtlpHttpMetricExporterBuilder builder() {
return new OtlpHttpMetricExporterBuilder();
}

@Nullable
@Override
public final AggregationTemporality getPreferredTemporality() {
// TODO: Lookup based on specification, or constructor
jsuereth marked this conversation as resolved.
Show resolved Hide resolved
// https://github.com/open-telemetry/opentelemetry-java/issues/3790
return null;
}

/**
* Submits all the given metrics in a single batch to the OpenTelemetry collector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.otlp.internal.metrics.MetricsRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.util.Collection;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

/** Exports metrics using OTLP via gRPC, using OpenTelemetry's protobuf model. */
Expand Down Expand Up @@ -43,6 +45,14 @@ public static OtlpGrpcMetricExporterBuilder builder() {
this.delegate = delegate;
}

@Nullable
@Override
public AggregationTemporality getPreferredTemporality() {
// TODO: Lookup based on specification, or constructor
// https://github.com/open-telemetry/opentelemetry-java/issues/3790
return null;
}

/**
* Submits all the given metrics in a single batch to the OpenTelemetry collector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.exporter.prometheus;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricProducer;
import io.opentelemetry.sdk.metrics.export.MetricReader;
Expand All @@ -15,6 +16,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;

/**
Expand Down Expand Up @@ -47,6 +49,16 @@ public static MetricReaderFactory create() {
return new Factory();
}

@Override
public EnumSet<AggregationTemporality> getSupportedTemporality() {
return EnumSet.of(AggregationTemporality.CUMULATIVE);
}

@Override
public AggregationTemporality getPreferredTemporality() {
return AggregationTemporality.CUMULATIVE;
}

// Prometheus cannot flush.
@Override
public CompletableResultCode flush() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.sun.net.httpserver.HttpServer;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.DaemonThreadFactory;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.MetricProducer;
Expand All @@ -35,6 +36,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -104,6 +106,16 @@ private void start() {
}
}

@Override
public EnumSet<AggregationTemporality> getSupportedTemporality() {
return EnumSet.of(AggregationTemporality.CUMULATIVE);
}

@Override
public AggregationTemporality getPreferredTemporality() {
return AggregationTemporality.CUMULATIVE;
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ public SumAssertT isNotMonotonic() {
failWithActualExpectedAndMessage(
actual,
"montonic: fail",
"Exepcted Sum to be non-monotonic",
false,
"Expected Sum to be non-monotonic, found: %s",
actual.isMonotonic());
}
return myself;
Expand All @@ -48,8 +47,7 @@ public SumAssertT isCumulative() {
failWithActualExpectedAndMessage(
actual,
"aggregationTemporality: CUMULATIVE",
"Exepcted Sum to have cumulative aggregation but found <%s>",
AggregationTemporality.CUMULATIVE,
"Expected Sum to have cumulative aggregation but found <%s>",
actual.getAggregationTemporality());
}
return myself;
Expand All @@ -62,8 +60,7 @@ public SumAssertT isDelta() {
failWithActualExpectedAndMessage(
actual,
"aggregationTemporality: DELTA",
"Exepcted Sum to have cumulative aggregation but found <%s>",
AggregationTemporality.DELTA,
"Expected Sum to have delta aggregation but found <%s>",
actual.getAggregationTemporality());
}
return myself;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@

package io.opentelemetry.sdk.metrics.internal.aggregator;

import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.metrics.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.view.Aggregation;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
Expand All @@ -31,14 +28,11 @@
@State(Scope.Benchmark)
public class DoubleHistogramBenchmark {
private static final Aggregator<HistogramAccumulation> aggregator =
AggregatorFactory.histogram(Arrays.asList(10.0, 100.0, 1_000.0), AggregationTemporality.DELTA)
.create(
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
Aggregation.explicitBucketHistogram(Arrays.asList(10.0, 100.0, 1_000.0))
.createAggregator(
InstrumentDescriptor.create(
"name", "description", "1", InstrumentType.HISTOGRAM, InstrumentValueType.DOUBLE),
MetricDescriptor.create("name", "description", "1"),
ExemplarReservoir::noSamples);
ExemplarFilter.neverSample());
private AggregatorHandle<HistogramAccumulation> aggregatorHandle;

@Setup(Level.Trial)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,7 @@

package io.opentelemetry.sdk.metrics.internal.aggregator;

import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.resources.Resource;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand All @@ -29,14 +23,7 @@
@State(Scope.Benchmark)
public class DoubleMinMaxSumCountBenchmark {
private static final Aggregator<MinMaxSumCountAccumulation> aggregator =
AggregatorFactory.minMaxSumCount()
.create(
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create(
"name", "description", "1", InstrumentType.HISTOGRAM, InstrumentValueType.DOUBLE),
MetricDescriptor.create("name", "description", "1"),
ExemplarReservoir::noSamples);
new DoubleMinMaxSumCountAggregator(ExemplarReservoir::noSamples);
private AggregatorHandle<MinMaxSumCountAccumulation> aggregatorHandle;

@Setup(Level.Trial)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,7 @@

package io.opentelemetry.sdk.metrics.internal.aggregator;

import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.resources.Resource;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand All @@ -29,14 +23,7 @@
@State(Scope.Benchmark)
public class LongMinMaxSumCountBenchmark {
private static final Aggregator<MinMaxSumCountAccumulation> aggregator =
AggregatorFactory.minMaxSumCount()
.create(
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create(
"name", "description", "1", InstrumentType.HISTOGRAM, InstrumentValueType.LONG),
MetricDescriptor.create("name", "description", "1"),
ExemplarReservoir::noSamples);
new LongMinMaxSumCountAggregator(ExemplarReservoir::noSamples);
private AggregatorHandle<MinMaxSumCountAccumulation> aggregatorHandle;

@Setup(Level.Trial)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.export.MetricReaderFactory;
import io.opentelemetry.sdk.metrics.internal.export.CollectionHandle;
import io.opentelemetry.sdk.metrics.internal.export.CollectionInfo;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -45,7 +48,7 @@ final class DefaultSdkMeterProvider implements SdkMeterProvider {
private final ComponentRegistry<SdkMeter> registry;
private final MeterProviderSharedState sharedState;
private final Set<CollectionHandle> collectors;
private final List<MetricReader> readers;
private final Map<CollectionHandle, CollectionInfo> collectionInfoMap;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicLong lastCollectionTimestamp;

Expand All @@ -72,14 +75,14 @@ final class DefaultSdkMeterProvider implements SdkMeterProvider {
// These are guaranteed to be unique per-reader for this SDK, and only this SDK.
// These are *only* mutated in our constructor, and safe to use concurrently after construction.
collectors = CollectionHandle.mutableSet();
readers = new ArrayList<>();
collectionInfoMap = new HashMap<>();
Supplier<CollectionHandle> handleSupplier = CollectionHandle.createSupplier();
for (MetricReaderFactory readerFactory : readerFactories) {
CollectionHandle handle = handleSupplier.get();
// TODO: handle failure in creation or just crash?
MetricReader reader = readerFactory.apply(new LeasedMetricProducer(handle));
collectionInfoMap.put(handle, CollectionInfo.create(handle, collectors, reader));
collectors.add(handle);
readers.add(reader);
}
}

Expand All @@ -95,8 +98,8 @@ public MeterBuilder meterBuilder(@Nullable String instrumentationName) {
@Override
public CompletableResultCode forceFlush() {
List<CompletableResultCode> results = new ArrayList<>();
for (MetricReader reader : readers) {
results.add(reader.flush());
for (CollectionInfo collectionInfo : collectionInfoMap.values()) {
results.add(collectionInfo.getReader().shutdown());
}
return CompletableResultCode.ofAll(results);
}
Expand All @@ -108,8 +111,8 @@ public CompletableResultCode close() {
return CompletableResultCode.ofSuccess();
}
List<CompletableResultCode> results = new ArrayList<>();
for (MetricReader reader : readers) {
results.add(reader.shutdown());
for (CollectionInfo info : collectionInfoMap.values()) {
results.add(info.getReader().shutdown());
}
return CompletableResultCode.ofAll(results);
}
Expand Down Expand Up @@ -147,7 +150,9 @@ public Collection<MetricData> collectAllMetrics() {
for (SdkMeter meter : meters) {
result.addAll(
meter.collectAll(
handle, collectors, sharedState.getClock().now(), disableSynchronousCollection));
collectionInfoMap.get(handle),
sharedState.getClock().now(),
disableSynchronousCollection));
}
return Collections.unmodifiableCollection(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.export.CollectionHandle;
import io.opentelemetry.sdk.metrics.internal.export.CollectionInfo;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import java.util.Collection;
import java.util.Set;

/** {@link SdkMeter} is SDK implementation of {@link Meter}. */
final class SdkMeter implements Meter {
Expand All @@ -37,16 +36,9 @@ InstrumentationLibraryInfo getInstrumentationLibraryInfo() {

/** Collects all the metric recordings that changed since the previous call. */
Collection<MetricData> collectAll(
CollectionHandle collector,
Set<CollectionHandle> allCollectors,
long epochNanos,
boolean suppressSynchronousCollection) {
CollectionInfo collectionInfo, long epochNanos, boolean suppressSynchronousCollection) {
return meterSharedState.collectAll(
collector,
allCollectors,
meterProviderSharedState,
epochNanos,
suppressSynchronousCollection);
collectionInfo, meterProviderSharedState, epochNanos, suppressSynchronousCollection);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
package io.opentelemetry.sdk.metrics.export;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.Collection;
import java.util.EnumSet;
import javax.annotation.Nullable;

/**
* {@code MetricExporter} is the interface that all "push based" metric libraries should use to
Expand All @@ -17,6 +20,17 @@
*/
public interface MetricExporter {

/** Returns the set of all supported temporalities for this exporter. */
default EnumSet<AggregationTemporality> getSupportedTemporality() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the purpose of this to fail if someone configures a view for an exporter that doesn't support the specified temporality?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Theoretically.

However, the Metrics SiG Is removing AggregationTemporality from the View API, so the purpose here is that an exporter can "handle everything" by default.

return EnumSet.allOf(AggregationTemporality.class);
}

/** Returns the preferred temporality for metrics. */
@Nullable
default AggregationTemporality getPreferredTemporality() {
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AggregatedTemporality.CUMULATIVE? Just kidding (mostly) :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ha!

}

/**
* Exports the collection of given {@link MetricData}. Note that export operations can be
* performed simultaneously depending on the type of metric reader being used. However, the caller
Expand Down
Loading