Skip to content

Commit

Permalink
Fix concurrency issue with metrics gauge value collection (dotnet#106175
Browse files Browse the repository at this point in the history
)

* Fix concurrency issue with metrics gauge value collection

* Address the feedback
  • Loading branch information
tarekgh authored Aug 9, 2024
1 parent afa79b3 commit 81d7ff1
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ private void RemoveInstrumentState(Instrument instrument)
}
};
}
else if (genericDefType == typeof(ObservableGauge<>) || genericDefType == typeof(Gauge<>))
else if (genericDefType == typeof(ObservableGauge<>))
{
return () =>
{
Expand All @@ -338,6 +338,16 @@ private void RemoveInstrumentState(Instrument instrument)
}
};
}
else if (genericDefType == typeof(Gauge<>))
{
return () =>
{
lock (this)
{
return CheckTimeSeriesAllowed() ? new SynchronousLastValue() : null;
}
};
}
else if (genericDefType == typeof(Histogram<>))
{
return () =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Threading;

namespace System.Diagnostics.Metrics
{
// Aggregator that keeps the last value it received until Collect().
// This class is used with the observable gauge that always called from a single thread during the collection.
// It is safe to use it without synchronization.
internal sealed class LastValue : Aggregator
{
private double? _lastValue;
Expand Down Expand Up @@ -32,4 +37,26 @@ internal LastValueStatistics(double? lastValue)

public double? LastValue { get; }
}

// Aggregator that keeps the last value it received.
// This class is used with the synchronous gauge that can be called from multiple threads during the collection.
// It uses volatile read/write to ensure the visibility of the last value.
internal sealed class SynchronousLastValue : Aggregator
{
private double _lastValue;

public override void Update(double value) => Volatile.Write(ref _lastValue, value);

public override IAggregationStatistics Collect() => new SynchronousLastValueStatistics(Volatile.Read(ref _lastValue));
}

internal sealed class SynchronousLastValueStatistics : IAggregationStatistics
{
internal SynchronousLastValueStatistics(double lastValue)
{
LastValue = lastValue;
}

public double LastValue { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,12 @@ private void ParseSpecs(string? metricsSpecs)
return;
}

if (metricsSpecs.Length == 0)
{
_aggregationManager!.IncludeAll();
return;
}

string[] specStrings = metricsSpecs.Split(s_instrumentSeparators, StringSplitOptions.RemoveEmptyEntries);
foreach (string specString in specStrings)
{
Expand Down Expand Up @@ -699,6 +705,11 @@ private static void TransmitMetricValue(Instrument instrument, LabeledAggregatio
Log.GaugeValuePublished(sessionId, instrument.Meter.Name, instrument.Meter.Version, instrument.Name, instrument.Unit, Helpers.FormatTags(stats.Labels),
lastValueStats.LastValue.HasValue ? lastValueStats.LastValue.Value.ToString(CultureInfo.InvariantCulture) : "", instrumentId);
}
else if (stats.AggregationStatistics is SynchronousLastValueStatistics synchronousLastValueStats)
{
Log.GaugeValuePublished(sessionId, instrument.Meter.Name, instrument.Meter.Version, instrument.Name, instrument.Unit, Helpers.FormatTags(stats.Labels),
synchronousLastValueStats.LastValue.ToString(CultureInfo.InvariantCulture), instrumentId);
}
else if (stats.AggregationStatistics is HistogramStatistics histogramStats)
{
Log.HistogramValuePublished(sessionId, instrument.Meter.Name, instrument.Meter.Version, instrument.Name, instrument.Unit, Helpers.FormatTags(stats.Labels), FormatQuantiles(histogramStats.Quantiles),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public class MetricEventSourceTests
const double IntervalSecs = 10;
static readonly TimeSpan s_waitForEventTimeout = TimeSpan.FromSeconds(60);

private const string RuntimeMeterName = "System.Runtime";

public MetricEventSourceTests(ITestOutputHelper output)
{
_output = output;
Expand All @@ -35,7 +37,7 @@ public void GetInstanceMethodIsReflectable()
// Even though the the type isn't public this test ensures the GetInstance() API isn't removed or renamed.
Type? metricsEventSourceType = Type.GetType("System.Diagnostics.Metrics.MetricsEventSource, System.Diagnostics.DiagnosticSource", throwOnError: false);
Assert.True(metricsEventSourceType != null, "Unable to get MetricsEventSource type via reflection");

MethodInfo? getInstanceMethod = metricsEventSourceType.GetMethod("GetInstance", BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Static, null, Type.EmptyTypes, null);
Assert.True(getInstanceMethod != null, "Unable to get MetricsEventSource.GetInstance method via reflection");

Expand Down Expand Up @@ -247,7 +249,7 @@ public void SingleListener_Wildcard()
Counter<int> c3 = meter3.CreateCounter<int>("counter3");

EventWrittenEventArgs[] events;
using (MetricsEventListener listener = new MetricsEventListener(_output, MetricsEventListener.TimeSeriesValues, isShared: true, IntervalSecs, "*"))
using (MetricsEventListener listener = new MetricsEventListener(_output, MetricsEventListener.TimeSeriesValues, isShared: true, IntervalSecs, ""))
{
listener.WaitForCollectionStop(s_waitForEventTimeout, 1);
c.Add(5);
Expand Down Expand Up @@ -1227,7 +1229,7 @@ public void EventSourcePublishesMissingDataPoints()
AssertBeginInstrumentReportingEventsPresent(events, c, oc, og, h, udc, oudc, g);
AssertInitialEnumerationCompleteEventPresent(events);
AssertCounterEventsPresent(events, meter.Name, c.Name, "", "", ("5", "5"), ("0", "5"), ("12", "17"));
AssertGaugeEventsPresent(events, meter.Name, g.Name, "", "", "-123", "", "123", "");
AssertGaugeEventsPresent(events, meter.Name, g.Name, "", "", "-123", "-123", "123", "123");
AssertCounterEventsPresent(events, meter.Name, oc.Name, "", "", ("", "17"), ("0", "17"), ("14", "31"), ("0", "31"));
AssertGaugeEventsPresent(events, meter.Name, og.Name, "", "", "18", "", "36", "");
AssertHistogramEventsPresent(events, meter.Name, h.Name, "", "", ("0.5=19;0.95=19;0.99=19", "1", "19"), ("", "0", "0"), ("0.5=26;0.95=26;0.99=26", "1", "26"), ("", "0", "0"));
Expand Down Expand Up @@ -1290,36 +1292,40 @@ public void EventSourcePublishesEndEventsOnMeterDispose()
AssertEndInstrumentReportingEventsPresent(events, c, oc, og, udc, oudc, g);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotBrowser))]
[ConditionalFact(typeof(MetricEventSourceTests), nameof(IsNotBrowserAndRemoteExecuteSupported))]
[OuterLoop("Slow and has lots of console spew")]
public void EventSourcePublishesInstruments()
{
object scope = new object();
RemoteExecutor.Invoke(static () =>
{
using Meter meterA = new Meter("TestMeter10", null, null, scope);
using Meter meterB = new Meter("TestMeter11", null, new TagList() { { "Mk1", "Mv1" }, { "Mk2", null } }, scope);
Counter<int> c = meterA.CreateCounter<int>("counter1", "hat", "Fooz!!");
Gauge<int> g = meterA.CreateGauge<int>("gauge1", "C", "Temperature");
int counterState = 3;
ObservableCounter<int> oc = meterA.CreateObservableCounter<int>("observableCounter1", () => { counterState += 7; return counterState; }, "MB", "Size of universe",
new TagList() { { "ock1", "ocv1" }, { "ock2", "ocv2" }, { "ock3", "ocv3" } });
int gaugeState = 0;
ObservableGauge<int> og = meterA.CreateObservableGauge<int>("observableGauge1", () => { gaugeState += 9; return gaugeState; }, "12394923 asd [],;/", "junk!",
new TagList() { { "ogk1", "ogv1" } });
Histogram<int> h = meterB.CreateHistogram<int>("histogram1", "a unit", "the description", new TagList() { { "hk1", "hv1" }, { "hk2", "" }, {"hk3", null } });
UpDownCounter<int> udc = meterA.CreateUpDownCounter<int>("upDownCounter1", "udc unit", "udc description", new TagList() { { "udk1", "udv1" } });
int upDownCounterState = 0;
ObservableUpDownCounter<int> oudc = meterA.CreateObservableUpDownCounter<int>("observableUpDownCounter1", () => { upDownCounterState += 11; return upDownCounterState; }, "oudc unit", "oudc description");
object scope = new object();
EventWrittenEventArgs[] events;
using (MetricsEventListener listener = new MetricsEventListener(_output, MetricsEventListener.InstrumentPublishing, null, ""))
{
listener.WaitForEnumerationComplete(s_waitForEventTimeout);
events = listener.Events.ToArray();
}
using Meter meterA = new Meter("TestMeter10", null, null, scope);
using Meter meterB = new Meter("TestMeter11", null, new TagList() { { "Mk1", "Mv1" }, { "Mk2", null } }, scope);
Counter<int> c = meterA.CreateCounter<int>("counter1", "hat", "Fooz!!");
Gauge<int> g = meterA.CreateGauge<int>("gauge1", "C", "Temperature");
int counterState = 3;
ObservableCounter<int> oc = meterA.CreateObservableCounter<int>("observableCounter1", () => { counterState += 7; return counterState; }, "MB", "Size of universe",
new TagList() { { "ock1", "ocv1" }, { "ock2", "ocv2" }, { "ock3", "ocv3" } });
int gaugeState = 0;
ObservableGauge<int> og = meterA.CreateObservableGauge<int>("observableGauge1", () => { gaugeState += 9; return gaugeState; }, "12394923 asd [],;/", "junk!",
new TagList() { { "ogk1", "ogv1" } });
Histogram<int> h = meterB.CreateHistogram<int>("histogram1", "a unit", "the description", new TagList() { { "hk1", "hv1" }, { "hk2", "" }, {"hk3", null } });
UpDownCounter<int> udc = meterA.CreateUpDownCounter<int>("upDownCounter1", "udc unit", "udc description", new TagList() { { "udk1", "udv1" } });
int upDownCounterState = 0;
ObservableUpDownCounter<int> oudc = meterA.CreateObservableUpDownCounter<int>("observableUpDownCounter1", () => { upDownCounterState += 11; return upDownCounterState; }, "oudc unit", "oudc description");
AssertInstrumentPublishingEventsPresent(events, c, oc, og, h, udc, oudc, g);
AssertInitialEnumerationCompleteEventPresent(events);
EventWrittenEventArgs[] events;
using (MetricsEventListener listener = new MetricsEventListener(NullTestOutputHelper.Instance, MetricsEventListener.InstrumentPublishing, null, ""))
{
listener.WaitForEnumerationComplete(s_waitForEventTimeout);
events = listener.Events.ToArray();
}
AssertInstrumentPublishingEventsPresent(events, c, oc, og, h, udc, oudc, g);
AssertInitialEnumerationCompleteEventPresent(events);
}).Dispose();
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotBrowser))]
Expand Down Expand Up @@ -1604,25 +1610,25 @@ public void EventSourceEnforcesHistogramLimitAndNotMaxTimeSeries()

public static IEnumerable<object[]> DifferentMetersAndInstrumentsData()
{
yield return new object[] { new Meter("M1").CreateCounter<int>("C1"), new Meter("M1").CreateCounter<int>("C1"), false};
yield return new object[] { new Meter("M1").CreateCounter<int>("C1"), new Meter("M2").CreateCounter<int>("C2"), false};

var counter = new Meter("M1").CreateCounter<int>("C1");
yield return new object[] { counter, counter.Meter.CreateCounter<int>("C1"), false };
var counter = new Meter("M2").CreateCounter<int>("C3");
yield return new object[] { counter, counter.Meter.CreateCounter<int>("C4"), false };

// Same counters
counter = new Meter("M1").CreateCounter<int>("C1");
counter = new Meter("M3").CreateCounter<int>("C5");
yield return new object[] { counter, counter, true };

var scope = new object();
yield return new object[]
{
new Meter("M1", "v1", new TagList { { "k1", "v1" } }, scope).CreateCounter<int>("C1", "u1", "d1", new TagList { { "k2", "v2" } } ),
new Meter("M1", "v1", new TagList { { "k1", "v1" } }, scope).CreateCounter<int>("C1", "u1", "d1", new TagList { { "k2", "v2" } } ),
new Meter("M4", "v1", new TagList { { "k1", "v1" } }, scope).CreateCounter<int>("C6", "u1", "d1", new TagList { { "k2", "v2" } } ),
new Meter("M5", "v1", new TagList { { "k1", "v1" } }, scope).CreateCounter<int>("C7", "u1", "d1", new TagList { { "k2", "v2" } } ),
false, // Same Instrument
};

Meter meter = new Meter("M1", "v1", new TagList { { "k1", "v1" } }, scope);
yield return new object[] { meter.CreateCounter<int>("C1", "u1", "d1", new TagList { { "k2", "v2" } } ), meter.CreateCounter<int>("C1", "u1", "d1", new TagList { { "k2", "v2" } } ), false };
Meter meter = new Meter("M6", "v1", new TagList { { "k1", "v1" } }, scope);
yield return new object[] { meter.CreateCounter<int>("C8", "u1", "d1", new TagList { { "k2", "v2" } } ), meter.CreateCounter<int>("C9", "u1", "d1", new TagList { { "k2", "v2" } } ), false };
}

[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsNotBrowser))]
Expand Down Expand Up @@ -1756,7 +1762,7 @@ private static void AssertHistogramLimitPresent(EventWrittenEventArgs[] events)

private static void AssertInstrumentPublishingEventsPresent(EventWrittenEventArgs[] events, params Instrument[] expectedInstruments)
{
var publishEvents = events.Where(e => e.EventName == "InstrumentPublished").Select(e =>
var publishEvents = events.Where(e => e.EventName == "InstrumentPublished" && e.Payload[1].ToString() != RuntimeMeterName).Select(e =>
new
{
MeterName = e.Payload[1].ToString(),
Expand Down

0 comments on commit 81d7ff1

Please sign in to comment.