From 9dcb8393a5eff65b7e61852303f7d4cfdcc7f247 Mon Sep 17 00:00:00 2001 From: TimothyMothra Date: Wed, 16 Feb 2022 15:21:10 -0800 Subject: [PATCH] poc for fix to InMemoryExporter --- .../InMemoryExporter.cs | 9 +++ src/OpenTelemetry/Metrics/AggregatorStore.cs | 11 +++ src/OpenTelemetry/Metrics/HistogramBuckets.cs | 4 + src/OpenTelemetry/Metrics/Metric.cs | 5 +- src/OpenTelemetry/Metrics/MetricPoint.cs | 81 +++++++++++++------ .../Metrics/InMemoryExporterTests.cs | 49 ++++++++++- 6 files changed, 133 insertions(+), 26 deletions(-) diff --git a/src/OpenTelemetry.Exporter.InMemory/InMemoryExporter.cs b/src/OpenTelemetry.Exporter.InMemory/InMemoryExporter.cs index d76995f8fb6..da9777fe2bb 100644 --- a/src/OpenTelemetry.Exporter.InMemory/InMemoryExporter.cs +++ b/src/OpenTelemetry.Exporter.InMemory/InMemoryExporter.cs @@ -22,10 +22,12 @@ public class InMemoryExporter : BaseExporter where T : class { private readonly ICollection exportedItems; + private readonly bool isMetric; public InMemoryExporter(ICollection exportedItems) { this.exportedItems = exportedItems; + this.isMetric = typeof(T) == typeof(Metrics.Metric); } public override ExportResult Export(in Batch batch) @@ -35,6 +37,13 @@ public override ExportResult Export(in Batch batch) return ExportResult.Failure; } + // Note: this mitigates the growing ExportedItems issue. + // Need to discuss if this is an acceptible change. + if (this.isMetric) + { + this.exportedItems.Clear(); + } + foreach (var data in batch) { this.exportedItems.Add(data); diff --git a/src/OpenTelemetry/Metrics/AggregatorStore.cs b/src/OpenTelemetry/Metrics/AggregatorStore.cs index c61f1a59273..09ef496272b 100644 --- a/src/OpenTelemetry/Metrics/AggregatorStore.cs +++ b/src/OpenTelemetry/Metrics/AggregatorStore.cs @@ -156,6 +156,17 @@ internal MetricPointsAccessor GetMetricPoints() return new MetricPointsAccessor(this.metricPoints, this.currentMetricPointBatch, this.batchSize, this.startTimeExclusive, this.endTimeInclusive); } + internal MetricPointsAccessor GetDeepCloneMetricPoints() + { + var deepClonedMetricPoints = new MetricPoint[this.metricPoints.Length]; + for (int i = 0; i < this.metricPoints.Length; i++) + { + deepClonedMetricPoints[i] = this.metricPoints[i].DeepCopy(); + } + + return new MetricPointsAccessor(deepClonedMetricPoints, this.currentMetricPointBatch, this.batchSize, this.startTimeExclusive, this.endTimeInclusive); + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] private void InitializeZeroTagPointIfNotInitialized() { diff --git a/src/OpenTelemetry/Metrics/HistogramBuckets.cs b/src/OpenTelemetry/Metrics/HistogramBuckets.cs index a68033d4300..d58300ba864 100644 --- a/src/OpenTelemetry/Metrics/HistogramBuckets.cs +++ b/src/OpenTelemetry/Metrics/HistogramBuckets.cs @@ -43,6 +43,10 @@ internal HistogramBuckets(double[] explicitBounds) public Enumerator GetEnumerator() => new(this); + // This works because all private fields are value types. + // If this class changes significantly, this may need to change. + internal HistogramBuckets DeepCopy() => (HistogramBuckets)this.MemberwiseClone(); + /// /// Enumerates the elements of a . /// diff --git a/src/OpenTelemetry/Metrics/Metric.cs b/src/OpenTelemetry/Metrics/Metric.cs index 983976d5e9e..2abd0b6e6d5 100644 --- a/src/OpenTelemetry/Metrics/Metric.cs +++ b/src/OpenTelemetry/Metrics/Metric.cs @@ -128,7 +128,10 @@ internal Metric( public MetricPointsAccessor GetMetricPoints() { - return this.aggStore.GetMetricPoints(); + // Note: This appears to be safe for all existing unit tests. + // This is safe because the enumerator only moves forward. + // This may not be desirable for all use cases, in which we would need a new public method. + return this.aggStore.GetDeepCloneMetricPoints(); } internal void UpdateLong(long value, ReadOnlySpan> tags) diff --git a/src/OpenTelemetry/Metrics/MetricPoint.cs b/src/OpenTelemetry/Metrics/MetricPoint.cs index e91c1253ace..b416d5a2382 100644 --- a/src/OpenTelemetry/Metrics/MetricPoint.cs +++ b/src/OpenTelemetry/Metrics/MetricPoint.cs @@ -28,7 +28,8 @@ public struct MetricPoint { private readonly AggregationType aggType; - private readonly HistogramBuckets histogramBuckets; + // Note: not necessary to remove this, but makes DeepCopy easier. + //private readonly HistogramBuckets histogramBuckets; // Represents temporality adjusted "value" for double/long metric types or "count" when histogram private MetricPointValueStorage runningValue; @@ -59,18 +60,35 @@ internal MetricPoint( if (this.aggType == AggregationType.Histogram) { - this.histogramBuckets = new HistogramBuckets(histogramExplicitBounds); + this.HistogramBuckets = new HistogramBuckets(histogramExplicitBounds); } else if (this.aggType == AggregationType.HistogramSumCount) { - this.histogramBuckets = new HistogramBuckets(null); + this.HistogramBuckets = new HistogramBuckets(null); } else { - this.histogramBuckets = null; + this.HistogramBuckets = null; } } + //internal MetricPoint(MetricPoint other) + //{ + // Note: This is an alternative to the DeepCopy method below. + // I'm not a fan of this because it creates extra maintenance. + + // this.aggType = other.aggType; + // this.HistogramBuckets = other.HistogramBuckets?.DeepClone(); + // this.runningValue = other.runningValue; + // this.snapshotValue = other.snapshotValue; + // this.deltaLastValue = other.deltaLastValue; + // this.Tags = other.Tags; + // this.MetricPointStatus = other.MetricPointStatus; + // this.Tags = other.Tags; + // this.StartTime = other.StartTime; + // this.EndTime = other.EndTime; + //} + /// /// Gets the tags associated with the metric point. /// @@ -113,6 +131,13 @@ internal MetricPointStatus MetricPointStatus private set; } + // Note: changing this to a Property loses the "readonly", + // but enables DeepCopy using the MemberwiseClone pattern below. + private HistogramBuckets HistogramBuckets + { + get; set; + } + /// /// Gets the sum long value associated with the metric point. /// @@ -218,7 +243,7 @@ public double GetHistogramSum() this.ThrowNotSupportedMetricTypeException(nameof(this.GetHistogramSum)); } - return this.histogramBuckets.SnapshotSum; + return this.HistogramBuckets.SnapshotSum; } /// @@ -227,7 +252,7 @@ public double GetHistogramSum() /// /// Applies to metric type. /// - /// . + /// . [MethodImpl(MethodImplOptions.AggressiveInlining)] public HistogramBuckets GetHistogramBuckets() { @@ -236,7 +261,17 @@ public HistogramBuckets GetHistogramBuckets() this.ThrowNotSupportedMetricTypeException(nameof(this.GetHistogramBuckets)); } - return this.histogramBuckets; + return this.HistogramBuckets; + } + + // this would be easier to maintain. + // Currently this doesn't work because histogramBuckets is readonly, but could hack using Reflection. + internal MetricPoint DeepCopy() + { + var other = (MetricPoint)this.MemberwiseClone(); + other.HistogramBuckets = this.HistogramBuckets?.DeepCopy(); + + return other; } internal void Update(long number) @@ -314,20 +349,20 @@ internal void Update(double number) case AggregationType.Histogram: { int i; - for (i = 0; i < this.histogramBuckets.ExplicitBounds.Length; i++) + for (i = 0; i < this.HistogramBuckets.ExplicitBounds.Length; i++) { // Upper bound is inclusive - if (number <= this.histogramBuckets.ExplicitBounds[i]) + if (number <= this.HistogramBuckets.ExplicitBounds[i]) { break; } } - lock (this.histogramBuckets.LockObject) + lock (this.HistogramBuckets.LockObject) { this.runningValue.AsLong++; - this.histogramBuckets.RunningSum += number; - this.histogramBuckets.RunningBucketCounts[i]++; + this.HistogramBuckets.RunningSum += number; + this.HistogramBuckets.RunningBucketCounts[i]++; } break; @@ -335,10 +370,10 @@ internal void Update(double number) case AggregationType.HistogramSumCount: { - lock (this.histogramBuckets.LockObject) + lock (this.HistogramBuckets.LockObject) { this.runningValue.AsLong++; - this.histogramBuckets.RunningSum += number; + this.HistogramBuckets.RunningSum += number; } break; @@ -460,22 +495,22 @@ internal void TakeSnapshot(bool outputDelta) case AggregationType.Histogram: { - lock (this.histogramBuckets.LockObject) + lock (this.HistogramBuckets.LockObject) { this.snapshotValue.AsLong = this.runningValue.AsLong; - this.histogramBuckets.SnapshotSum = this.histogramBuckets.RunningSum; + this.HistogramBuckets.SnapshotSum = this.HistogramBuckets.RunningSum; if (outputDelta) { this.runningValue.AsLong = 0; - this.histogramBuckets.RunningSum = 0; + this.HistogramBuckets.RunningSum = 0; } - for (int i = 0; i < this.histogramBuckets.RunningBucketCounts.Length; i++) + for (int i = 0; i < this.HistogramBuckets.RunningBucketCounts.Length; i++) { - this.histogramBuckets.SnapshotBucketCounts[i] = this.histogramBuckets.RunningBucketCounts[i]; + this.HistogramBuckets.SnapshotBucketCounts[i] = this.HistogramBuckets.RunningBucketCounts[i]; if (outputDelta) { - this.histogramBuckets.RunningBucketCounts[i] = 0; + this.HistogramBuckets.RunningBucketCounts[i] = 0; } } @@ -487,14 +522,14 @@ internal void TakeSnapshot(bool outputDelta) case AggregationType.HistogramSumCount: { - lock (this.histogramBuckets.LockObject) + lock (this.HistogramBuckets.LockObject) { this.snapshotValue.AsLong = this.runningValue.AsLong; - this.histogramBuckets.SnapshotSum = this.histogramBuckets.RunningSum; + this.HistogramBuckets.SnapshotSum = this.HistogramBuckets.RunningSum; if (outputDelta) { this.runningValue.AsLong = 0; - this.histogramBuckets.RunningSum = 0; + this.HistogramBuckets.RunningSum = 0; } this.MetricPointStatus = MetricPointStatus.NoCollectPending; diff --git a/test/OpenTelemetry.Tests/Metrics/InMemoryExporterTests.cs b/test/OpenTelemetry.Tests/Metrics/InMemoryExporterTests.cs index 5f5914be28e..926097728d8 100644 --- a/test/OpenTelemetry.Tests/Metrics/InMemoryExporterTests.cs +++ b/test/OpenTelemetry.Tests/Metrics/InMemoryExporterTests.cs @@ -16,15 +16,17 @@ using System.Collections.Generic; using System.Diagnostics.Metrics; + using OpenTelemetry.Exporter; using OpenTelemetry.Tests; + using Xunit; namespace OpenTelemetry.Metrics.Tests { public class InMemoryExporterTests { - [Fact(Skip = "To be run after https://github.com/open-telemetry/opentelemetry-dotnet/issues/2361 is fixed")] + [Fact] public void InMemoryExporterShouldDeepCopyMetricPoints() { var exportedItems = new List(); @@ -56,7 +58,7 @@ public void InMemoryExporterShouldDeepCopyMetricPoints() meterProvider.ForceFlush(); - metric = exportedItems[1]; // Second Metric object is added to the collection at this point + metric = exportedItems[0]; // Second Metric object is added to the collection at this point metricPointsEnumerator = metric.GetMetricPoints().GetEnumerator(); Assert.True(metricPointsEnumerator.MoveNext()); // One MetricPoint is emitted for the Metric var metricPointForSecondExport = metricPointsEnumerator.Current; @@ -65,5 +67,48 @@ public void InMemoryExporterShouldDeepCopyMetricPoints() // MetricPoint.LongValue for the first exporter metric should still be 10 Assert.Equal(10, metricPointForFirstExport.GetSumLong()); } + + [Fact] + public void Investigate_2361() + { + // https://github.com/open-telemetry/opentelemetry-dotnet/issues/2361 + + var exportedItems = new List(); + + using var meter = new Meter(Utils.GetCurrentMethodName()); + using var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddMeter(meter.Name) + .AddInMemoryExporter(exportedItems) + .Build(); + + int i = 0; + var counterLong = meter.CreateObservableCounter( + "observable-counter", + () => ++i * 10); + + meterProvider.ForceFlush(); + Assert.Equal(1, i); // verify that callback is invoked when calling Flush + Assert.Single(exportedItems); // verify that List contains 1 item + var metricPoint1 = GetSingleMetricPoint(exportedItems[0]); + Assert.Equal(10, metricPoint1.GetSumLong()); + + meterProvider.ForceFlush(); + Assert.Equal(2, i); // verify that callback is invoked when calling Flush + Assert.Single(exportedItems); // verify that List contains 1 item + var metricPoint2 = GetSingleMetricPoint(exportedItems[0]); + Assert.Equal(20, metricPoint2.GetSumLong()); + + // Retest 1st item, this is expected to be unchanged. + Assert.Equal(10, metricPoint1.GetSumLong()); + } + + private static MetricPoint GetSingleMetricPoint(Metric metric) + { + var metricPointsEnumerator = metric.GetMetricPoints().GetEnumerator(); + Assert.True(metricPointsEnumerator.MoveNext()); // One MetricPoint is emitted for the Metric + ref readonly var metricPoint = ref metricPointsEnumerator.Current; + Assert.False(metricPointsEnumerator.MoveNext()); + return metricPoint; + } } }