Skip to content

Commit

Permalink
Add a basic metric test for multi-thread update (#2205)
Browse files Browse the repository at this point in the history
* Add a basic metric test for multi-thread update

* nulklable
  • Loading branch information
cijothomas authored Jul 29, 2021
1 parent 7db5ec8 commit 175154e
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 3 deletions.
12 changes: 12 additions & 0 deletions src/OpenTelemetry/Metrics/AggregatorStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,18 @@ internal List<IMetric> Collect(bool isDelta, DateTimeOffset dt)
{
var collectedMetrics = new List<IMetric>();

if (this.tag0Metrics != null)
{
foreach (var aggregator in this.tag0Metrics)
{
var m = aggregator.Collect(dt, isDelta);
if (m != null)
{
collectedMetrics.Add(m);
}
}
}

// Lock to prevent new time series from being added
// until collect is done.
lock (this.lockKeyValue2MetricAggs)
Expand Down
109 changes: 106 additions & 3 deletions test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,123 @@
// limitations under the License.
// </copyright>

using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Threading.Tasks;
using System.Threading;
using OpenTelemetry.Tests;
using Xunit;

#nullable enable

namespace OpenTelemetry.Metrics.Tests
{
public class MetricApiTest
{
private static int numberOfThreads = 10;
private static long deltaValueUpdatedByEachCall = 10;
private static int numberOfMetricUpdateByEachThread = 1000000;

[Fact]
public void SimpleTest()
{
var metricItems = new List<MetricItem>();
var metricExporter = new TestExporter<MetricItem>(ProcessExport);
void ProcessExport(Batch<MetricItem> batch)
{
foreach (var metricItem in batch)
{
metricItems.Add(metricItem);
}
}

var meter = new Meter("TestMeter");
var counterLong = meter.CreateCounter<long>("mycounter");
var meterProvider = Sdk.CreateMeterProviderBuilder()
.AddSource("TestMeter")
.AddMetricProcessor(new PushMetricProcessor(metricExporter, 100, isDelta: true))
.Build();

// setup args to threads.
var mreToBlockUpdateThreads = new ManualResetEvent(false);
var mreToEnsureAllThreadsStarted = new ManualResetEvent(false);

var argToThread = new UpdateThreadArguments();
argToThread.Counter = counterLong;
argToThread.ThreadsStartedCount = 0;
argToThread.MreToBlockUpdateThread = mreToBlockUpdateThreads;
argToThread.MreToEnsureAllThreadsStart = mreToEnsureAllThreadsStarted;

Thread[] t = new Thread[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++)
{
t[i] = new Thread(CounterUpdateThread);
t[i].Start(argToThread);
}

// Block until all threads started.
mreToEnsureAllThreadsStarted.WaitOne();

// unblock all the threads.
// (i.e let them start counter.Add)
mreToBlockUpdateThreads.Set();

for (int i = 0; i < numberOfThreads; i++)
{
// wait for all threads to complete
t[i].Join();
}

meterProvider.Dispose();

// TODO: Once Dispose does flush, we may not need this
// unknown sleep below.
Thread.Sleep(1000);

long sumReceived = 0;
foreach (var metricItem in metricItems)
{
var metrics = metricItem.Metrics;
foreach (var metric in metrics)
{
sumReceived += (long)(metric as ISumMetric).Sum.Value;
}
}

var expectedSum = deltaValueUpdatedByEachCall * numberOfMetricUpdateByEachThread * numberOfThreads;
Assert.Equal(expectedSum, sumReceived);
}

private static void CounterUpdateThread(object obj)
{
var arguments = obj as UpdateThreadArguments;
if (arguments == null)
{
throw new Exception("Invalid args");
}

var mre = arguments.MreToBlockUpdateThread;
var mreToEnsureAllThreadsStart = arguments.MreToEnsureAllThreadsStart;
var counter = arguments.Counter;

if (Interlocked.Increment(ref arguments.ThreadsStartedCount) == numberOfThreads)
{
mreToEnsureAllThreadsStart.Set();
}

// Wait until signalled to start calling update on aggregator
mre.WaitOne();

for (int i = 0; i < numberOfMetricUpdateByEachThread; i++)
{
counter.Add(deltaValueUpdatedByEachCall, new KeyValuePair<string, object>("verb", "GET"));
}
}

private class UpdateThreadArguments
{
public ManualResetEvent MreToBlockUpdateThread;
public ManualResetEvent MreToEnsureAllThreadsStart;
public int ThreadsStartedCount;
public Counter<long> Counter;
}
}
}

0 comments on commit 175154e

Please sign in to comment.