Skip to content

Commit

Permalink
Handle exception from observable instruments (#2457)
Browse files Browse the repository at this point in the history
  • Loading branch information
cijothomas authored Oct 6, 2021
1 parent 3d16cd8 commit dd6b11b
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 13 deletions.
3 changes: 3 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

* Exception from Observable instrument callbacks does not
result in entire metrics being lost.

* SDK is allocation-free on recording of measurements with
upto 8 tags.

Expand Down
20 changes: 15 additions & 5 deletions src/OpenTelemetry/Internal/OpenTelemetrySdkEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,21 @@ public void TracestateExtractException(Exception ex)
}

[NonEvent]
public void MetricObserverCallbackException(string metricName, Exception ex)
public void MetricObserverCallbackException(Exception exception)
{
if (this.IsEnabled(EventLevel.Warning, EventKeywords.All))
{
this.MetricObserverCallbackError(metricName, ex.ToInvariantString());
if (exception is AggregateException aggregateException)
{
foreach (var ex in aggregateException.InnerExceptions)
{
this.ObservableInstrumentCallbackException(ex.ToInvariantString());
}
}
else
{
this.ObservableInstrumentCallbackException(exception.ToInvariantString());
}
}
}

Expand Down Expand Up @@ -238,10 +248,10 @@ public void AttemptToActivateOobSpan(string spanName)
this.WriteEvent(15, spanName);
}

[Event(16, Message = "Exception occurring while invoking Metric Observer callback. '{0}' Exception: '{1}'", Level = EventLevel.Warning)]
public void MetricObserverCallbackError(string metricName, string exception)
[Event(16, Message = "Exception occurred while invoking Observable instrument callback. Exception: '{0}'", Level = EventLevel.Warning)]
public void ObservableInstrumentCallbackException(string exception)
{
this.WriteEvent(16, metricName, exception);
this.WriteEvent(16, exception);
}

[Event(17, Message = "Batcher finished collection with '{0}' metrics.", Level = EventLevel.Informational)]
Expand Down
16 changes: 9 additions & 7 deletions src/OpenTelemetry/Metrics/AggregatorStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,17 @@ internal class AggregatorStore
private readonly ConcurrentDictionary<string[], ConcurrentDictionary<object[], int>> keyValue2MetricAggs =
new ConcurrentDictionary<string[], ConcurrentDictionary<object[], int>>(new StringArrayEqualityComparer());

private AggregationTemporality temporality;
private MetricPoint[] metrics;
private readonly AggregationTemporality temporality;
private readonly bool outputDelta;
private readonly MetricPoint[] metrics;
private readonly AggregationType aggType;
private readonly double[] histogramBounds;
private readonly UpdateLongDelegate updateLongCallback;
private readonly UpdateDoubleDelegate updateDoubleCallback;
private int metricPointIndex = 0;
private bool zeroTagMetricPointInitialized;
private AggregationType aggType;
private double[] histogramBounds;
private DateTimeOffset startTimeExclusive;
private DateTimeOffset endTimeInclusive;
private UpdateLongDelegate updateLongCallback;
private UpdateDoubleDelegate updateDoubleCallback;

internal AggregatorStore(
AggregationType aggType,
Expand All @@ -54,6 +55,7 @@ internal AggregatorStore(
this.metrics = new MetricPoint[MaxMetricPoints];
this.aggType = aggType;
this.temporality = temporality;
this.outputDelta = temporality == AggregationTemporality.Delta ? true : false;
this.histogramBounds = histogramBounds;
this.startTimeExclusive = DateTimeOffset.UtcNow;
if (tagKeysInteresting == null)
Expand Down Expand Up @@ -102,7 +104,7 @@ internal void SnapShot()
continue;
}

metricPoint.TakeSnapShot(this.temporality == AggregationTemporality.Delta ? true : false);
metricPoint.TakeSnapShot(this.outputDelta);
}

if (this.temporality == AggregationTemporality.Delta)
Expand Down
14 changes: 13 additions & 1 deletion src/OpenTelemetry/Metrics/MeterProviderSdk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Linq;
using OpenTelemetry.Internal;
using OpenTelemetry.Resources;

namespace OpenTelemetry.Metrics
Expand Down Expand Up @@ -348,7 +349,18 @@ internal Batch<Metric> Collect()
try
{
// Record all observable instruments
this.listener.RecordObservableInstruments();
try
{
this.listener.RecordObservableInstruments();
}
catch (Exception exception)
{
// TODO:
// It doesn't looks like we can find which instrument callback
// threw.
OpenTelemetrySdkEventSource.Log.MetricObserverCallbackException(exception);
}

var indexSnapShot = Math.Min(this.metricIndex, MaxMetrics - 1);
var target = indexSnapShot + 1;
for (int i = 0; i < target; i++)
Expand Down
74 changes: 74 additions & 0 deletions test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@

namespace OpenTelemetry.Metrics.Tests
{
#pragma warning disable SA1000 // KeywordsMustBeSpacedCorrectly https://github.com/DotNetAnalyzers/StyleCopAnalyzers/issues/3214
public class MetricApiTest
{
private const int MaxTimeToAllowForFlush = 10000;
private static int numberOfThreads = Environment.ProcessorCount;
private static long deltaLongValueUpdatedByEachCall = 10;
private static double deltaDoubleValueUpdatedByEachCall = 11.987;
Expand All @@ -38,6 +40,77 @@ public MetricApiTest(ITestOutputHelper output)
this.output = output;
}

[Fact]
public void ObserverCallbackTest()
{
using var meter = new Meter("ObserverCallbackErrorTest");
var exportedItems = new List<Metric>();
using var meterProvider = Sdk.CreateMeterProviderBuilder()
.AddMeter(meter.Name)
.AddInMemoryExporter(exportedItems)
.Build();

var measurement = new Measurement<int>(100, new("name", "apple"), new("color", "red"));
meter.CreateObservableGauge("myGauge", () => measurement);

meterProvider.ForceFlush(MaxTimeToAllowForFlush);
Assert.Single(exportedItems);
var metric = exportedItems[0];
Assert.Equal("myGauge", metric.Name);
List<MetricPoint> metricPoints = new List<MetricPoint>();
foreach (ref var mp in metric.GetMetricPoints())
{
metricPoints.Add(mp);
}

Assert.Single(metricPoints);
var metricPoint = metricPoints[0];
Assert.Equal(100, metricPoint.LongValue);
Assert.NotNull(metricPoint.Keys);
Assert.NotNull(metricPoint.Values);
}

[Fact]
public void ObserverCallbackExceptionTest()
{
using var meter = new Meter("ObserverCallbackErrorTest");
var exportedItems = new List<Metric>();
using var meterProvider = Sdk.CreateMeterProviderBuilder()
.AddMeter(meter.Name)
.AddInMemoryExporter(exportedItems)
.Build();

var measurement = new Measurement<int>(100, new("name", "apple"), new("color", "red"));
meter.CreateObservableGauge("myGauge", () => measurement);
meter.CreateObservableGauge<long>("myBadGauge", observeValues: () => throw new Exception("gauge read error"));

meterProvider.ForceFlush(MaxTimeToAllowForFlush);
Assert.Equal(2, exportedItems.Count);
var metric = exportedItems[0];
Assert.Equal("myGauge", metric.Name);
List<MetricPoint> metricPoints = new List<MetricPoint>();
foreach (ref var mp in metric.GetMetricPoints())
{
metricPoints.Add(mp);
}

Assert.Single(metricPoints);
var metricPoint = metricPoints[0];
Assert.Equal(100, metricPoint.LongValue);
Assert.NotNull(metricPoint.Keys);
Assert.NotNull(metricPoint.Values);

metric = exportedItems[1];
Assert.Equal("myBadGauge", metric.Name);
metricPoints.Clear();
foreach (ref var mp in metric.GetMetricPoints())
{
metricPoints.Add(mp);
}

Assert.Empty(metricPoints);
}

[Theory]
[InlineData(AggregationTemporality.Cumulative)]
[InlineData(AggregationTemporality.Delta)]
Expand Down Expand Up @@ -495,4 +568,5 @@ private class UpdateThreadArguments<T>
public T DeltaValueUpdatedByEachCall;
}
}
#pragma warning restore SA1000 // KeywordsMustBeSpacedCorrectly
}

0 comments on commit dd6b11b

Please sign in to comment.