Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory mode: Adding support for synchronous instruments - Counter #6182

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ public void normalizedAllocationRateTest() throws RunnerException {
assertThat(immutableDataAllocRate).isNotNull().isNotZero();
assertThat(reusableDataAllocRate).isNotNull().isNotZero();

float dataAllocRateReductionPercentage =
TestInstrumentType.valueOf(testInstrumentType)
.getDataAllocRateReductionPercentage();

// If this test suddenly fails for you this means you have changed the code in a way
// that allocates more memory than before. You can find out where, by running
// ProfileBenchmark class and looking at the flame graph. Make sure to
Expand All @@ -116,7 +120,7 @@ public void normalizedAllocationRateTest() throws RunnerException {
.describedAs(
"Aggregation temporality = %s, testInstrumentType = %s",
aggregationTemporality, testInstrumentType)
.isCloseTo(99.8, Offset.offset(2.0));
.isCloseTo(dataAllocRateReductionPercentage, Offset.offset(2.0));
});
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.tester.AsyncCounterTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.DoubleSumTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.ExplicitBucketHistogramTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.ExponentialHistogramTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.LongSumTester;
import java.util.List;
import java.util.Random;

Expand All @@ -32,11 +34,36 @@ InstrumentTester createInstrumentTester() {
InstrumentTester createInstrumentTester() {
return new ExplicitBucketHistogramTester();
}
},
LONG_SUM(/* dataAllocRateReductionPercentage= */ 97.3f) {
@Override
InstrumentTester createInstrumentTester() {
return new LongSumTester();
}
},
DOUBLE_SUM(/* dataAllocRateReductionPercentage= */ 97.3f) {
@Override
InstrumentTester createInstrumentTester() {
return new DoubleSumTester();
}
};

abstract InstrumentTester createInstrumentTester();
private final float dataAllocRateReductionPercentage;

TestInstrumentType() {
this.dataAllocRateReductionPercentage = 99.8f; // default
}

// Some instruments have different reduction percentage.
TestInstrumentType(float dataAllocRateReductionPercentage) {
this.dataAllocRateReductionPercentage = dataAllocRateReductionPercentage;
}

float getDataAllocRateReductionPercentage() {
return dataAllocRateReductionPercentage;
}

TestInstrumentType() {}
abstract InstrumentTester createInstrumentTester();
jack-berg marked this conversation as resolved.
Show resolved Hide resolved

public interface InstrumentTester {
Aggregation testedAggregation();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state.tester;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType;
import java.util.List;
import java.util.Random;

public class DoubleSumTester implements TestInstrumentType.InstrumentTester {
private static final int measurementsPerAttributeSet = 1_000;

static class DoubleSumState implements TestInstrumentType.TestInstrumentsState {
DoubleCounter doubleCounter;
}

@Override
public Aggregation testedAggregation() {
return Aggregation.sum();
}

@Override
public TestInstrumentType.TestInstrumentsState buildInstruments(
double instrumentCount,
SdkMeterProvider sdkMeterProvider,
List<Attributes> attributesList,
Random random) {
DoubleSumState doubleSumState = new DoubleSumState();

Meter meter = sdkMeterProvider.meterBuilder("meter").build();
doubleSumState.doubleCounter = meter.counterBuilder("test.double.sum").ofDoubles().build();

return doubleSumState;
}

@SuppressWarnings("ForLoopReplaceableByForEach") // This is for GC sensitivity testing: no streams
@Override
public void recordValuesInInstruments(
TestInstrumentType.TestInstrumentsState testInstrumentsState,
List<Attributes> attributesList,
Random random) {
DoubleSumState state = (DoubleSumState) testInstrumentsState;

for (int j = 0; j < attributesList.size(); j++) {
Attributes attributes = attributesList.get(j);
for (int i = 0; i < measurementsPerAttributeSet; i++) {
state.doubleCounter.add(1.2f, attributes);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state.tester;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType;
import java.util.List;
import java.util.Random;

public class LongSumTester implements TestInstrumentType.InstrumentTester {
private static final int measurementsPerAttributeSet = 1_000;

static class LongSumState implements TestInstrumentType.TestInstrumentsState {
LongCounter longCounter;
}

@Override
public Aggregation testedAggregation() {
return Aggregation.sum();
}

@Override
public TestInstrumentType.TestInstrumentsState buildInstruments(
double instrumentCount,
SdkMeterProvider sdkMeterProvider,
List<Attributes> attributesList,
Random random) {
LongSumState longSumState = new LongSumState();

Meter meter = sdkMeterProvider.meterBuilder("meter").build();
longSumState.longCounter = meter.counterBuilder("test.long.sum").build();

return longSumState;
}

@SuppressWarnings("ForLoopReplaceableByForEach") // This is for GC sensitivity testing: no streams
@Override
public void recordValuesInInstruments(
TestInstrumentType.TestInstrumentsState testInstrumentsState,
List<Attributes> attributesList,
Random random) {
LongSumState state = (LongSumState) testInstrumentsState;

for (int j = 0; j < attributesList.size(); j++) {
Attributes attributes = attributesList.get(j);
for (int i = 0; i < measurementsPerAttributeSet; i++) {
state.longCounter.add(1, attributes);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
Expand All @@ -25,6 +26,7 @@
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
* Sum aggregator that keeps values as {@code double}s.
Expand All @@ -35,24 +37,28 @@
public final class DoubleSumAggregator
extends AbstractSumAggregator<DoublePointData, DoubleExemplarData> {
private final Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier;
private final MemoryMode memoryMode;

/**
* Constructs a sum aggregator.
*
* @param instrumentDescriptor The instrument being recorded, used to compute monotonicity.
* @param reservoirSupplier Supplier of exemplar reservoirs per-stream.
* @param memoryMode The memory mode to use.
*/
public DoubleSumAggregator(
InstrumentDescriptor instrumentDescriptor,
Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier) {
Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier,
MemoryMode memoryMode) {
super(instrumentDescriptor);

this.reservoirSupplier = reservoirSupplier;
this.memoryMode = memoryMode;
}

@Override
public AggregatorHandle<DoublePointData, DoubleExemplarData> createHandle() {
return new Handle(reservoirSupplier.get());
return new Handle(reservoirSupplier.get(), memoryMode);
}

@Override
Expand Down Expand Up @@ -124,8 +130,12 @@ public MetricData toMetricData(
static final class Handle extends AggregatorHandle<DoublePointData, DoubleExemplarData> {
private final DoubleAdder current = AdderUtil.createDoubleAdder();

Handle(ExemplarReservoir<DoubleExemplarData> exemplarReservoir) {
// Only used if memoryMode == MemoryMode.REUSABLE_DATA
@Nullable private final MutableDoublePointData reusablePoint;

Handle(ExemplarReservoir<DoubleExemplarData> exemplarReservoir, MemoryMode memoryMode) {
super(exemplarReservoir);
reusablePoint = memoryMode == MemoryMode.REUSABLE_DATA ? new MutableDoublePointData() : null;
}

@Override
Expand All @@ -136,8 +146,13 @@ protected DoublePointData doAggregateThenMaybeReset(
List<DoubleExemplarData> exemplars,
boolean reset) {
double value = reset ? this.current.sumThenReset() : this.current.sum();
return ImmutableDoublePointData.create(
startEpochNanos, epochNanos, attributes, value, exemplars);
if (reusablePoint != null) {
reusablePoint.set(startEpochNanos, epochNanos, attributes, value, exemplars);
return reusablePoint;
} else {
return ImmutableDoublePointData.create(
startEpochNanos, epochNanos, attributes, value, exemplars);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
Expand All @@ -25,6 +26,7 @@
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
* Sum aggregator that keeps values as {@code long}s.
Expand All @@ -36,17 +38,20 @@ public final class LongSumAggregator
extends AbstractSumAggregator<LongPointData, LongExemplarData> {

private final Supplier<ExemplarReservoir<LongExemplarData>> reservoirSupplier;
private final MemoryMode memoryMode;

public LongSumAggregator(
InstrumentDescriptor instrumentDescriptor,
Supplier<ExemplarReservoir<LongExemplarData>> reservoirSupplier) {
Supplier<ExemplarReservoir<LongExemplarData>> reservoirSupplier,
MemoryMode memoryMode) {
super(instrumentDescriptor);
this.reservoirSupplier = reservoirSupplier;
this.memoryMode = memoryMode;
}

@Override
public AggregatorHandle<LongPointData, LongExemplarData> createHandle() {
return new Handle(reservoirSupplier.get());
return new Handle(reservoirSupplier.get(), memoryMode);
}

@Override
Expand Down Expand Up @@ -118,8 +123,13 @@ public MetricData toMetricData(
static final class Handle extends AggregatorHandle<LongPointData, LongExemplarData> {
private final LongAdder current = AdderUtil.createLongAdder();

Handle(ExemplarReservoir<LongExemplarData> exemplarReservoir) {
// Only used if memoryMode == MemoryMode.REUSABLE_DATA
@Nullable private final MutableLongPointData reusablePointData;

Handle(ExemplarReservoir<LongExemplarData> exemplarReservoir, MemoryMode memoryMode) {
super(exemplarReservoir);
reusablePointData =
memoryMode == MemoryMode.REUSABLE_DATA ? new MutableLongPointData() : null;
}

@Override
Expand All @@ -130,8 +140,13 @@ protected LongPointData doAggregateThenMaybeReset(
List<LongExemplarData> exemplars,
boolean reset) {
long value = reset ? this.current.sumThenReset() : this.current.sum();
return ImmutableLongPointData.create(
startEpochNanos, epochNanos, attributes, value, exemplars);
if (reusablePointData != null) {
reusablePointData.set(startEpochNanos, epochNanos, attributes, value, exemplars);
return reusablePointData;
} else {
return ImmutableLongPointData.create(
startEpochNanos, epochNanos, attributes, value, exemplars);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || !(o instanceof MutableDoublePointData)) {
if (!(o instanceof DoublePointData)) {
return false;
}
MutableDoublePointData pointData = (MutableDoublePointData) o;
return startEpochNanos == pointData.startEpochNanos
&& epochNanos == pointData.epochNanos
&& Double.doubleToLongBits(value) == Double.doubleToLongBits(pointData.value)
&& Objects.equals(attributes, pointData.attributes)
&& Objects.equals(exemplars, pointData.exemplars);
DoublePointData pointData = (DoublePointData) o;
jack-berg marked this conversation as resolved.
Show resolved Hide resolved
return startEpochNanos == pointData.getStartEpochNanos()
&& epochNanos == pointData.getEpochNanos()
&& Double.doubleToLongBits(value) == Double.doubleToLongBits(pointData.getValue())
&& Objects.equals(attributes, pointData.getAttributes())
&& Objects.equals(exemplars, pointData.getExemplars());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || !(o instanceof MutableLongPointData)) {
if (!(o instanceof LongPointData)) {
return false;
}
MutableLongPointData that = (MutableLongPointData) o;
return value == that.value
&& startEpochNanos == that.startEpochNanos
&& epochNanos == that.epochNanos
&& Objects.equals(attributes, that.attributes)
&& Objects.equals(exemplars, that.exemplars);
LongPointData that = (LongPointData) o;
return value == that.getValue()
&& startEpochNanos == that.getStartEpochNanos()
&& epochNanos == that.getEpochNanos()
&& Objects.equals(attributes, that.getAttributes())
&& Objects.equals(exemplars, that.getExemplars());
}

@Override
Expand Down
Loading
Loading