From 175154e1950441ab73544dfa5c2cc59cf338b38f Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 28 Jul 2021 17:35:33 -0700 Subject: [PATCH] Add a basic metric test for multi-thread update (#2205) * Add a basic metric test for multi-thread update * nulklable --- src/OpenTelemetry/Metrics/AggregatorStore.cs | 12 ++ .../Metrics/MetricAPITest.cs | 109 +++++++++++++++++- 2 files changed, 118 insertions(+), 3 deletions(-) diff --git a/src/OpenTelemetry/Metrics/AggregatorStore.cs b/src/OpenTelemetry/Metrics/AggregatorStore.cs index c13c01d1961..20aa4c8760e 100644 --- a/src/OpenTelemetry/Metrics/AggregatorStore.cs +++ b/src/OpenTelemetry/Metrics/AggregatorStore.cs @@ -157,6 +157,18 @@ internal List Collect(bool isDelta, DateTimeOffset dt) { var collectedMetrics = new List(); + if (this.tag0Metrics != null) + { + foreach (var aggregator in this.tag0Metrics) + { + var m = aggregator.Collect(dt, isDelta); + if (m != null) + { + collectedMetrics.Add(m); + } + } + } + // Lock to prevent new time series from being added // until collect is done. lock (this.lockKeyValue2MetricAggs) diff --git a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs index 2a4efce8fc1..1f0d8424d6a 100644 --- a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs +++ b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs @@ -14,20 +14,123 @@ // limitations under the License. // +using System; using System.Collections.Generic; using System.Diagnostics.Metrics; -using System.Threading.Tasks; +using System.Threading; +using OpenTelemetry.Tests; using Xunit; -#nullable enable - namespace OpenTelemetry.Metrics.Tests { public class MetricApiTest { + private static int numberOfThreads = 10; + private static long deltaValueUpdatedByEachCall = 10; + private static int numberOfMetricUpdateByEachThread = 1000000; + [Fact] public void SimpleTest() { + var metricItems = new List(); + var metricExporter = new TestExporter(ProcessExport); + void ProcessExport(Batch batch) + { + foreach (var metricItem in batch) + { + metricItems.Add(metricItem); + } + } + + var meter = new Meter("TestMeter"); + var counterLong = meter.CreateCounter("mycounter"); + var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddSource("TestMeter") + .AddMetricProcessor(new PushMetricProcessor(metricExporter, 100, isDelta: true)) + .Build(); + + // setup args to threads. + var mreToBlockUpdateThreads = new ManualResetEvent(false); + var mreToEnsureAllThreadsStarted = new ManualResetEvent(false); + + var argToThread = new UpdateThreadArguments(); + argToThread.Counter = counterLong; + argToThread.ThreadsStartedCount = 0; + argToThread.MreToBlockUpdateThread = mreToBlockUpdateThreads; + argToThread.MreToEnsureAllThreadsStart = mreToEnsureAllThreadsStarted; + + Thread[] t = new Thread[numberOfThreads]; + for (int i = 0; i < numberOfThreads; i++) + { + t[i] = new Thread(CounterUpdateThread); + t[i].Start(argToThread); + } + + // Block until all threads started. + mreToEnsureAllThreadsStarted.WaitOne(); + + // unblock all the threads. + // (i.e let them start counter.Add) + mreToBlockUpdateThreads.Set(); + + for (int i = 0; i < numberOfThreads; i++) + { + // wait for all threads to complete + t[i].Join(); + } + + meterProvider.Dispose(); + + // TODO: Once Dispose does flush, we may not need this + // unknown sleep below. + Thread.Sleep(1000); + + long sumReceived = 0; + foreach (var metricItem in metricItems) + { + var metrics = metricItem.Metrics; + foreach (var metric in metrics) + { + sumReceived += (long)(metric as ISumMetric).Sum.Value; + } + } + + var expectedSum = deltaValueUpdatedByEachCall * numberOfMetricUpdateByEachThread * numberOfThreads; + Assert.Equal(expectedSum, sumReceived); + } + + private static void CounterUpdateThread(object obj) + { + var arguments = obj as UpdateThreadArguments; + if (arguments == null) + { + throw new Exception("Invalid args"); + } + + var mre = arguments.MreToBlockUpdateThread; + var mreToEnsureAllThreadsStart = arguments.MreToEnsureAllThreadsStart; + var counter = arguments.Counter; + + if (Interlocked.Increment(ref arguments.ThreadsStartedCount) == numberOfThreads) + { + mreToEnsureAllThreadsStart.Set(); + } + + // Wait until signalled to start calling update on aggregator + mre.WaitOne(); + + for (int i = 0; i < numberOfMetricUpdateByEachThread; i++) + { + counter.Add(deltaValueUpdatedByEachCall, new KeyValuePair("verb", "GET")); + } + } + + private class UpdateThreadArguments + { + public ManualResetEvent MreToBlockUpdateThread; + public ManualResetEvent MreToEnsureAllThreadsStart; + public int ThreadsStartedCount; + public Counter Counter; } } }