Skip to content

Commit

Permalink
poc for fix to InMemoryExporter
Browse files Browse the repository at this point in the history
  • Loading branch information
TimothyMothra committed Feb 16, 2022
1 parent 68d1977 commit 9dcb839
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 26 deletions.
9 changes: 9 additions & 0 deletions src/OpenTelemetry.Exporter.InMemory/InMemoryExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ public class InMemoryExporter<T> : BaseExporter<T>
where T : class
{
private readonly ICollection<T> exportedItems;
private readonly bool isMetric;

public InMemoryExporter(ICollection<T> exportedItems)
{
this.exportedItems = exportedItems;
this.isMetric = typeof(T) == typeof(Metrics.Metric);
}

public override ExportResult Export(in Batch<T> batch)
Expand All @@ -35,6 +37,13 @@ public override ExportResult Export(in Batch<T> batch)
return ExportResult.Failure;
}

// Note: this mitigates the growing ExportedItems issue.
// Need to discuss if this is an acceptible change.
if (this.isMetric)
{
this.exportedItems.Clear();
}

foreach (var data in batch)
{
this.exportedItems.Add(data);
Expand Down
11 changes: 11 additions & 0 deletions src/OpenTelemetry/Metrics/AggregatorStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ internal MetricPointsAccessor GetMetricPoints()
return new MetricPointsAccessor(this.metricPoints, this.currentMetricPointBatch, this.batchSize, this.startTimeExclusive, this.endTimeInclusive);
}

internal MetricPointsAccessor GetDeepCloneMetricPoints()
{
var deepClonedMetricPoints = new MetricPoint[this.metricPoints.Length];
for (int i = 0; i < this.metricPoints.Length; i++)
{
deepClonedMetricPoints[i] = this.metricPoints[i].DeepCopy();
}

return new MetricPointsAccessor(deepClonedMetricPoints, this.currentMetricPointBatch, this.batchSize, this.startTimeExclusive, this.endTimeInclusive);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void InitializeZeroTagPointIfNotInitialized()
{
Expand Down
4 changes: 4 additions & 0 deletions src/OpenTelemetry/Metrics/HistogramBuckets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ internal HistogramBuckets(double[] explicitBounds)

public Enumerator GetEnumerator() => new(this);

// This works because all private fields are value types.
// If this class changes significantly, this may need to change.
internal HistogramBuckets DeepCopy() => (HistogramBuckets)this.MemberwiseClone();

/// <summary>
/// Enumerates the elements of a <see cref="HistogramBuckets"/>.
/// </summary>
Expand Down
5 changes: 4 additions & 1 deletion src/OpenTelemetry/Metrics/Metric.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ internal Metric(

public MetricPointsAccessor GetMetricPoints()
{
return this.aggStore.GetMetricPoints();
// Note: This appears to be safe for all existing unit tests.
// This is safe because the enumerator only moves forward.
// This may not be desirable for all use cases, in which we would need a new public method.
return this.aggStore.GetDeepCloneMetricPoints();
}

internal void UpdateLong(long value, ReadOnlySpan<KeyValuePair<string, object>> tags)
Expand Down
81 changes: 58 additions & 23 deletions src/OpenTelemetry/Metrics/MetricPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public struct MetricPoint
{
private readonly AggregationType aggType;

private readonly HistogramBuckets histogramBuckets;
// Note: not necessary to remove this, but makes DeepCopy easier.
//private readonly HistogramBuckets histogramBuckets;

// Represents temporality adjusted "value" for double/long metric types or "count" when histogram
private MetricPointValueStorage runningValue;
Expand Down Expand Up @@ -59,18 +60,35 @@ internal MetricPoint(

if (this.aggType == AggregationType.Histogram)
{
this.histogramBuckets = new HistogramBuckets(histogramExplicitBounds);
this.HistogramBuckets = new HistogramBuckets(histogramExplicitBounds);
}
else if (this.aggType == AggregationType.HistogramSumCount)
{
this.histogramBuckets = new HistogramBuckets(null);
this.HistogramBuckets = new HistogramBuckets(null);
}
else
{
this.histogramBuckets = null;
this.HistogramBuckets = null;
}
}

//internal MetricPoint(MetricPoint other)
//{
// Note: This is an alternative to the DeepCopy method below.
// I'm not a fan of this because it creates extra maintenance.

// this.aggType = other.aggType;
// this.HistogramBuckets = other.HistogramBuckets?.DeepClone();
// this.runningValue = other.runningValue;
// this.snapshotValue = other.snapshotValue;
// this.deltaLastValue = other.deltaLastValue;
// this.Tags = other.Tags;
// this.MetricPointStatus = other.MetricPointStatus;
// this.Tags = other.Tags;
// this.StartTime = other.StartTime;
// this.EndTime = other.EndTime;
//}

/// <summary>
/// Gets the tags associated with the metric point.
/// </summary>
Expand Down Expand Up @@ -113,6 +131,13 @@ internal MetricPointStatus MetricPointStatus
private set;
}

// Note: changing this to a Property loses the "readonly",
// but enables DeepCopy using the MemberwiseClone pattern below.
private HistogramBuckets HistogramBuckets
{
get; set;
}

/// <summary>
/// Gets the sum long value associated with the metric point.
/// </summary>
Expand Down Expand Up @@ -218,7 +243,7 @@ public double GetHistogramSum()
this.ThrowNotSupportedMetricTypeException(nameof(this.GetHistogramSum));
}

return this.histogramBuckets.SnapshotSum;
return this.HistogramBuckets.SnapshotSum;
}

/// <summary>
Expand All @@ -227,7 +252,7 @@ public double GetHistogramSum()
/// <remarks>
/// Applies to <see cref="MetricType.Histogram"/> metric type.
/// </remarks>
/// <returns><see cref="HistogramBuckets"/>.</returns>
/// <returns><see cref="Metrics.HistogramBuckets"/>.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public HistogramBuckets GetHistogramBuckets()
{
Expand All @@ -236,7 +261,17 @@ public HistogramBuckets GetHistogramBuckets()
this.ThrowNotSupportedMetricTypeException(nameof(this.GetHistogramBuckets));
}

return this.histogramBuckets;
return this.HistogramBuckets;
}

// this would be easier to maintain.
// Currently this doesn't work because histogramBuckets is readonly, but could hack using Reflection.
internal MetricPoint DeepCopy()
{
var other = (MetricPoint)this.MemberwiseClone();
other.HistogramBuckets = this.HistogramBuckets?.DeepCopy();

return other;
}

internal void Update(long number)
Expand Down Expand Up @@ -314,31 +349,31 @@ internal void Update(double number)
case AggregationType.Histogram:
{
int i;
for (i = 0; i < this.histogramBuckets.ExplicitBounds.Length; i++)
for (i = 0; i < this.HistogramBuckets.ExplicitBounds.Length; i++)
{
// Upper bound is inclusive
if (number <= this.histogramBuckets.ExplicitBounds[i])
if (number <= this.HistogramBuckets.ExplicitBounds[i])
{
break;
}
}

lock (this.histogramBuckets.LockObject)
lock (this.HistogramBuckets.LockObject)
{
this.runningValue.AsLong++;
this.histogramBuckets.RunningSum += number;
this.histogramBuckets.RunningBucketCounts[i]++;
this.HistogramBuckets.RunningSum += number;
this.HistogramBuckets.RunningBucketCounts[i]++;
}

break;
}

case AggregationType.HistogramSumCount:
{
lock (this.histogramBuckets.LockObject)
lock (this.HistogramBuckets.LockObject)
{
this.runningValue.AsLong++;
this.histogramBuckets.RunningSum += number;
this.HistogramBuckets.RunningSum += number;
}

break;
Expand Down Expand Up @@ -460,22 +495,22 @@ internal void TakeSnapshot(bool outputDelta)

case AggregationType.Histogram:
{
lock (this.histogramBuckets.LockObject)
lock (this.HistogramBuckets.LockObject)
{
this.snapshotValue.AsLong = this.runningValue.AsLong;
this.histogramBuckets.SnapshotSum = this.histogramBuckets.RunningSum;
this.HistogramBuckets.SnapshotSum = this.HistogramBuckets.RunningSum;
if (outputDelta)
{
this.runningValue.AsLong = 0;
this.histogramBuckets.RunningSum = 0;
this.HistogramBuckets.RunningSum = 0;
}

for (int i = 0; i < this.histogramBuckets.RunningBucketCounts.Length; i++)
for (int i = 0; i < this.HistogramBuckets.RunningBucketCounts.Length; i++)
{
this.histogramBuckets.SnapshotBucketCounts[i] = this.histogramBuckets.RunningBucketCounts[i];
this.HistogramBuckets.SnapshotBucketCounts[i] = this.HistogramBuckets.RunningBucketCounts[i];
if (outputDelta)
{
this.histogramBuckets.RunningBucketCounts[i] = 0;
this.HistogramBuckets.RunningBucketCounts[i] = 0;
}
}

Expand All @@ -487,14 +522,14 @@ internal void TakeSnapshot(bool outputDelta)

case AggregationType.HistogramSumCount:
{
lock (this.histogramBuckets.LockObject)
lock (this.HistogramBuckets.LockObject)
{
this.snapshotValue.AsLong = this.runningValue.AsLong;
this.histogramBuckets.SnapshotSum = this.histogramBuckets.RunningSum;
this.HistogramBuckets.SnapshotSum = this.HistogramBuckets.RunningSum;
if (outputDelta)
{
this.runningValue.AsLong = 0;
this.histogramBuckets.RunningSum = 0;
this.HistogramBuckets.RunningSum = 0;
}

this.MetricPointStatus = MetricPointStatus.NoCollectPending;
Expand Down
49 changes: 47 additions & 2 deletions test/OpenTelemetry.Tests/Metrics/InMemoryExporterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@

using System.Collections.Generic;
using System.Diagnostics.Metrics;

using OpenTelemetry.Exporter;
using OpenTelemetry.Tests;

using Xunit;

namespace OpenTelemetry.Metrics.Tests
{
public class InMemoryExporterTests
{
[Fact(Skip = "To be run after https://github.com/open-telemetry/opentelemetry-dotnet/issues/2361 is fixed")]
[Fact]
public void InMemoryExporterShouldDeepCopyMetricPoints()
{
var exportedItems = new List<Metric>();
Expand Down Expand Up @@ -56,7 +58,7 @@ public void InMemoryExporterShouldDeepCopyMetricPoints()

meterProvider.ForceFlush();

metric = exportedItems[1]; // Second Metric object is added to the collection at this point
metric = exportedItems[0]; // Second Metric object is added to the collection at this point
metricPointsEnumerator = metric.GetMetricPoints().GetEnumerator();
Assert.True(metricPointsEnumerator.MoveNext()); // One MetricPoint is emitted for the Metric
var metricPointForSecondExport = metricPointsEnumerator.Current;
Expand All @@ -65,5 +67,48 @@ public void InMemoryExporterShouldDeepCopyMetricPoints()
// MetricPoint.LongValue for the first exporter metric should still be 10
Assert.Equal(10, metricPointForFirstExport.GetSumLong());
}

[Fact]
public void Investigate_2361()
{
// https://github.com/open-telemetry/opentelemetry-dotnet/issues/2361

var exportedItems = new List<Metric>();

using var meter = new Meter(Utils.GetCurrentMethodName());
using var meterProvider = Sdk.CreateMeterProviderBuilder()
.AddMeter(meter.Name)
.AddInMemoryExporter(exportedItems)
.Build();

int i = 0;
var counterLong = meter.CreateObservableCounter(
"observable-counter",
() => ++i * 10);

meterProvider.ForceFlush();
Assert.Equal(1, i); // verify that callback is invoked when calling Flush
Assert.Single(exportedItems); // verify that List<metrics> contains 1 item
var metricPoint1 = GetSingleMetricPoint(exportedItems[0]);
Assert.Equal(10, metricPoint1.GetSumLong());

meterProvider.ForceFlush();
Assert.Equal(2, i); // verify that callback is invoked when calling Flush
Assert.Single(exportedItems); // verify that List<metrics> contains 1 item
var metricPoint2 = GetSingleMetricPoint(exportedItems[0]);
Assert.Equal(20, metricPoint2.GetSumLong());

// Retest 1st item, this is expected to be unchanged.
Assert.Equal(10, metricPoint1.GetSumLong());
}

private static MetricPoint GetSingleMetricPoint(Metric metric)
{
var metricPointsEnumerator = metric.GetMetricPoints().GetEnumerator();
Assert.True(metricPointsEnumerator.MoveNext()); // One MetricPoint is emitted for the Metric
ref readonly var metricPoint = ref metricPointsEnumerator.Current;
Assert.False(metricPointsEnumerator.MoveNext());
return metricPoint;
}
}
}

0 comments on commit 9dcb839

Please sign in to comment.