diff --git a/src/OpenTelemetry/Trace/Export/BatchingActivityProcessor.cs b/src/OpenTelemetry/Trace/Export/BatchingActivityProcessor.cs new file mode 100644 index 00000000000..d900a828396 --- /dev/null +++ b/src/OpenTelemetry/Trace/Export/BatchingActivityProcessor.cs @@ -0,0 +1,247 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Trace.Export +{ + /// + /// Implements processor that batches activities before calling exporter. + /// + public class BatchingActivityProcessor : ActivityProcessor, IDisposable + { + private const int DefaultMaxQueueSize = 2048; + private const int DefaultMaxExportBatchSize = 512; + private static readonly TimeSpan DefaultScheduleDelay = TimeSpan.FromMilliseconds(5000); + private readonly ConcurrentQueue exportQueue; + private readonly int maxQueueSize; + private readonly int maxExportBatchSize; + private readonly TimeSpan scheduleDelay; + private readonly ActivityExporter exporter; + private readonly List batch = new List(); + private CancellationTokenSource cts; + private volatile int currentQueueSize; + private bool stopping = false; + + /// + /// Initializes a new instance of the class with default parameters: + /// + /// + /// maxQueueSize = 2048, + /// + /// + /// scheduleDelay = 5 sec, + /// + /// + /// maxExportBatchSize = 512 + /// + /// + /// + /// Exporter instance. + public BatchingActivityProcessor(ActivityExporter exporter) + : this(exporter, DefaultMaxQueueSize, DefaultScheduleDelay, DefaultMaxExportBatchSize) + { + } + + /// + /// Initializes a new instance of the class with custom settings. + /// + /// Exporter instance. + /// Maximum queue size. After the size is reached activities are dropped by processor. + /// The delay between two consecutive exports. + /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. + public BatchingActivityProcessor(ActivityExporter exporter, int maxQueueSize, TimeSpan scheduleDelay, int maxExportBatchSize) + { + if (maxQueueSize <= 0) + { + throw new ArgumentOutOfRangeException(nameof(maxQueueSize)); + } + + if (maxExportBatchSize <= 0 || maxExportBatchSize > maxQueueSize) + { + throw new ArgumentOutOfRangeException(nameof(maxExportBatchSize)); + } + + this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter)); + this.maxQueueSize = maxQueueSize; + this.scheduleDelay = scheduleDelay; + this.maxExportBatchSize = maxExportBatchSize; + + this.cts = new CancellationTokenSource(); + this.exportQueue = new ConcurrentQueue(); + + // worker task that will last for lifetime of processor. + // Threads are also useless as exporter tasks run in thread pool threads. + Task.Run(() => this.Worker(this.cts.Token), this.cts.Token); + } + + /// + public override void OnStart(Activity activity) + { + } + + /// + public override void OnEnd(Activity activity) + { + if (this.stopping) + { + return; + } + + // because of race-condition between checking the size and enqueueing, + // we might end up with a bit more activities than maxQueueSize. + // Let's just tolerate it to avoid extra synchronization. + if (this.currentQueueSize >= this.maxQueueSize) + { + OpenTelemetrySdkEventSource.Log.SpanProcessorQueueIsExhausted(); + return; + } + + Interlocked.Increment(ref this.currentQueueSize); + + this.exportQueue.Enqueue(activity); + } + + /// + public override async Task ShutdownAsync(CancellationToken cancellationToken) + { + if (!this.stopping) + { + this.stopping = true; + + // This will stop the loop after current batch finishes. + this.cts.Cancel(false); + this.cts.Dispose(); + this.cts = null; + + // if there are more items, continue until cancellation token allows + while (this.currentQueueSize > 0 && !cancellationToken.IsCancellationRequested) + { + await this.ExportBatchAsync(cancellationToken).ConfigureAwait(false); + } + + await this.exporter.ShutdownAsync(cancellationToken); + + // there is no point in waiting for a worker task if cancellation happens + // it's dead already or will die on the next iteration on its own + + // ExportBatchAsync must never throw, we are here either because it was cancelled + // or because there are no items left + OpenTelemetrySdkEventSource.Log.ShutdownEvent(this.currentQueueSize); + } + } + + public void Dispose() + { + this.Dispose(true); + } + + protected virtual void Dispose(bool isDisposing) + { + if (!this.stopping) + { + this.ShutdownAsync(CancellationToken.None).ContinueWith(_ => { }).GetAwaiter().GetResult(); + } + + if (isDisposing) + { + if (this.exporter is IDisposable disposableExporter) + { + try + { + disposableExporter.Dispose(); + } + catch (Exception e) + { + OpenTelemetrySdkEventSource.Log.SpanProcessorException("Dispose", e); + } + } + } + } + + private async Task ExportBatchAsync(CancellationToken cancellationToken) + { + try + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + if (this.exportQueue.TryDequeue(out var nextActivity)) + { + Interlocked.Decrement(ref this.currentQueueSize); + this.batch.Add(nextActivity); + } + else + { + // nothing in queue + return; + } + + while (this.batch.Count < this.maxExportBatchSize && this.exportQueue.TryDequeue(out nextActivity)) + { + Interlocked.Decrement(ref this.currentQueueSize); + this.batch.Add(nextActivity); + } + + var result = await this.exporter.ExportAsync(this.batch, cancellationToken).ConfigureAwait(false); + if (result != ExportResult.Success) + { + OpenTelemetrySdkEventSource.Log.ExporterErrorResult(result); + + // we do not support retries for now and leave it up to exporter + // as only exporter implementation knows how to retry: which items failed + // and what is the reasonable policy for that exporter. + } + } + catch (Exception ex) + { + OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.ExportBatchAsync), ex); + } + finally + { + this.batch.Clear(); + } + } + + private async Task Worker(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + var sw = Stopwatch.StartNew(); + await this.ExportBatchAsync(cancellationToken).ConfigureAwait(false); + + if (cancellationToken.IsCancellationRequested) + { + return; + } + + var remainingWait = this.scheduleDelay - sw.Elapsed; + if (remainingWait > TimeSpan.Zero) + { + await Task.Delay(remainingWait, cancellationToken).ConfigureAwait(false); + } + } + } + } +} diff --git a/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestActivityExporter.cs b/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestActivityExporter.cs new file mode 100644 index 00000000000..bb8023699a4 --- /dev/null +++ b/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestActivityExporter.cs @@ -0,0 +1,57 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +using System; +using System.Collections.Generic; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using OpenTelemetry.Trace.Export; +using System.Diagnostics; + +namespace OpenTelemetry.Testing.Export +{ + public class TestActivityExporter : ActivityExporter + { + private readonly ConcurrentQueue activities = new ConcurrentQueue(); + private readonly Action> onExport; + public TestActivityExporter(Action> onExport) + { + this.onExport = onExport; + } + + public Activity[] ExportedActivities => activities.ToArray(); + + public bool WasShutDown { get; private set; } = false; + + public override Task ExportAsync(IEnumerable data, CancellationToken cancellationToken) + { + this.onExport?.Invoke(data); + + foreach (var s in data) + { + this.activities.Enqueue(s); + } + + return Task.FromResult(ExportResult.Success); + } + + public override Task ShutdownAsync(CancellationToken cancellationToken) + { + this.WasShutDown = true; + return Task.CompletedTask; + } + } +} diff --git a/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestExporter.cs b/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestSpanExporter.cs similarity index 89% rename from test/OpenTelemetry.Tests/Implementation/Testing/Export/TestExporter.cs rename to test/OpenTelemetry.Tests/Implementation/Testing/Export/TestSpanExporter.cs index 37e9b0b4836..9def4dd8fb7 100644 --- a/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestExporter.cs +++ b/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestSpanExporter.cs @@ -1,4 +1,4 @@ -// +// // Copyright The OpenTelemetry Authors // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,16 +18,15 @@ using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; -using OpenTelemetry.Trace; using OpenTelemetry.Trace.Export; namespace OpenTelemetry.Testing.Export { - public class TestExporter : SpanExporter + public class TestSpanExporter : SpanExporter { private readonly ConcurrentQueue spanDataList = new ConcurrentQueue(); private readonly Action> onExport; - public TestExporter(Action> onExport) + public TestSpanExporter(Action> onExport) { this.onExport = onExport; } diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/Config/SpanProcessorPipelineTests.cs b/test/OpenTelemetry.Tests/Implementation/Trace/Config/SpanProcessorPipelineTests.cs index b13a332ebe2..43d6e71f131 100644 --- a/test/OpenTelemetry.Tests/Implementation/Trace/Config/SpanProcessorPipelineTests.cs +++ b/test/OpenTelemetry.Tests/Implementation/Trace/Config/SpanProcessorPipelineTests.cs @@ -55,7 +55,7 @@ public void PipelineBuilder_AddExporter() { var builder = new SpanProcessorPipelineBuilder(); - var exporter = new TestExporter(null); + var exporter = new TestSpanExporter(null); builder.SetExporter(exporter); Assert.Same(exporter, builder.Exporter); @@ -72,7 +72,7 @@ public void PipelineBuilder_AddExporterAndExportingProcessor() { var builder = new SpanProcessorPipelineBuilder(); - var exporter = new TestExporter(null); + var exporter = new TestSpanExporter(null); builder.SetExporter(exporter); bool processorFactoryCalled = false; @@ -203,7 +203,7 @@ public void PipelineBuilder_AddProcessorChainWithExporter() Assert.NotNull(exporter); return new SimpleSpanProcessor(exporter); }) - .SetExporter(new TestExporter(null)); + .SetExporter(new TestSpanExporter(null)); var firstProcessor = (TestProcessor)builder.Build(); diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/Config/TracerBuilderTests.cs b/test/OpenTelemetry.Tests/Implementation/Trace/Config/TracerBuilderTests.cs index 021c2a4b9b8..a20e4eedaf9 100644 --- a/test/OpenTelemetry.Tests/Implementation/Trace/Config/TracerBuilderTests.cs +++ b/test/OpenTelemetry.Tests/Implementation/Trace/Config/TracerBuilderTests.cs @@ -57,7 +57,7 @@ public void TracerBuilder_ValidArgs() bool instrumentationFactoryCalled = true; var sampler = new ProbabilitySampler(0.1); - var exporter = new TestExporter(_ => { }); + var exporter = new TestSpanExporter(_ => { }); var options = new TracerConfiguration(1, 1, 1); builder diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/Config/TracerFactoryTest.cs b/test/OpenTelemetry.Tests/Implementation/Trace/Config/TracerFactoryTest.cs index a0b6b8ea26f..09393d002e4 100644 --- a/test/OpenTelemetry.Tests/Implementation/Trace/Config/TracerFactoryTest.cs +++ b/test/OpenTelemetry.Tests/Implementation/Trace/Config/TracerFactoryTest.cs @@ -58,7 +58,7 @@ public void CreateFactory_BuilderWithArgs() { var exporterCalledCount = 0; - var testExporter = new TestExporter(spans => + var testExporter = new TestSpanExporter(spans => { exporterCalledCount++; Assert.Single(spans); @@ -126,7 +126,7 @@ public void CreateFactory_BuilderWithMultiplePipelines() { var exporterCalledCount = 0; - var testExporter = new TestExporter(spans => + var testExporter = new TestSpanExporter(spans => { exporterCalledCount++; Assert.Single(spans); diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/Export/BatchingActivityProcessorTests.cs b/test/OpenTelemetry.Tests/Implementation/Trace/Export/BatchingActivityProcessorTests.cs new file mode 100644 index 00000000000..1e88f568f50 --- /dev/null +++ b/test/OpenTelemetry.Tests/Implementation/Trace/Export/BatchingActivityProcessorTests.cs @@ -0,0 +1,349 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using OpenTelemetry.Testing.Export; +using OpenTelemetry.Trace.Configuration; +using OpenTelemetry.Trace.Samplers; +using Xunit; + +namespace OpenTelemetry.Trace.Export.Test +{ + public class BatchingActivitiyProcessorTest : IDisposable + { + private const string ActivityName1 = "MySpanName/1"; + private const string ActivityName2 = "MySpanName/2"; + + private static readonly TimeSpan DefaultDelay = TimeSpan.FromMilliseconds(30); + private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(1); + + private Activity CreateSampledEndedActivity(string activityName, ActivityProcessor activityProcessor) + { + var source = new ActivitySource("my.source"); + var builder = new OpenTelemetryBuilder() + .SetSampler(new AlwaysOnActivitySampler()) + .SetProcessorPipeline(pp => pp.AddProcessor(ap => activityProcessor)) + .AddActivitySource(source.Name); + + // var context = new SpanContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.Recorded); + var activity = source.StartActivity(activityName); + activity?.Stop(); + return activity; + } + + private Activity CreateNotSampledEndedActivity(string activityName, ActivityProcessor activityProcessor) + { + var source = new ActivitySource("my.source"); + var builder = new OpenTelemetryBuilder() + .SetSampler(new AlwaysOnActivitySampler()) + .SetProcessorPipeline(pp => pp.AddProcessor(ap => activityProcessor)) + .AddActivitySource(source.Name); + + // var context = new SpanContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.None); + var activity = source.StartActivity(activityName); + activity?.Stop(); + return activity; + } + + [Fact] + public void ThrowsOnInvalidArguments() + { + Assert.Throws(() => new BatchingActivityProcessor(null)); + Assert.Throws(() => new BatchingActivityProcessor(new TestActivityExporter(null), 0, TimeSpan.FromSeconds(5), 0)); + Assert.Throws(() => new BatchingActivityProcessor(new TestActivityExporter(null), 2048, TimeSpan.FromSeconds(5), 0)); + Assert.Throws(() => new BatchingActivityProcessor(new TestActivityExporter(null), 512, TimeSpan.FromSeconds(5), 513)); + } + + [Fact] + public async Task ShutdownTwice() + { + using var activityProcessor = new BatchingActivityProcessor(new TestActivityExporter(null)); + await activityProcessor.ShutdownAsync(CancellationToken.None); + + // does not throw + await activityProcessor.ShutdownAsync(CancellationToken.None); + } + + [Fact] + public async Task ShutdownWithHugeScheduleDelay() + { + using var activityProcessor = + new BatchingActivityProcessor(new TestActivityExporter(null), 128, TimeSpan.FromMinutes(1), 32); + var sw = Stopwatch.StartNew(); + using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100))) + { + cts.Token.ThrowIfCancellationRequested(); + await activityProcessor.ShutdownAsync(cts.Token).ConfigureAwait(false); + } + + sw.Stop(); + Assert.InRange(sw.Elapsed, TimeSpan.Zero, TimeSpan.FromMilliseconds(100)); + } + + [Fact] + public void ExportDifferentSampledActivities() + { + var activityExporter = new TestActivityExporter(null); + using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, 128); + var activity1 = this.CreateSampledEndedActivity(ActivityName1, activityProcessor); + var activity2 = this.CreateSampledEndedActivity(ActivityName2, activityProcessor); + + var exported = this.WaitForActivities(activityExporter, 2, DefaultTimeout); + + Assert.Equal(2, exported.Length); + Assert.Contains(activity1, exported); + Assert.Contains(activity2, exported); + } + + [Fact] + public void ExporterIsSlowerThanDelay() + { + var exportStartTimes = new List(); + var exportEndTimes = new List(); + var activityExporter = new TestActivityExporter(_ => + { + exportStartTimes.Add(Stopwatch.GetTimestamp()); + Thread.Sleep(50); + exportEndTimes.Add(Stopwatch.GetTimestamp()); + }); + + using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, TimeSpan.FromMilliseconds(30), 2); + var activities = new List(); + for (int i = 0; i < 20; i++) + { + activities.Add(this.CreateSampledEndedActivity(i.ToString(), activityProcessor)); + } + + var exported = this.WaitForActivities(activityExporter, 20, TimeSpan.FromSeconds(2)); + + Assert.Equal(activities.Count, exported.Length); + Assert.InRange(exportStartTimes.Count, 10, 20); + + for (int i = 1; i < exportStartTimes.Count - 1; i++) + { + Assert.InRange(exportStartTimes[i], exportEndTimes[i - 1] + 1, exportStartTimes[i + 1] - 1); + } + } + + [Fact] + public void AddSpanAfterQueueIsExhausted() + { + int exportCalledCount = 0; + var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount)); + using var activityProcessor = new BatchingActivityProcessor(activityExporter, 1, TimeSpan.FromMilliseconds(100), 1); + var activities = new List(); + for (int i = 0; i < 20; i++) + { + activities.Add(this.CreateSampledEndedActivity(i.ToString(), activityProcessor)); + } + + var exported = this.WaitForActivities(activityExporter, 1, DefaultTimeout); + + Assert.Equal(1, exportCalledCount); + Assert.InRange(exported.Length, 1, 2); + Assert.Contains(activities.First(), exported); + } + + [Fact] + public void ExportMoreSpansThanTheMaxBatchSize() + { + var exporterCalled = new ManualResetEvent(false); + int exportCalledCount = 0; + var activityExporter = new TestActivityExporter(_ => + { + exporterCalled.Set(); + Interlocked.Increment(ref exportCalledCount); + }); + + using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, 3); + var span1 = this.CreateSampledEndedActivity(ActivityName1, activityProcessor); + var span2 = this.CreateSampledEndedActivity(ActivityName1, activityProcessor); + var span3 = this.CreateSampledEndedActivity(ActivityName1, activityProcessor); + var span4 = this.CreateSampledEndedActivity(ActivityName1, activityProcessor); + var span5 = this.CreateSampledEndedActivity(ActivityName1, activityProcessor); + var span6 = this.CreateSampledEndedActivity(ActivityName1, activityProcessor); + + // wait for exporter to be called to stabilize tests on the build server + exporterCalled.WaitOne(TimeSpan.FromSeconds(10)); + + var exported = this.WaitForActivities(activityExporter, 6, DefaultTimeout); + + Assert.InRange(exportCalledCount, 2, 6); + + Assert.Equal(6, exported.Count()); + Assert.Contains(span1, exported); + Assert.Contains(span2, exported); + Assert.Contains(span3, exported); + Assert.Contains(span4, exported); + Assert.Contains(span5, exported); + Assert.Contains(span6, exported); + } + + + [Fact] + public void ExportNotSampledActivities() + { + int exportCalledCount = 0; + var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount)); + using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, 3); + var activity1 = this.CreateNotSampledEndedActivity(ActivityName1, activityProcessor); + var activity2 = this.CreateSampledEndedActivity(ActivityName2, activityProcessor); + // Spans are recorded and exported in the same order as they are ended, we test that a non + // sampled span is not exported by creating and ending a sampled span after a non sampled span + // and checking that the first exported span is the sampled span (the non sampled did not get + // exported). + var exported = this.WaitForActivities(activityExporter, 1, DefaultTimeout); + Assert.Equal(1, exportCalledCount); + + // Need to check this because otherwise the variable span1 is unused, other option is to not + // have a span1 variable. + Assert.Single(exported); + Assert.Contains(activity2, exported); + } + + [Fact] + public void ProcessorDoesNotBlockOnExporter() + { + var resetEvent = new ManualResetEvent(false); + var activityExporter = new TestActivityExporter(_ => resetEvent.WaitOne(TimeSpan.FromSeconds(10))); + var builder = new OpenTelemetryBuilder() + .SetProcessorPipeline(pp => pp + .SetExporter(activityExporter) + .SetExportingProcessor(ae => new BatchingActivityProcessor(ae, 128, DefaultDelay, 128))) + .AddActivitySource("test.source"); + + // var context = new SpanContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.Recorded); + var source = new ActivitySource("test.source"); + var activity = source.StartActivity("foo"); + + // does not block + var sw = Stopwatch.StartNew(); + activity.Stop(); + sw.Stop(); + + Assert.InRange(sw.Elapsed, TimeSpan.Zero, TimeSpan.FromMilliseconds(100)); + + resetEvent.Set(); + + var exported = this.WaitForActivities(activityExporter, 1, DefaultTimeout); + + Assert.Single(exported); + } + + [Fact] + public async Task ShutdownOnNotEmptyQueueFullFlush() + { + const int batchSize = 2; + int exportCalledCount = 0; + var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount)); + using var activityProcessor = + new BatchingActivityProcessor(activityExporter, 128, TimeSpan.FromMilliseconds(100), batchSize); + var activities = new List(); + for (int i = 0; i < 100; i++) + { + activities.Add(this.CreateSampledEndedActivity(i.ToString(), activityProcessor)); + } + + Assert.True(activityExporter.ExportedActivities.Length < activities.Count); + using (var cts = new CancellationTokenSource(DefaultTimeout)) + { + await activityProcessor.ShutdownAsync(cts.Token); + } + + Assert.True(activityExporter.WasShutDown); + Assert.Equal(activities.Count, activityExporter.ExportedActivities.Length); + Assert.InRange(exportCalledCount, activities.Count / batchSize, activities.Count); + } + + [Fact] + public async Task ShutdownOnNotEmptyQueueNotFullFlush() + { + const int batchSize = 2; + int exportCalledCount = 0; + + // we'll need about 1.5 sec to export all spans + // we export 100 spans in batches of 2, each export takes 30ms, in one thread + var activityExporter = new TestActivityExporter(_ => + { + Interlocked.Increment(ref exportCalledCount); + Thread.Sleep(30); + }); + + using var activityProcessor = + new BatchingActivityProcessor(activityExporter, 128, TimeSpan.FromMilliseconds(100), batchSize); + var activities = new List(); + for (int i = 0; i < 100; i++) + { + activities.Add(this.CreateSampledEndedActivity(i.ToString(), activityProcessor)); + } + + Assert.True(activityExporter.ExportedActivities.Length < activities.Count); + + // we won't bs able to export all before cancellation will fire + using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200))) + { + await activityProcessor.ShutdownAsync(cts.Token); + } + + var exportedCount = activityExporter.ExportedActivities.Length; + Assert.True(exportedCount < activities.Count); + } + + [Fact] + public void DisposeFlushes() + { + const int batchSize = 2; + int exportCalledCount = 0; + var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount)); + var activities = new List(); + using (var spanProcessor = new BatchingActivityProcessor(activityExporter, 128, TimeSpan.FromMilliseconds(100), batchSize)) + { + for (int i = 0; i < 100; i++) + { + activities.Add(CreateSampledEndedActivity(i.ToString(), spanProcessor)); + } + Assert.True(activityExporter.ExportedActivities.Length < activities.Count); + } + Assert.True(activityExporter.WasShutDown); + Assert.Equal(activities.Count, activityExporter.ExportedActivities.Length); + Assert.Equal(activities.Count / batchSize, exportCalledCount); + } + + public void Dispose() + { + Activity.Current = null; + } + + private Activity[] WaitForActivities(TestActivityExporter exporter, int spanCount, TimeSpan timeout) + { + var sw = Stopwatch.StartNew(); + while (exporter.ExportedActivities.Length < spanCount && sw.Elapsed <= timeout) + { + Thread.Sleep(10); + } + + Assert.True(exporter.ExportedActivities.Length >= spanCount, + $"Expected at least {spanCount}, got {exporter.ExportedActivities.Length}"); + + return exporter.ExportedActivities; + } + } +} diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/Export/BatchingSpanProcessorTests.cs b/test/OpenTelemetry.Tests/Implementation/Trace/Export/BatchingSpanProcessorTests.cs index c8c181c7901..04783dd2967 100644 --- a/test/OpenTelemetry.Tests/Implementation/Trace/Export/BatchingSpanProcessorTests.cs +++ b/test/OpenTelemetry.Tests/Implementation/Trace/Export/BatchingSpanProcessorTests.cs @@ -64,15 +64,15 @@ private SpanSdk CreateNotSampledEndedSpan(string spanName, SpanProcessor spanPro public void ThrowsOnInvalidArguments() { Assert.Throws(() => new BatchingSpanProcessor(null)); - Assert.Throws(() => new BatchingSpanProcessor(new TestExporter(null), 0, TimeSpan.FromSeconds(5), 0)); - Assert.Throws(() => new BatchingSpanProcessor(new TestExporter(null), 2048, TimeSpan.FromSeconds(5), 0)); - Assert.Throws(() => new BatchingSpanProcessor(new TestExporter(null), 512, TimeSpan.FromSeconds(5), 513)); + Assert.Throws(() => new BatchingSpanProcessor(new TestSpanExporter(null), 0, TimeSpan.FromSeconds(5), 0)); + Assert.Throws(() => new BatchingSpanProcessor(new TestSpanExporter(null), 2048, TimeSpan.FromSeconds(5), 0)); + Assert.Throws(() => new BatchingSpanProcessor(new TestSpanExporter(null), 512, TimeSpan.FromSeconds(5), 513)); } [Fact] public async Task ShutdownTwice() { - using var spanProcessor = new BatchingSpanProcessor(new TestExporter(null)); + using var spanProcessor = new BatchingSpanProcessor(new TestSpanExporter(null)); await spanProcessor.ShutdownAsync(CancellationToken.None); // does not throw @@ -83,7 +83,7 @@ public async Task ShutdownTwice() public async Task ShutdownWithHugeScheduleDelay() { using var spanProcessor = - new BatchingSpanProcessor(new TestExporter(null), 128, TimeSpan.FromMinutes(1), 32); + new BatchingSpanProcessor(new TestSpanExporter(null), 128, TimeSpan.FromMinutes(1), 32); var sw = Stopwatch.StartNew(); using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100))) { @@ -98,7 +98,7 @@ public async Task ShutdownWithHugeScheduleDelay() [Fact] public void ExportDifferentSampledSpans() { - var spanExporter = new TestExporter(null); + var spanExporter = new TestSpanExporter(null); using var spanProcessor = new BatchingSpanProcessor(spanExporter, 128, DefaultDelay, 128); var span1 = this.CreateSampledEndedSpan(SpanName1, spanProcessor); var span2 = this.CreateSampledEndedSpan(SpanName2, spanProcessor); @@ -115,7 +115,7 @@ public void ExporterIsSlowerThanDelay() { var exportStartTimes = new List(); var exportEndTimes = new List(); - var spanExporter = new TestExporter(_ => + var spanExporter = new TestSpanExporter(_ => { exportStartTimes.Add(Stopwatch.GetTimestamp()); Thread.Sleep(50); @@ -144,7 +144,7 @@ public void ExporterIsSlowerThanDelay() public void AddSpanAfterQueueIsExhausted() { int exportCalledCount = 0; - var spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount)); + var spanExporter = new TestSpanExporter(_ => Interlocked.Increment(ref exportCalledCount)); using var spanProcessor = new BatchingSpanProcessor(spanExporter, 1, TimeSpan.FromMilliseconds(100), 1); var spans = new List(); for (int i = 0; i < 20; i++) @@ -164,7 +164,7 @@ public void ExportMoreSpansThanTheMaxBatchSize() { var exporterCalled = new ManualResetEvent(false); int exportCalledCount = 0; - var spanExporter = new TestExporter(_ => + var spanExporter = new TestSpanExporter(_ => { exporterCalled.Set(); Interlocked.Increment(ref exportCalledCount); @@ -199,7 +199,7 @@ public void ExportMoreSpansThanTheMaxBatchSize() public void ExportNotSampledSpans() { int exportCalledCount = 0; - var spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount)); + var spanExporter = new TestSpanExporter(_ => Interlocked.Increment(ref exportCalledCount)); using var spanProcessor = new BatchingSpanProcessor(spanExporter, 128, DefaultDelay, 3); var span1 = this.CreateNotSampledEndedSpan(SpanName1, spanProcessor); var span2 = this.CreateSampledEndedSpan(SpanName2, spanProcessor); @@ -220,7 +220,7 @@ public void ExportNotSampledSpans() public void ProcessorDoesNotBlockOnExporter() { var resetEvent = new ManualResetEvent(false); - var spanExporter = new TestExporter(_ => resetEvent.WaitOne(TimeSpan.FromSeconds(10))); + var spanExporter = new TestSpanExporter(_ => resetEvent.WaitOne(TimeSpan.FromSeconds(10))); using var factory = TracerFactory.Create(b => b .AddProcessorPipeline(p => p .SetExporter(spanExporter) @@ -249,7 +249,7 @@ public async Task ShutdownOnNotEmptyQueueFullFlush() { const int batchSize = 2; int exportCalledCount = 0; - var spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount)); + var spanExporter = new TestSpanExporter(_ => Interlocked.Increment(ref exportCalledCount)); using var spanProcessor = new BatchingSpanProcessor(spanExporter, 128, TimeSpan.FromMilliseconds(100), batchSize); var spans = new List(); @@ -276,8 +276,8 @@ public async Task ShutdownOnNotEmptyQueueNotFullFlush() int exportCalledCount = 0; // we'll need about 1.5 sec to export all spans - // we export 100 spans in batches of 2, each export takes 30ms, in one thread - var spanExporter = new TestExporter(_ => + // we export 100 spans in batches of 2, each export takes 30ms, in one thread + var spanExporter = new TestSpanExporter(_ => { Interlocked.Increment(ref exportCalledCount); Thread.Sleep(30); @@ -308,7 +308,7 @@ public void DisposeFlushes() { const int batchSize = 2; int exportCalledCount = 0; - var spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount)); + var spanExporter = new TestSpanExporter(_ => Interlocked.Increment(ref exportCalledCount)); var spans = new List(); using (var spanProcessor = new BatchingSpanProcessor(spanExporter, 128, TimeSpan.FromMilliseconds(100), batchSize)) { @@ -328,7 +328,7 @@ public void Dispose() Activity.Current = null; } - private SpanData[] WaitForSpans(TestExporter exporter, int spanCount, TimeSpan timeout) + private SpanData[] WaitForSpans(TestSpanExporter exporter, int spanCount, TimeSpan timeout) { var sw = Stopwatch.StartNew(); while (exporter.ExportedSpans.Length < spanCount && sw.Elapsed <= timeout) diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/Export/SimpleSpanProcessorTests.cs b/test/OpenTelemetry.Tests/Implementation/Trace/Export/SimpleSpanProcessorTests.cs index dae3f4d46e6..e24a37b059c 100644 --- a/test/OpenTelemetry.Tests/Implementation/Trace/Export/SimpleSpanProcessorTests.cs +++ b/test/OpenTelemetry.Tests/Implementation/Trace/Export/SimpleSpanProcessorTests.cs @@ -30,12 +30,12 @@ public class SimpleSpanProcessorTest : IDisposable private const string SpanName1 = "MySpanName/1"; private const string SpanName2 = "MySpanName/2"; - private TestExporter spanExporter; + private TestSpanExporter spanExporter; private Tracer tracer; public SimpleSpanProcessorTest() { - spanExporter = new TestExporter(null); + spanExporter = new TestSpanExporter(null); tracer = TracerFactory.Create(b => b .AddProcessorPipeline(p => p .SetExporter(spanExporter) @@ -70,7 +70,7 @@ public void ThrowsOnNullExporter() [Fact] public void ThrowsInExporter() { - spanExporter = new TestExporter(_ => throw new ArgumentException("123")); + spanExporter = new TestSpanExporter(_ => throw new ArgumentException("123")); tracer = TracerFactory.Create(b => b .AddProcessorPipeline(p => p .SetExporter(spanExporter) @@ -87,7 +87,7 @@ public void ThrowsInExporter() [Fact] public void ProcessorDoesNotBlockOnExporter() { - spanExporter = new TestExporter(async _ => await Task.Delay(500)); + spanExporter = new TestSpanExporter(async _ => await Task.Delay(500)); tracer = TracerFactory.Create(b => b .AddProcessorPipeline(p => p .SetExporter(spanExporter) @@ -113,7 +113,7 @@ public void ProcessorDoesNotBlockOnExporter() [Fact] public async Task ShutdownTwice() { - var spanProcessor = new SimpleSpanProcessor(new TestExporter(null)); + var spanProcessor = new SimpleSpanProcessor(new TestSpanExporter(null)); await spanProcessor.ShutdownAsync(CancellationToken.None).ConfigureAwait(false); @@ -157,7 +157,7 @@ public void Dispose() Activity.Current = null; } - private SpanData[] WaitForSpans(TestExporter exporter, int spanCount, TimeSpan timeout) + private SpanData[] WaitForSpans(TestSpanExporter exporter, int spanCount, TimeSpan timeout) { Assert.True( SpinWait.SpinUntil(() => diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/TracerTest.cs b/test/OpenTelemetry.Tests/Implementation/Trace/TracerTest.cs index d7cdb74a246..2c60d82de0a 100644 --- a/test/OpenTelemetry.Tests/Implementation/Trace/TracerTest.cs +++ b/test/OpenTelemetry.Tests/Implementation/Trace/TracerTest.cs @@ -41,7 +41,7 @@ public class TracerTest public TracerTest() { - spanProcessor = new SimpleSpanProcessor(new TestExporter(null)); + spanProcessor = new SimpleSpanProcessor(new TestSpanExporter(null)); tracerConfiguration = new TracerConfiguration(); tracerFactory = TracerFactory.Create(b => b .AddProcessorPipeline(p => p.AddProcessor(_ => spanProcessor))); @@ -51,7 +51,7 @@ public TracerTest() [Fact] public void BadConstructorArgumentsThrow() { - var noopProc = new SimpleSpanProcessor(new TestExporter(null)); + var noopProc = new SimpleSpanProcessor(new TestSpanExporter(null)); Assert.Throws(() => new TracerSdk(null, new AlwaysOnSampler(), new TracerConfiguration(), Resource.Empty)); Assert.Throws(() => new TracerSdk(noopProc, new AlwaysOnSampler(), null, Resource.Empty)); Assert.Throws(() => new TracerSdk(noopProc, new AlwaysOnSampler(), new TracerConfiguration(), null));