Skip to content

Commit

Permalink
Memory mode: Adding support for synchronous instruments - Counter (#6182
Browse files Browse the repository at this point in the history
)
  • Loading branch information
asafm committed Jan 31, 2024
1 parent 1969059 commit 0aa223d
Show file tree
Hide file tree
Showing 13 changed files with 386 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ public void normalizedAllocationRateTest() throws RunnerException {
assertThat(immutableDataAllocRate).isNotNull().isNotZero();
assertThat(reusableDataAllocRate).isNotNull().isNotZero();

float dataAllocRateReductionPercentage =
TestInstrumentType.valueOf(testInstrumentType)
.getDataAllocRateReductionPercentage();

// If this test suddenly fails for you this means you have changed the code in a way
// that allocates more memory than before. You can find out where, by running
// ProfileBenchmark class and looking at the flame graph. Make sure to
Expand All @@ -116,7 +120,7 @@ public void normalizedAllocationRateTest() throws RunnerException {
.describedAs(
"Aggregation temporality = %s, testInstrumentType = %s",
aggregationTemporality, testInstrumentType)
.isCloseTo(99.8, Offset.offset(2.0));
.isCloseTo(dataAllocRateReductionPercentage, Offset.offset(2.0));
});
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,45 @@
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.tester.AsyncCounterTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.DoubleSumTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.ExplicitBucketHistogramTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.ExponentialHistogramTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.LongSumTester;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;

@SuppressWarnings("ImmutableEnumChecker")
public enum TestInstrumentType {
ASYNC_COUNTER() {
@Override
InstrumentTester createInstrumentTester() {
return new AsyncCounterTester();
}
},
EXPONENTIAL_HISTOGRAM() {
@Override
InstrumentTester createInstrumentTester() {
return new ExponentialHistogramTester();
}
},
EXPLICIT_BUCKET() {
@Override
InstrumentTester createInstrumentTester() {
return new ExplicitBucketHistogramTester();
}
};

abstract InstrumentTester createInstrumentTester();

TestInstrumentType() {}
ASYNC_COUNTER(AsyncCounterTester::new),
EXPONENTIAL_HISTOGRAM(ExponentialHistogramTester::new),
EXPLICIT_BUCKET(ExplicitBucketHistogramTester::new),
LONG_SUM(LongSumTester::new, /* dataAllocRateReductionPercentage= */ 97.3f),
DOUBLE_SUM(DoubleSumTester::new, /* dataAllocRateReductionPercentage= */ 97.3f);

private final Supplier<? extends InstrumentTester> instrumentTesterInitializer;
private final float dataAllocRateReductionPercentage;

TestInstrumentType(Supplier<? extends InstrumentTester> instrumentTesterInitializer) {
this.dataAllocRateReductionPercentage = 99.8f; // default
this.instrumentTesterInitializer = instrumentTesterInitializer;
}

// Some instruments have different reduction percentage.
TestInstrumentType(
Supplier<? extends InstrumentTester> instrumentTesterInitializer,
float dataAllocRateReductionPercentage) {
this.instrumentTesterInitializer = instrumentTesterInitializer;
this.dataAllocRateReductionPercentage = dataAllocRateReductionPercentage;
}

float getDataAllocRateReductionPercentage() {
return dataAllocRateReductionPercentage;
}

InstrumentTester createInstrumentTester() {
return instrumentTesterInitializer.get();
}

public interface InstrumentTester {
Aggregation testedAggregation();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state.tester;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType;
import java.util.List;
import java.util.Random;

public class DoubleSumTester implements TestInstrumentType.InstrumentTester {
private static final int measurementsPerAttributeSet = 1_000;

static class DoubleSumState implements TestInstrumentType.TestInstrumentsState {
DoubleCounter doubleCounter;
}

@Override
public Aggregation testedAggregation() {
return Aggregation.sum();
}

@Override
public TestInstrumentType.TestInstrumentsState buildInstruments(
double instrumentCount,
SdkMeterProvider sdkMeterProvider,
List<Attributes> attributesList,
Random random) {
DoubleSumState doubleSumState = new DoubleSumState();

Meter meter = sdkMeterProvider.meterBuilder("meter").build();
doubleSumState.doubleCounter = meter.counterBuilder("test.double.sum").ofDoubles().build();

return doubleSumState;
}

@SuppressWarnings("ForLoopReplaceableByForEach") // This is for GC sensitivity testing: no streams
@Override
public void recordValuesInInstruments(
TestInstrumentType.TestInstrumentsState testInstrumentsState,
List<Attributes> attributesList,
Random random) {
DoubleSumState state = (DoubleSumState) testInstrumentsState;

for (int j = 0; j < attributesList.size(); j++) {
Attributes attributes = attributesList.get(j);
for (int i = 0; i < measurementsPerAttributeSet; i++) {
state.doubleCounter.add(1.2f, attributes);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state.tester;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType;
import java.util.List;
import java.util.Random;

public class LongSumTester implements TestInstrumentType.InstrumentTester {
private static final int measurementsPerAttributeSet = 1_000;

static class LongSumState implements TestInstrumentType.TestInstrumentsState {
LongCounter longCounter;
}

@Override
public Aggregation testedAggregation() {
return Aggregation.sum();
}

@Override
public TestInstrumentType.TestInstrumentsState buildInstruments(
double instrumentCount,
SdkMeterProvider sdkMeterProvider,
List<Attributes> attributesList,
Random random) {
LongSumState longSumState = new LongSumState();

Meter meter = sdkMeterProvider.meterBuilder("meter").build();
longSumState.longCounter = meter.counterBuilder("test.long.sum").build();

return longSumState;
}

@SuppressWarnings("ForLoopReplaceableByForEach") // This is for GC sensitivity testing: no streams
@Override
public void recordValuesInInstruments(
TestInstrumentType.TestInstrumentsState testInstrumentsState,
List<Attributes> attributesList,
Random random) {
LongSumState state = (LongSumState) testInstrumentsState;

for (int j = 0; j < attributesList.size(); j++) {
Attributes attributes = attributesList.get(j);
for (int i = 0; i < measurementsPerAttributeSet; i++) {
state.longCounter.add(1, attributes);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
Expand All @@ -25,6 +26,7 @@
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
* Sum aggregator that keeps values as {@code double}s.
Expand All @@ -35,24 +37,28 @@
public final class DoubleSumAggregator
extends AbstractSumAggregator<DoublePointData, DoubleExemplarData> {
private final Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier;
private final MemoryMode memoryMode;

/**
* Constructs a sum aggregator.
*
* @param instrumentDescriptor The instrument being recorded, used to compute monotonicity.
* @param reservoirSupplier Supplier of exemplar reservoirs per-stream.
* @param memoryMode The memory mode to use.
*/
public DoubleSumAggregator(
InstrumentDescriptor instrumentDescriptor,
Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier) {
Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier,
MemoryMode memoryMode) {
super(instrumentDescriptor);

this.reservoirSupplier = reservoirSupplier;
this.memoryMode = memoryMode;
}

@Override
public AggregatorHandle<DoublePointData, DoubleExemplarData> createHandle() {
return new Handle(reservoirSupplier.get());
return new Handle(reservoirSupplier.get(), memoryMode);
}

@Override
Expand Down Expand Up @@ -124,8 +130,12 @@ public MetricData toMetricData(
static final class Handle extends AggregatorHandle<DoublePointData, DoubleExemplarData> {
private final DoubleAdder current = AdderUtil.createDoubleAdder();

Handle(ExemplarReservoir<DoubleExemplarData> exemplarReservoir) {
// Only used if memoryMode == MemoryMode.REUSABLE_DATA
@Nullable private final MutableDoublePointData reusablePoint;

Handle(ExemplarReservoir<DoubleExemplarData> exemplarReservoir, MemoryMode memoryMode) {
super(exemplarReservoir);
reusablePoint = memoryMode == MemoryMode.REUSABLE_DATA ? new MutableDoublePointData() : null;
}

@Override
Expand All @@ -136,8 +146,13 @@ protected DoublePointData doAggregateThenMaybeReset(
List<DoubleExemplarData> exemplars,
boolean reset) {
double value = reset ? this.current.sumThenReset() : this.current.sum();
return ImmutableDoublePointData.create(
startEpochNanos, epochNanos, attributes, value, exemplars);
if (reusablePoint != null) {
reusablePoint.set(startEpochNanos, epochNanos, attributes, value, exemplars);
return reusablePoint;
} else {
return ImmutableDoublePointData.create(
startEpochNanos, epochNanos, attributes, value, exemplars);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
Expand All @@ -25,6 +26,7 @@
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
* Sum aggregator that keeps values as {@code long}s.
Expand All @@ -36,17 +38,20 @@ public final class LongSumAggregator
extends AbstractSumAggregator<LongPointData, LongExemplarData> {

private final Supplier<ExemplarReservoir<LongExemplarData>> reservoirSupplier;
private final MemoryMode memoryMode;

public LongSumAggregator(
InstrumentDescriptor instrumentDescriptor,
Supplier<ExemplarReservoir<LongExemplarData>> reservoirSupplier) {
Supplier<ExemplarReservoir<LongExemplarData>> reservoirSupplier,
MemoryMode memoryMode) {
super(instrumentDescriptor);
this.reservoirSupplier = reservoirSupplier;
this.memoryMode = memoryMode;
}

@Override
public AggregatorHandle<LongPointData, LongExemplarData> createHandle() {
return new Handle(reservoirSupplier.get());
return new Handle(reservoirSupplier.get(), memoryMode);
}

@Override
Expand Down Expand Up @@ -118,8 +123,13 @@ public MetricData toMetricData(
static final class Handle extends AggregatorHandle<LongPointData, LongExemplarData> {
private final LongAdder current = AdderUtil.createLongAdder();

Handle(ExemplarReservoir<LongExemplarData> exemplarReservoir) {
// Only used if memoryMode == MemoryMode.REUSABLE_DATA
@Nullable private final MutableLongPointData reusablePointData;

Handle(ExemplarReservoir<LongExemplarData> exemplarReservoir, MemoryMode memoryMode) {
super(exemplarReservoir);
reusablePointData =
memoryMode == MemoryMode.REUSABLE_DATA ? new MutableLongPointData() : null;
}

@Override
Expand All @@ -130,8 +140,13 @@ protected LongPointData doAggregateThenMaybeReset(
List<LongExemplarData> exemplars,
boolean reset) {
long value = reset ? this.current.sumThenReset() : this.current.sum();
return ImmutableLongPointData.create(
startEpochNanos, epochNanos, attributes, value, exemplars);
if (reusablePointData != null) {
reusablePointData.set(startEpochNanos, epochNanos, attributes, value, exemplars);
return reusablePointData;
} else {
return ImmutableLongPointData.create(
startEpochNanos, epochNanos, attributes, value, exemplars);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || !(o instanceof MutableDoublePointData)) {
if (!(o instanceof DoublePointData)) {
return false;
}
MutableDoublePointData pointData = (MutableDoublePointData) o;
return startEpochNanos == pointData.startEpochNanos
&& epochNanos == pointData.epochNanos
&& Double.doubleToLongBits(value) == Double.doubleToLongBits(pointData.value)
&& Objects.equals(attributes, pointData.attributes)
&& Objects.equals(exemplars, pointData.exemplars);
DoublePointData pointData = (DoublePointData) o;
return startEpochNanos == pointData.getStartEpochNanos()
&& epochNanos == pointData.getEpochNanos()
&& Double.doubleToLongBits(value) == Double.doubleToLongBits(pointData.getValue())
&& Objects.equals(attributes, pointData.getAttributes())
&& Objects.equals(exemplars, pointData.getExemplars());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ public boolean equals(Object o) {
if (o == this) {
return true;
}
if (o instanceof MutableExponentialHistogramBuckets) {
MutableExponentialHistogramBuckets that = (MutableExponentialHistogramBuckets) o;
if (o instanceof ExponentialHistogramBuckets) {
ExponentialHistogramBuckets that = (ExponentialHistogramBuckets) o;
return this.scale == that.getScale()
&& this.offset == that.getOffset()
&& this.totalCount == that.getTotalCount()
&& Objects.equals(this.bucketCounts, that.bucketCounts);
&& Objects.equals(this.bucketCounts, that.getBucketCounts());
}
return false;
}
Expand Down
Loading

0 comments on commit 0aa223d

Please sign in to comment.