Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse MetricData #5178

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ public static List<Long> wrap(long[] values) {
return new LongListImpl(values);
}

/** Set the size. */
public static void setSize(List<Long> list, int size) {
if (list instanceof LongListImpl) {
((LongListImpl) list).setSize(size);
}
}

/**
* Returns a primitive array with the values of the list. The list should generally have been
* created with {@link PrimitiveLongList#wrap(long[])}.
Expand All @@ -49,13 +56,22 @@ public static long[] toArray(List<Long> list) {
private static class LongListImpl extends AbstractList<Long> {

private final long[] values;
private int size;

LongListImpl(long[] values) {
this.values = values;
this.size = values.length;
}

private void setSize(int size) {
this.size = size;
}

@Override
public Long get(int index) {
if (index >= size) {
throw new IndexOutOfBoundsException("Index " + index + " out of bounds for length " + size);
}
// If out of bounds, the array access will produce a perfectly fine IndexOutOfBoundsException.
return values[index];
}
Expand All @@ -75,7 +91,7 @@ public int hashCode() {

@Override
public int size() {
return values.length;
return size;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.catchThrowable;

import java.util.Arrays;
import java.util.List;
Expand All @@ -27,12 +26,7 @@ void wrap() {
assertThat(wrapped.hashCode()).isEqualTo(reference.hashCode());
assertThat(wrapped.equals(PrimitiveLongList.wrap(array))).isTrue();

// Message can change between Java versions, so instead check it's the same as a normal List's
// exception.
Throwable referenceException = catchThrowable(() -> reference.get(3));
assertThatThrownBy(() -> wrapped.get(3))
.isInstanceOf(IndexOutOfBoundsException.class)
.hasMessage(referenceException.getMessage());
assertThatThrownBy(() -> wrapped.get(3)).isInstanceOf(IndexOutOfBoundsException.class);

assertThat(PrimitiveLongList.toArray(wrapped)).isSameAs(array).containsExactly(1L, 2L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ private static class LeasedMetricProducer implements MetricProducer {
private final MeterProviderSharedState sharedState;
private final RegisteredReader registeredReader;

private final Object collectLock = new Object();
private final List<MetricData> results = new ArrayList<>();

LeasedMetricProducer(
ComponentRegistry<SdkMeter> registry,
MeterProviderSharedState sharedState,
Expand All @@ -173,14 +176,16 @@ private static class LeasedMetricProducer implements MetricProducer {

@Override
public Collection<MetricData> collectAllMetrics() {
Collection<SdkMeter> meters = registry.getComponents();
List<MetricData> result = new ArrayList<>();
long collectTime = sharedState.getClock().now();
for (SdkMeter meter : meters) {
result.addAll(meter.collectAll(registeredReader, collectTime));
synchronized (collectLock) {
Collection<SdkMeter> meters = registry.getComponents();
results.clear();
long collectTime = sharedState.getClock().now();
for (SdkMeter meter : meters) {
results.addAll(meter.collectAll(registeredReader, collectTime));
}
registeredReader.setLastCollectEpochNanos(collectTime);
return Collections.unmodifiableCollection(results);
}
registeredReader.setLastCollectEpochNanos(collectTime);
return Collections.unmodifiableCollection(result);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*
* <p>This class is NOT thread-safe. It is expected to be behind a synchronized incrementer.
*/
final class AdaptingCircularBufferCounter {
public final class AdaptingCircularBufferCounter {
private static final int NULL_INDEX = Integer.MIN_VALUE;
private int endIndex = NULL_INDEX;
private int startIndex = NULL_INDEX;
Expand All @@ -41,7 +41,7 @@ final class AdaptingCircularBufferCounter {
*
* @return the first index with a recording.
*/
int getIndexStart() {
public int getIndexStart() {
return startIndex;
}

Expand All @@ -52,7 +52,7 @@ int getIndexStart() {
*
* @return The last index with a recording.
*/
int getIndexEnd() {
public int getIndexEnd() {
return endIndex;
}

Expand Down Expand Up @@ -95,7 +95,7 @@ boolean increment(int index, long delta) {
*
* @return the number of recordings for the index, or 0 if the index is out of bounds.
*/
long get(int index) {
public long get(int index) {
if (index < startIndex || index > endIndex) {
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.metrics.internal.data.MutableExponentialHistogramBuckets;
import io.opentelemetry.sdk.metrics.internal.data.MutableExponentialHistogramData;
import io.opentelemetry.sdk.metrics.internal.data.MutableExponentialHistogramPointData;
import io.opentelemetry.sdk.metrics.internal.data.MutableMetricData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.resources.Resource;
Expand All @@ -39,6 +41,9 @@ public final class DoubleBase2ExponentialHistogramAggregator
private final Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier;
private final int maxBuckets;
private final int maxScale;
private final MutableMetricData metricData =
new MutableMetricData(MetricDataType.EXPONENTIAL_HISTOGRAM);
private final MutableExponentialHistogramData data = new MutableExponentialHistogramData();

/**
* Constructs an exponential histogram aggregator.
Expand Down Expand Up @@ -66,13 +71,15 @@ public MetricData toMetricData(
MetricDescriptor metricDescriptor,
Collection<ExponentialHistogramPointData> points,
AggregationTemporality temporality) {
return ImmutableMetricData.createExponentialHistogram(
data.set(temporality, points);
metricData.set(
resource,
instrumentationScopeInfo,
metricDescriptor.getName(),
metricDescriptor.getDescription(),
metricDescriptor.getSourceInstrument().getUnit(),
ImmutableExponentialHistogramData.create(temporality, points));
data);
return metricData;
}

static final class Handle
Expand All @@ -86,6 +93,10 @@ static final class Handle
private double max;
private long count;
private int scale;
private final MutableExponentialHistogramPointData point =
new MutableExponentialHistogramPointData();
private final MutableExponentialHistogramBuckets mutablePositiveBuckets;
private final MutableExponentialHistogramBuckets mutableNegativeBuckets;

Handle(ExemplarReservoir<DoubleExemplarData> reservoir, int maxBuckets, int maxScale) {
super(reservoir);
Expand All @@ -96,6 +107,8 @@ static final class Handle
this.max = -1;
this.count = 0;
this.scale = maxScale;
this.mutablePositiveBuckets = new MutableExponentialHistogramBuckets(maxBuckets);
this.mutableNegativeBuckets = new MutableExponentialHistogramBuckets(maxBuckets);
}

@Override
Expand All @@ -105,21 +118,20 @@ protected synchronized ExponentialHistogramPointData doAggregateThenMaybeReset(
Attributes attributes,
List<DoubleExemplarData> exemplars,
boolean reset) {
ExponentialHistogramPointData point =
ImmutableExponentialHistogramPointData.create(
scale,
sum,
zeroCount,
this.count > 0,
this.min,
this.count > 0,
this.max,
resolveBuckets(this.positiveBuckets, scale, reset),
resolveBuckets(this.negativeBuckets, scale, reset),
startEpochNanos,
epochNanos,
attributes,
exemplars);
point.set(
scale,
sum,
zeroCount,
this.count > 0,
this.min,
this.count > 0,
this.max,
resolveBuckets(this.positiveBuckets, this.mutablePositiveBuckets, scale, reset),
resolveBuckets(this.negativeBuckets, this.mutableNegativeBuckets, scale, reset),
startEpochNanos,
epochNanos,
attributes,
exemplars);
if (reset) {
this.sum = 0;
this.zeroCount = 0;
Expand All @@ -131,15 +143,18 @@ protected synchronized ExponentialHistogramPointData doAggregateThenMaybeReset(
}

private static ExponentialHistogramBuckets resolveBuckets(
@Nullable DoubleBase2ExponentialHistogramBuckets buckets, int scale, boolean reset) {
@Nullable DoubleBase2ExponentialHistogramBuckets buckets,
MutableExponentialHistogramBuckets mutableBuckets,
int scale,
boolean reset) {
if (buckets == null) {
return EmptyExponentialHistogramBuckets.get(scale);
}
ExponentialHistogramBuckets copy = buckets.copy();
mutableBuckets.set(buckets);
if (reset) {
buckets.clear();
}
return copy;
return mutableBuckets;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was one idea I entertained for awhile performance gain. How are you avoiding multi-threads touching this data?

Is it because you're only returning this to ONE metric-reader at a time and the "hot path" of writes is still writing to the underlying data allocated in this handle?

If so, VERY clever. We should document this in the handle class how it works and why it's safe.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it because you're only returning this to ONE metric-reader at a time and the "hot path" of writes is still writing to the underlying data allocated in this handle?

Yes exactly. While we support multiple readers, we don't support concurrent reads. As long as readers don't hold on to references to MetricData and try to read after they're done reading, they shouldn't get any weird behavior. Right now this won't work with multiple readers since once PeriodicMetricReader calls MetricProducer#collectAllMetrics(), another reader will be able to start reading and MetricData will be mutated out from under the PeriodicMetricReader. Ouch. But this is solvable by providing readers a way to communicate to MetricProducer that they're done consuming the data. For example, by adjusting collectAllMetrics to accept a CompletableResultCode which the reader completes when finished consuming the data, i.e. MetricProducer#collectAllMetrics(CompleteableResultCode).

As you noticed, this also relies on different objects for writes vs. reads (writes use AggregationHandle, reads use some some mutuable MetricData).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scratch that part about readers needing to communicate when they're finished consuming the data. Each reader has its own copies of metric storage, and the mutable MetricData, so its much simpler: It should be safe as long as a MetricReader doesn't hold on to the MetricData references and try to consume them during a subsequent collect.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
final class DoubleBase2ExponentialHistogramBuckets implements ExponentialHistogramBuckets {
public final class DoubleBase2ExponentialHistogramBuckets implements ExponentialHistogramBuckets {

private AdaptingCircularBufferCounter counts;
private int scale;
Expand Down Expand Up @@ -75,6 +75,10 @@ public int getOffset() {
return counts.getIndexStart();
}

public AdaptingCircularBufferCounter getCounts() {
return counts;
}

@Override
public List<Long> getBucketCounts() {
if (counts.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.data;

import io.opentelemetry.sdk.internal.PrimitiveLongList;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets;
import io.opentelemetry.sdk.metrics.internal.aggregator.AdaptingCircularBufferCounter;
import io.opentelemetry.sdk.metrics.internal.aggregator.DoubleBase2ExponentialHistogramBuckets;
import java.util.List;

public class MutableExponentialHistogramBuckets implements ExponentialHistogramBuckets {

private int scale;
private int offset;
private final long[] bucketCounts;
private final List<Long> bucketCountsList;
private long totalCount;

public MutableExponentialHistogramBuckets(int maxBuckets) {
this.bucketCounts = new long[maxBuckets];
this.bucketCountsList = PrimitiveLongList.wrap(bucketCounts);
}

@Override
public int getScale() {
return scale;
}

@Override
public int getOffset() {
return offset;
}

@Override
public List<Long> getBucketCounts() {
return bucketCountsList;
}

@Override
public long getTotalCount() {
return totalCount;
}

/** Set the values. */
public void set(DoubleBase2ExponentialHistogramBuckets exponentialHistogramBuckets) {
this.scale = exponentialHistogramBuckets.getScale();
this.offset = exponentialHistogramBuckets.getOffset();
AdaptingCircularBufferCounter counts = exponentialHistogramBuckets.getCounts();
for (int i = 0; i < bucketCounts.length; i++) {
this.bucketCounts[i] = counts.get(i + counts.getIndexStart());
}
PrimitiveLongList.setSize(
this.bucketCountsList, counts.getIndexEnd() - counts.getIndexStart() + 1);
this.totalCount = exponentialHistogramBuckets.getTotalCount();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.data;

import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramData;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData;
import java.util.Collection;
import java.util.Collections;

/**
* Auto value implementation of {@link ExponentialHistogramData}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class MutableExponentialHistogramData implements ExponentialHistogramData {

private AggregationTemporality temporality = AggregationTemporality.CUMULATIVE;
private Collection<ExponentialHistogramPointData> pointData = Collections.emptyList();

public MutableExponentialHistogramData() {}

@Override
public AggregationTemporality getAggregationTemporality() {
return temporality;
}

@Override
public Collection<ExponentialHistogramPointData> getPoints() {
return pointData;
}

/** Set the values. */
public void set(
AggregationTemporality temporality, Collection<ExponentialHistogramPointData> pointData) {
this.temporality = temporality;
this.pointData = pointData;
}
}
Loading