Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle exception from observable instruments #2457

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

* Exception from Observable instrument callbacks does not
result in entire metrics being lost.

* SDK is allocation-free on recording of measurements with
upto 8 tags.

Expand Down
20 changes: 15 additions & 5 deletions src/OpenTelemetry/Internal/OpenTelemetrySdkEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,21 @@ public void TracestateExtractException(Exception ex)
}

[NonEvent]
public void MetricObserverCallbackException(string metricName, Exception ex)
public void MetricObserverCallbackException(Exception exception)
{
if (this.IsEnabled(EventLevel.Warning, EventKeywords.All))
{
this.MetricObserverCallbackError(metricName, ex.ToInvariantString());
if (exception is AggregateException aggregateException)
{
foreach (var ex in aggregateException.InnerExceptions)
{
this.ObservableInstrumentCallbackException(ex.ToInvariantString());
}
}
else
{
this.ObservableInstrumentCallbackException(exception.ToInvariantString());
}
}
}

Expand Down Expand Up @@ -238,10 +248,10 @@ public void AttemptToActivateOobSpan(string spanName)
this.WriteEvent(15, spanName);
}

[Event(16, Message = "Exception occurring while invoking Metric Observer callback. '{0}' Exception: '{1}'", Level = EventLevel.Warning)]
public void MetricObserverCallbackError(string metricName, string exception)
[Event(16, Message = "Exception occurred while invoking Observable instrument callback. Exception: '{0}'", Level = EventLevel.Warning)]
public void ObservableInstrumentCallbackException(string exception)
{
this.WriteEvent(16, metricName, exception);
this.WriteEvent(16, exception);
}

[Event(17, Message = "Batcher finished collection with '{0}' metrics.", Level = EventLevel.Informational)]
Expand Down
16 changes: 9 additions & 7 deletions src/OpenTelemetry/Metrics/AggregatorStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,17 @@ internal class AggregatorStore
private readonly ConcurrentDictionary<string[], ConcurrentDictionary<object[], int>> keyValue2MetricAggs =
new ConcurrentDictionary<string[], ConcurrentDictionary<object[], int>>(new StringArrayEqualityComparer());

private AggregationTemporality temporality;
private MetricPoint[] metrics;
private readonly AggregationTemporality temporality;
private readonly bool outputDelta;
private readonly MetricPoint[] metrics;
private readonly AggregationType aggType;
private readonly double[] histogramBounds;
private readonly UpdateLongDelegate updateLongCallback;
private readonly UpdateDoubleDelegate updateDoubleCallback;
private int metricPointIndex = 0;
private bool zeroTagMetricPointInitialized;
private AggregationType aggType;
private double[] histogramBounds;
private DateTimeOffset startTimeExclusive;
private DateTimeOffset endTimeInclusive;
private UpdateLongDelegate updateLongCallback;
private UpdateDoubleDelegate updateDoubleCallback;

internal AggregatorStore(
AggregationType aggType,
Expand All @@ -54,6 +55,7 @@ internal AggregatorStore(
this.metrics = new MetricPoint[MaxMetricPoints];
this.aggType = aggType;
this.temporality = temporality;
this.outputDelta = temporality == AggregationTemporality.Delta ? true : false;
this.histogramBounds = histogramBounds;
this.startTimeExclusive = DateTimeOffset.UtcNow;
if (tagKeysInteresting == null)
Expand Down Expand Up @@ -102,7 +104,7 @@ internal void SnapShot()
continue;
}

metricPoint.TakeSnapShot(this.temporality == AggregationTemporality.Delta ? true : false);
metricPoint.TakeSnapShot(this.outputDelta);
}

if (this.temporality == AggregationTemporality.Delta)
Expand Down
14 changes: 13 additions & 1 deletion src/OpenTelemetry/Metrics/MeterProviderSdk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Linq;
using OpenTelemetry.Internal;
using OpenTelemetry.Resources;

namespace OpenTelemetry.Metrics
Expand Down Expand Up @@ -348,7 +349,18 @@ internal Batch<Metric> Collect()
try
{
// Record all observable instruments
this.listener.RecordObservableInstruments();
try
{
this.listener.RecordObservableInstruments();
}
catch (Exception exception)
{
// TODO:
// It doesn't looks like we can find which instrument callback
// threw.
OpenTelemetrySdkEventSource.Log.MetricObserverCallbackException(exception);
}

var indexSnapShot = Math.Min(this.metricIndex, MaxMetrics - 1);
var target = indexSnapShot + 1;
for (int i = 0; i < target; i++)
Expand Down
74 changes: 74 additions & 0 deletions test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@

namespace OpenTelemetry.Metrics.Tests
{
#pragma warning disable SA1000 // KeywordsMustBeSpacedCorrectly https://github.com/DotNetAnalyzers/StyleCopAnalyzers/issues/3214
public class MetricApiTest
{
private const int MaxTimeToAllowForFlush = 10000;
private static int numberOfThreads = Environment.ProcessorCount;
private static long deltaLongValueUpdatedByEachCall = 10;
private static double deltaDoubleValueUpdatedByEachCall = 11.987;
Expand All @@ -38,6 +40,77 @@ public MetricApiTest(ITestOutputHelper output)
this.output = output;
}

[Fact]
public void ObserverCallbackTest()
{
using var meter = new Meter("ObserverCallbackErrorTest");
var exportedItems = new List<Metric>();
using var meterProvider = Sdk.CreateMeterProviderBuilder()
.AddMeter(meter.Name)
.AddInMemoryExporter(exportedItems)
.Build();

var measurement = new Measurement<int>(100, new("name", "apple"), new("color", "red"));
meter.CreateObservableGauge("myGauge", () => measurement);

meterProvider.ForceFlush(MaxTimeToAllowForFlush);
Assert.Single(exportedItems);
var metric = exportedItems[0];
Assert.Equal("myGauge", metric.Name);
List<MetricPoint> metricPoints = new List<MetricPoint>();
foreach (ref var mp in metric.GetMetricPoints())
{
metricPoints.Add(mp);
}

Assert.Single(metricPoints);
var metricPoint = metricPoints[0];
Assert.Equal(100, metricPoint.LongValue);
Assert.NotNull(metricPoint.Keys);
Assert.NotNull(metricPoint.Values);
}

[Fact]
public void ObserverCallbackExceptionTest()
{
using var meter = new Meter("ObserverCallbackErrorTest");
var exportedItems = new List<Metric>();
using var meterProvider = Sdk.CreateMeterProviderBuilder()
.AddMeter(meter.Name)
.AddInMemoryExporter(exportedItems)
.Build();

var measurement = new Measurement<int>(100, new("name", "apple"), new("color", "red"));
meter.CreateObservableGauge("myGauge", () => measurement);
meter.CreateObservableGauge<long>("myBadGauge", observeValues: () => throw new Exception("gauge read error"));

meterProvider.ForceFlush(MaxTimeToAllowForFlush);
Assert.Equal(2, exportedItems.Count);
var metric = exportedItems[0];
Assert.Equal("myGauge", metric.Name);
List<MetricPoint> metricPoints = new List<MetricPoint>();
foreach (ref var mp in metric.GetMetricPoints())
{
metricPoints.Add(mp);
}

Assert.Single(metricPoints);
var metricPoint = metricPoints[0];
Assert.Equal(100, metricPoint.LongValue);
Assert.NotNull(metricPoint.Keys);
Assert.NotNull(metricPoint.Values);

metric = exportedItems[1];
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even if no MetricPoints are there, we still emit the Metric. This is something to be fixed along with #2360

Assert.Equal("myBadGauge", metric.Name);
metricPoints.Clear();
foreach (ref var mp in metric.GetMetricPoints())
{
metricPoints.Add(mp);
}

Assert.Empty(metricPoints);
}

[Theory]
[InlineData(AggregationTemporality.Cumulative)]
[InlineData(AggregationTemporality.Delta)]
Expand Down Expand Up @@ -495,4 +568,5 @@ private class UpdateThreadArguments<T>
public T DeltaValueUpdatedByEachCall;
}
}
#pragma warning restore SA1000 // KeywordsMustBeSpacedCorrectly
}