From f77a4e9b6a637aef6ac8728a471948ef8c1c85ee Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 6 Oct 2021 12:26:46 -0700 Subject: [PATCH] Handle exception from observable instruments --- src/OpenTelemetry/CHANGELOG.md | 3 + .../Internal/OpenTelemetrySdkEventSource.cs | 20 +++-- src/OpenTelemetry/Metrics/AggregatorStore.cs | 16 ++-- src/OpenTelemetry/Metrics/MeterProviderSdk.cs | 14 +++- .../Metrics/MetricAPITest.cs | 74 +++++++++++++++++++ 5 files changed, 114 insertions(+), 13 deletions(-) diff --git a/src/OpenTelemetry/CHANGELOG.md b/src/OpenTelemetry/CHANGELOG.md index 8a42e851daa..5bfcd8bc8ef 100644 --- a/src/OpenTelemetry/CHANGELOG.md +++ b/src/OpenTelemetry/CHANGELOG.md @@ -2,6 +2,9 @@ ## Unreleased +* Exception from Observable instrument callbacks does not + result in entire metrics being lost. + * SDK is allocation-free on recording of measurements with upto 8 tags. diff --git a/src/OpenTelemetry/Internal/OpenTelemetrySdkEventSource.cs b/src/OpenTelemetry/Internal/OpenTelemetrySdkEventSource.cs index 19778a7f790..67042d9d9e4 100644 --- a/src/OpenTelemetry/Internal/OpenTelemetrySdkEventSource.cs +++ b/src/OpenTelemetry/Internal/OpenTelemetrySdkEventSource.cs @@ -55,11 +55,21 @@ public void TracestateExtractException(Exception ex) } [NonEvent] - public void MetricObserverCallbackException(string metricName, Exception ex) + public void MetricObserverCallbackException(Exception exception) { if (this.IsEnabled(EventLevel.Warning, EventKeywords.All)) { - this.MetricObserverCallbackError(metricName, ex.ToInvariantString()); + if (exception is AggregateException aggregateException) + { + foreach (var ex in aggregateException.InnerExceptions) + { + this.ObservableInstrumentCallbackException(ex.ToInvariantString()); + } + } + else + { + this.ObservableInstrumentCallbackException(exception.ToInvariantString()); + } } } @@ -238,10 +248,10 @@ public void AttemptToActivateOobSpan(string spanName) this.WriteEvent(15, spanName); } - [Event(16, Message = "Exception occurring while invoking Metric Observer callback. '{0}' Exception: '{1}'", Level = EventLevel.Warning)] - public void MetricObserverCallbackError(string metricName, string exception) + [Event(16, Message = "Exception occurred while invoking Observable instrument callback. Exception: '{0}'", Level = EventLevel.Warning)] + public void ObservableInstrumentCallbackException(string exception) { - this.WriteEvent(16, metricName, exception); + this.WriteEvent(16, exception); } [Event(17, Message = "Batcher finished collection with '{0}' metrics.", Level = EventLevel.Informational)] diff --git a/src/OpenTelemetry/Metrics/AggregatorStore.cs b/src/OpenTelemetry/Metrics/AggregatorStore.cs index e151875b3a9..b7707de7eec 100644 --- a/src/OpenTelemetry/Metrics/AggregatorStore.cs +++ b/src/OpenTelemetry/Metrics/AggregatorStore.cs @@ -34,16 +34,17 @@ internal class AggregatorStore private readonly ConcurrentDictionary> keyValue2MetricAggs = new ConcurrentDictionary>(new StringArrayEqualityComparer()); - private AggregationTemporality temporality; - private MetricPoint[] metrics; + private readonly AggregationTemporality temporality; + private readonly bool outputDelta; + private readonly MetricPoint[] metrics; + private readonly AggregationType aggType; + private readonly double[] histogramBounds; + private readonly UpdateLongDelegate updateLongCallback; + private readonly UpdateDoubleDelegate updateDoubleCallback; private int metricPointIndex = 0; private bool zeroTagMetricPointInitialized; - private AggregationType aggType; - private double[] histogramBounds; private DateTimeOffset startTimeExclusive; private DateTimeOffset endTimeInclusive; - private UpdateLongDelegate updateLongCallback; - private UpdateDoubleDelegate updateDoubleCallback; internal AggregatorStore( AggregationType aggType, @@ -54,6 +55,7 @@ internal AggregatorStore( this.metrics = new MetricPoint[MaxMetricPoints]; this.aggType = aggType; this.temporality = temporality; + this.outputDelta = temporality == AggregationTemporality.Delta ? true : false; this.histogramBounds = histogramBounds; this.startTimeExclusive = DateTimeOffset.UtcNow; if (tagKeysInteresting == null) @@ -102,7 +104,7 @@ internal void SnapShot() continue; } - metricPoint.TakeSnapShot(this.temporality == AggregationTemporality.Delta ? true : false); + metricPoint.TakeSnapShot(this.outputDelta); } if (this.temporality == AggregationTemporality.Delta) diff --git a/src/OpenTelemetry/Metrics/MeterProviderSdk.cs b/src/OpenTelemetry/Metrics/MeterProviderSdk.cs index e3670bc9eca..771412bc428 100644 --- a/src/OpenTelemetry/Metrics/MeterProviderSdk.cs +++ b/src/OpenTelemetry/Metrics/MeterProviderSdk.cs @@ -19,6 +19,7 @@ using System.Diagnostics; using System.Diagnostics.Metrics; using System.Linq; +using OpenTelemetry.Internal; using OpenTelemetry.Resources; namespace OpenTelemetry.Metrics @@ -348,7 +349,18 @@ internal Batch Collect() try { // Record all observable instruments - this.listener.RecordObservableInstruments(); + try + { + this.listener.RecordObservableInstruments(); + } + catch (Exception exception) + { + // TODO: + // It doesn't looks like we can find which instrument callback + // threw. + OpenTelemetrySdkEventSource.Log.MetricObserverCallbackException(exception); + } + var indexSnapShot = Math.Min(this.metricIndex, MaxMetrics - 1); var target = indexSnapShot + 1; for (int i = 0; i < target; i++) diff --git a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs index 360ddc67369..3bd95500084 100644 --- a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs +++ b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs @@ -25,8 +25,10 @@ namespace OpenTelemetry.Metrics.Tests { +#pragma warning disable SA1000 // KeywordsMustBeSpacedCorrectly https://github.com/DotNetAnalyzers/StyleCopAnalyzers/issues/3214 public class MetricApiTest { + private const int MaxTimeToAllowForFlush = 10000; private static int numberOfThreads = Environment.ProcessorCount; private static long deltaLongValueUpdatedByEachCall = 10; private static double deltaDoubleValueUpdatedByEachCall = 11.987; @@ -38,6 +40,77 @@ public MetricApiTest(ITestOutputHelper output) this.output = output; } + [Fact] + public void ObserverCallbackTest() + { + using var meter = new Meter("ObserverCallbackErrorTest"); + var exportedItems = new List(); + using var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddMeter(meter.Name) + .AddInMemoryExporter(exportedItems) + .Build(); + + var measurement = new Measurement(100, new("name", "apple"), new("color", "red")); + meter.CreateObservableGauge("myGauge", () => measurement); + + meterProvider.ForceFlush(MaxTimeToAllowForFlush); + Assert.Single(exportedItems); + var metric = exportedItems[0]; + Assert.Equal("myGauge", metric.Name); + List metricPoints = new List(); + foreach (ref var mp in metric.GetMetricPoints()) + { + metricPoints.Add(mp); + } + + Assert.Single(metricPoints); + var metricPoint = metricPoints[0]; + Assert.Equal(100, metricPoint.LongValue); + Assert.NotNull(metricPoint.Keys); + Assert.NotNull(metricPoint.Values); + } + + [Fact] + public void ObserverCallbackExceptionTest() + { + using var meter = new Meter("ObserverCallbackErrorTest"); + var exportedItems = new List(); + using var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddMeter(meter.Name) + .AddInMemoryExporter(exportedItems) + .Build(); + + var measurement = new Measurement(100, new("name", "apple"), new("color", "red")); + meter.CreateObservableGauge("myGauge", () => measurement); + meter.CreateObservableGauge("myBadGauge", observeValues: () => throw new Exception("gauge read error")); + + meterProvider.ForceFlush(MaxTimeToAllowForFlush); + Assert.Equal(2, exportedItems.Count); + var metric = exportedItems[0]; + Assert.Equal("myGauge", metric.Name); + List metricPoints = new List(); + foreach (ref var mp in metric.GetMetricPoints()) + { + metricPoints.Add(mp); + } + + Assert.Single(metricPoints); + var metricPoint = metricPoints[0]; + Assert.Equal(100, metricPoint.LongValue); + Assert.NotNull(metricPoint.Keys); + Assert.NotNull(metricPoint.Values); + + metric = exportedItems[1]; + Assert.Equal("myBadGauge", metric.Name); + metricPoints.Clear(); + foreach (ref var mp in metric.GetMetricPoints()) + { + metricPoints.Add(mp); + } + + Assert.Empty(metricPoints); + } + [Theory] [InlineData(AggregationTemporality.Cumulative)] [InlineData(AggregationTemporality.Delta)] @@ -495,4 +568,5 @@ private class UpdateThreadArguments public T DeltaValueUpdatedByEachCall; } } +#pragma warning restore SA1000 // KeywordsMustBeSpacedCorrectly }