Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory mode: Adding support for synchronous instruments - explicit histogram #6153

Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
0c001b8
First part of synchronous support for memory mode.
asafm Nov 19, 2023
e986511
lint + improved error message
asafm Nov 22, 2023
3672a23
Update sdk/common/src/main/java/io/opentelemetry/sdk/common/export/Me…
asafm Nov 22, 2023
8a0330e
Update sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewBui…
asafm Nov 22, 2023
9fbb0b7
PR fixes 1
asafm Nov 22, 2023
fb8358d
Merge remote-tracking branch 'origin/memory-mode-sync-instruments-par…
asafm Nov 22, 2023
e49c5b4
PR fixes 2
asafm Nov 22, 2023
3444c2c
PR fixes 3
asafm Nov 26, 2023
35cdf26
All seems good. After checking it several times
asafm Dec 7, 2023
1ef3ca8
Check gradle task fixes
asafm Dec 7, 2023
8fda642
Merge remote-tracking branch 'upstream/main' into memory-mode-sync-in…
asafm Dec 20, 2023
a021020
Added missing tests to complete the code coverage, where it made sense
asafm Dec 24, 2023
411f73a
Fixes build fails
asafm Dec 24, 2023
55c7ac4
PR fixes - first part
asafm Jan 7, 2024
bb074f4
Merge remote-tracking branch 'upstream/main' into memory-mode-sync-in…
asafm Jan 7, 2024
010022c
PR fixes - second part
asafm Jan 7, 2024
6327f9e
Linter fixed and 1 bug fix
asafm Jan 8, 2024
ccfce83
More PR fixes
asafm Jan 9, 2024
95690f1
Added sanity unit testing 2 data classes just to pass code coverage
asafm Jan 9, 2024
6a124c5
Linter fixes
asafm Jan 9, 2024
9b0d48e
Added more tests for code coverage
asafm Jan 9, 2024
c236048
(Last) part-3 of adding memory mode support for Synchronous instrumen…
asafm Jan 10, 2024
c49d2f9
Merge remote-tracking branch 'upstream/main' into memory-mode-sync-in…
asafm Jan 14, 2024
b15e469
Adding memory mode support for Synchronous instrument: Explicit Histo…
asafm Jan 15, 2024
a916fc4
Fixing few warnings
asafm Jan 15, 2024
1b68df6
Linter issues
asafm Jan 15, 2024
f7d0ab4
Fixed last PR comment.
asafm Jan 21, 2024
1802396
Fixed last PR comment.
asafm Jan 21, 2024
82337b5
Tiny fix
asafm Jan 21, 2024
627e4c9
Merge branch 'memory-mode-sync-instruments-part3' into memory-mode-sy…
asafm Jan 21, 2024
f8f0695
Merge remote-tracking branch 'upstream/main' into memory-mode-sync-in…
asafm Jan 25, 2024
ca98743
PR fixes
asafm Jan 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public static DynamicPrimitiveLongList of(long... values) {
return list;
}

public static DynamicPrimitiveLongList ofSubArrayCapacity(int subarrayCapacity) {
return new DynamicPrimitiveLongList(subarrayCapacity);
}

public static DynamicPrimitiveLongList empty() {
return new DynamicPrimitiveLongList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ public enum HistogramAggregationParam {
new DoubleExplicitBucketHistogramAggregator(
ExplicitBucketHistogramUtils.createBoundaryArray(
ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES),
ExemplarReservoir::doubleNoSamples)),
ExemplarReservoir::doubleNoSamples,
IMMUTABLE_DATA)),
EXPLICIT_SINGLE_BUCKET(
new DoubleExplicitBucketHistogramAggregator(
ExplicitBucketHistogramUtils.createBoundaryArray(Collections.emptyList()),
ExemplarReservoir::doubleNoSamples)),
ExemplarReservoir::doubleNoSamples,
IMMUTABLE_DATA)),
EXPONENTIAL_SMALL_CIRCULAR_BUFFER(
new DoubleBase2ExponentialHistogramAggregator(
ExemplarReservoir::doubleNoSamples, 20, 0, IMMUTABLE_DATA)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType.InstrumentTester;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType.TestInstrumentsState;
import java.time.Duration;
import java.util.List;
import java.util.Random;
Expand All @@ -33,8 +34,8 @@
import org.openjdk.jmh.annotations.Warmup;

/**
* Run this through {@link AsynchronousMetricStorageGarbageCollectionBenchmarkTest}, as it runs it
* embedded with the GC profiler which what this test designed for (No need for command line run)
* Run this through {@link InstrumentGarbageCollectionBenchmarkTest}, as it runs it embedded with
* the GC profiler which what this test designed for (No need for command line run)
*
* <p>This test creates 10 asynchronous counters (any asynchronous instrument will do as the code
* path is almost the same for all async instrument types), and 1000 attribute sets. Each time the
Expand All @@ -51,37 +52,41 @@
*/
@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Measurement(iterations = 20, batchSize = 100)
@Measurement(iterations = 10, batchSize = 10)
@Warmup(iterations = 10, batchSize = 10)
@Fork(1)
public class AsynchronousMetricStorageGarbageCollectionBenchmark {
public class InstrumentGarbageCollectionBenchmark {

@State(value = Scope.Benchmark)
@SuppressWarnings("SystemOut")
public static class ThreadState {
private final int cardinality;
private final int countersCount;
private final int instrumentCount;
@Param public TestInstrumentType testInstrumentType;
@Param public AggregationTemporality aggregationTemporality;
@Param public MemoryMode memoryMode;
SdkMeterProvider sdkMeterProvider;
private final Random random = new Random();
List<Attributes> attributesList;
private TestInstrumentsState testInstrumentsState;
private InstrumentTester instrumentTester;

/** Creates a ThreadState. */
@SuppressWarnings("unused")
public ThreadState() {
cardinality = 1000;
countersCount = 10;
instrumentCount = 10;
}

@SuppressWarnings("SpellCheckingInspection")
@Setup
public void setup() {
instrumentTester = testInstrumentType.createInstrumentTester();
PeriodicMetricReader metricReader =
PeriodicMetricReader.builder(
// Configure an exporter that configures the temporality and aggregation
// for the test case, but otherwise drops the data on export
new NoopMetricExporter(aggregationTemporality, Aggregation.sum(), memoryMode))
new NoopMetricExporter(
aggregationTemporality, instrumentTester.testedAggregation(), memoryMode))
// Effectively disable periodic reading so reading is only done on #flush()
.setInterval(Duration.ofSeconds(Integer.MAX_VALUE))
.build();
Expand All @@ -95,18 +100,9 @@ public void setup() {
SdkMeterProviderUtil.setExemplarFilter(builder, ExemplarFilter.alwaysOff());

sdkMeterProvider = builder.build();
for (int i = 0; i < countersCount; i++) {
sdkMeterProvider
.get("meter")
.counterBuilder("counter" + i)
.buildWithCallback(
observableLongMeasurement -> {
for (int j = 0; j < attributesList.size(); j++) {
Attributes attributes = attributesList.get(j);
observableLongMeasurement.record(random.nextInt(10_000), attributes);
}
});
}
testInstrumentsState =
instrumentTester.buildInstruments(
instrumentCount, sdkMeterProvider, attributesList, random);
}

@TearDown
Expand All @@ -123,6 +119,8 @@ public void tearDown() {
@Benchmark
@Threads(value = 1)
public void recordAndCollect(ThreadState threadState) {
threadState.instrumentTester.recordValuesInInstruments(
threadState.testInstrumentsState, threadState.attributesList, threadState.random);
threadState.sdkMeterProvider.forceFlush().join(10, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class AsynchronousMetricStorageGarbageCollectionBenchmarkTest {
public class InstrumentGarbageCollectionBenchmarkTest {

/**
* This test validates that in {@link MemoryMode#REUSABLE_DATA}, {@link
* AsynchronousMetricStorage#collect(Resource, InstrumentationScopeInfo, long, long)} barely
* allocates memory which is then subsequently garbage collected. It is done so comparatively to
* {@link MemoryMode#IMMUTABLE_DATA},
* This test validates that in {@link MemoryMode#REUSABLE_DATA}, any {@link
* MetricStorage#collect(Resource, InstrumentationScopeInfo, long, long)} barely allocates memory
* which is then subsequently garbage collected. It is done so comparatively to {@link
* MemoryMode#IMMUTABLE_DATA},
*
* <p>It runs the JMH test {@link AsynchronousMetricStorageGarbageCollectionBenchmark} with GC
* profiler, and measures for each parameter combination the garbage collector normalized rate
* (bytes allocated per Operation).
* <p>It runs the JMH test {@link InstrumentGarbageCollectionBenchmark} with GC profiler, and
* measures for each parameter combination the garbage collector normalized rate (bytes allocated
* per Operation).
*
* <p>Memory allocations can be hidden even at an innocent foreach loop on a collection, which
* under the hood allocates an internal object O(N) times. Someone can accidentally refactor such
Expand All @@ -52,55 +52,76 @@ public void normalizedAllocationRateTest() throws RunnerException {
"true".equals(System.getenv("CI")),
"This test should only run in GitHub CI since it's long");

// Runs AsynchronousMetricStorageMemoryProfilingBenchmark
// Runs InstrumentGarbageCollectionBenchmark
// with garbage collection profiler
Options opt =
new OptionsBuilder()
.include(AsynchronousMetricStorageGarbageCollectionBenchmark.class.getSimpleName())
.include(InstrumentGarbageCollectionBenchmark.class.getSimpleName())
.addProfiler("gc")
.shouldFailOnError(true)
.jvmArgs("-Xmx1500m")
.build();
Collection<RunResult> results = new Runner(opt).run();

// Collect the normalized GC allocation rate per parameters combination
Map<String, Map<String, Double>> resultMap = new HashMap<>();
Map<String, TestInstrumentTypeResults> testInstrumentTypeResultsMap = new HashMap<>();
for (RunResult result : results) {
for (BenchmarkResult benchmarkResult : result.getBenchmarkResults()) {
BenchmarkParams benchmarkParams = benchmarkResult.getParams();

String memoryMode = benchmarkParams.getParam("memoryMode");
String aggregationTemporality = benchmarkParams.getParam("aggregationTemporality");
String testInstrumentType = benchmarkParams.getParam("testInstrumentType");
assertThat(memoryMode).isNotNull();
assertThat(aggregationTemporality).isNotNull();
assertThat(testInstrumentType).isNotNull();

Map<String, Result> secondaryResults = benchmarkResult.getSecondaryResults();
Result allocRateNorm = secondaryResults.get("gc.alloc.rate.norm");
assertThat(allocRateNorm)
.describedAs("Allocation rate in secondary results: %s", secondaryResults)
.isNotNull();

resultMap
testInstrumentTypeResultsMap
.computeIfAbsent(testInstrumentType, k -> new TestInstrumentTypeResults())
.aggregationTemporalityToMemoryModeResult
.computeIfAbsent(aggregationTemporality, k -> new HashMap<>())
.put(memoryMode, allocRateNorm.getScore());
}
}

assertThat(resultMap).hasSameSizeAs(AggregationTemporality.values());
testInstrumentTypeResultsMap.forEach(
(testInstrumentType, testInstrumentTypeResults) -> {
Map<String, Map<String, Double>> resultMap =
testInstrumentTypeResults.aggregationTemporalityToMemoryModeResult;
assertThat(resultMap).hasSameSizeAs(AggregationTemporality.values());

// Asserts that reusable data GC allocation rate is a tiny fraction of immutable data
// GC allocation rate
resultMap.forEach(
(aggregationTemporality, memoryModeToAllocRateMap) -> {
Double immutableDataAllocRate =
memoryModeToAllocRateMap.get(MemoryMode.IMMUTABLE_DATA.toString());
Double reusableDataAllocRate =
memoryModeToAllocRateMap.get(MemoryMode.REUSABLE_DATA.toString());
// Asserts that reusable data GC allocation rate is a tiny fraction of immutable data
// GC allocation rate
resultMap.forEach(
(aggregationTemporality, memoryModeToAllocRateMap) -> {
Double immutableDataAllocRate =
memoryModeToAllocRateMap.get(MemoryMode.IMMUTABLE_DATA.toString());
Double reusableDataAllocRate =
memoryModeToAllocRateMap.get(MemoryMode.REUSABLE_DATA.toString());

assertThat(immutableDataAllocRate).isNotNull().isNotZero();
assertThat(reusableDataAllocRate).isNotNull().isNotZero();
assertThat(100 - (reusableDataAllocRate / immutableDataAllocRate) * 100)
.isCloseTo(99.8, Offset.offset(2.0));
assertThat(immutableDataAllocRate).isNotNull().isNotZero();
assertThat(reusableDataAllocRate).isNotNull().isNotZero();

// If this test suddenly fails for you this means you have changed the code in a way
// that allocates more memory than before. You can find out where, by running
// ProfileBenchmark class and looking at the flame graph. Make sure to
// set the parameters according to where it failed for.
assertThat(100 - (reusableDataAllocRate / immutableDataAllocRate) * 100)
.describedAs(
"Aggregation temporality = %s, testInstrumentType = %s",
aggregationTemporality, testInstrumentType)
.isCloseTo(99.8, Offset.offset(2.0));
});
});
}

static class TestInstrumentTypeResults {
Map<String, Map<String, Double>> aggregationTemporalityToMemoryModeResult = new HashMap<>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state;

import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;

/**
* This benchmark class is used to see memory allocation flame graphs for a single run.
*
* <p>Steps:
*
* <ol>
* <li>Follow download instructions for async-profiler, located at <a
* href="https://github.com/async-profiler/async-profiler">this location</a>
* <li>Assuming you have extracted it at /tmp/async-profiler-2.9-macos, add the following to your
* JVM arguments of your run configuration:
* <pre>
* -agentpath:/tmp/async-profiler-2.9-macos/build/libasyncProfiler.so=start,event=alloc,flamegraph,file=/tmp/profiled_data.html
* </pre>
* <li>Tune the parameters as you see fit (They are marked below with "Parameters")
* <li>Run the class (its main function)
* <li>Open /tmp/profiled_data.html with your browser
* <li>Use the flame graph to see where the allocations are happening the most and fix
* <li>Run {@link InstrumentGarbageCollectionBenchmark} and see if it passes now
* <li>If not, repeat
* </ol>
*/
public class ProfileBenchmark {

private ProfileBenchmark() {}

public static void main(String[] args) {
// Parameters
AggregationTemporality aggregationTemporality = AggregationTemporality.DELTA;
MemoryMode memoryMode = MemoryMode.REUSABLE_DATA;
TestInstrumentType testInstrumentType = TestInstrumentType.EXPLICIT_BUCKET;

InstrumentGarbageCollectionBenchmark.ThreadState benchmarkSetup =
new InstrumentGarbageCollectionBenchmark.ThreadState();

benchmarkSetup.aggregationTemporality = aggregationTemporality;
benchmarkSetup.memoryMode = memoryMode;
benchmarkSetup.testInstrumentType = testInstrumentType;

InstrumentGarbageCollectionBenchmark benchmark = new InstrumentGarbageCollectionBenchmark();

benchmarkSetup.setup();

warmup(benchmark, benchmarkSetup);

// This is divided explicitly to two methods so you can focus on `measure` in the flame graph
// when trying to decrease the allocations
measure(benchmark, benchmarkSetup);
}

public static void warmup(
InstrumentGarbageCollectionBenchmark benchmark,
InstrumentGarbageCollectionBenchmark.ThreadState benchmarkSetup) {
for (int i = 0; i < 10; i++) {
benchmark.recordAndCollect(benchmarkSetup);
}
}

public static void measure(
InstrumentGarbageCollectionBenchmark benchmark,
InstrumentGarbageCollectionBenchmark.ThreadState benchmarkSetup) {
for (int i = 0; i < 200; i++) {
benchmark.recordAndCollect(benchmarkSetup);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.tester.AsyncCounterTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.ExplicitBucketHistogramTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.ExponentialHistogramTester;
import java.util.List;
import java.util.Random;

public enum TestInstrumentType {
ASYNC_COUNTER() {
@Override
InstrumentTester createInstrumentTester() {
return new AsyncCounterTester();
}
},
EXPONENTIAL_HISTOGRAM() {
@Override
InstrumentTester createInstrumentTester() {
return new ExponentialHistogramTester();
}
},
EXPLICIT_BUCKET() {
@Override
InstrumentTester createInstrumentTester() {
return new ExplicitBucketHistogramTester();
}
};

abstract InstrumentTester createInstrumentTester();

TestInstrumentType() {}

public interface InstrumentTester {
Aggregation testedAggregation();

TestInstrumentsState buildInstruments(
double instrumentCount,
SdkMeterProvider sdkMeterProvider,
List<Attributes> attributesList,
Random random);

void recordValuesInInstruments(
TestInstrumentsState testInstrumentsState, List<Attributes> attributesList, Random random);
}

public interface TestInstrumentsState {}

public static class EmptyInstrumentsState implements TestInstrumentsState {}
}
Loading
Loading