Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batching activity processor #697

Closed
wants to merge 9 commits into from
244 changes: 244 additions & 0 deletions src/OpenTelemetry/Trace/Export/BatchingActivityProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
// <copyright file="BatchingActivityProcessor.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Trace.Export
{
/// <summary>
/// Implements processor that batches astivities before calling exporter.
MikeGoldsmith marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
public class BatchingActivityProcessor : ActivityProcessor, IDisposable
{
private const int DefaultMaxQueueSize = 2048;
private const int DefaultMaxExportBatchSize = 512;
private static readonly TimeSpan DefaultScheduleDelay = TimeSpan.FromMilliseconds(5000);
private readonly ConcurrentQueue<Activity> exportQueue;
private readonly int maxQueueSize;
private readonly int maxExportBatchSize;
private readonly TimeSpan scheduleDelay;
private readonly ActivityExporter exporter;
private CancellationTokenSource cts;
private volatile int currentQueueSize;
private bool stopping = false;

/// <summary>
/// Initializes a new instance of the <see cref="BatchingActivityProcessor"/> class with default parameters:
/// <list type="bullet">
/// <item>
/// <description>maxQueueSize = 2048,</description>
/// </item>
/// <item>
/// <description>scheduleDelay = 5 sec,</description>
/// </item>
/// <item>
/// <description>maxExportBatchSize = 512</description>
/// </item>
/// </list>
/// </summary>
/// <param name="exporter">Exporter instance.</param>
public BatchingActivityProcessor(ActivityExporter exporter)
: this(exporter, DefaultMaxQueueSize, DefaultScheduleDelay, DefaultMaxExportBatchSize)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="BatchingActivityProcessor"/> class with custom settings.
/// </summary>
/// <param name="exporter">Exporter instance.</param>
/// <param name="maxQueueSize">Maximum queue size. After the size is reached activities are dropped by processor.</param>
/// <param name="scheduleDelay">The delay between two consecutive exports.</param>
/// <param name="maxExportBatchSize">The maximum batch size of every export. It must be smaller or equal to maxQueueSize.</param>
public BatchingActivityProcessor(ActivityExporter exporter, int maxQueueSize, TimeSpan scheduleDelay, int maxExportBatchSize)
{
if (maxQueueSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxQueueSize));
}

if (maxExportBatchSize <= 0 || maxExportBatchSize > maxQueueSize)
{
throw new ArgumentOutOfRangeException(nameof(maxExportBatchSize));
}

this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter));
this.maxQueueSize = maxQueueSize;
this.scheduleDelay = scheduleDelay;
this.maxExportBatchSize = maxExportBatchSize;

this.cts = new CancellationTokenSource();
this.exportQueue = new ConcurrentQueue<Activity>();

// worker task that will last for lifetime of processor.
// No need to specify long running - it is useless if any async calls are made internally.
// Threads are also useless as exporter tasks run in thread pool threads.
Task.Factory.StartNew(s => this.Worker((CancellationToken)s), this.cts.Token);
MikeGoldsmith marked this conversation as resolved.
Show resolved Hide resolved
}

/// <inheritdoc/>
public override void OnStart(Activity activity)
{
}

/// <inheritdoc/>
public override void OnEnd(Activity activity)
{
if (this.stopping)
{
return;
}

// because of race-condition between checking the size and enqueueing,
// we might end up with a bit more activities than maxQueueSize.
// Let's just tolerate it to avoid extra synchronization.
if (this.currentQueueSize >= this.maxQueueSize)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorQueueIsExhausted();
return;
}

Interlocked.Increment(ref this.currentQueueSize);

this.exportQueue.Enqueue(activity);
}

/// <inheritdoc/>
public override async Task ShutdownAsync(CancellationToken cancellationToken)
{
if (!this.stopping)
{
this.stopping = true;

// This will stop the loop after current batch finishes.
this.cts.Cancel(false);
this.cts.Dispose();
this.cts = null;

// if there are more items, continue until cancellation token allows
while (this.currentQueueSize > 0 && !cancellationToken.IsCancellationRequested)
{
await this.ExportBatchAsync(cancellationToken).ConfigureAwait(false);
}

await this.exporter.ShutdownAsync(cancellationToken);

// there is no point in waiting for a worker task if cancellation happens
// it's dead already or will die on the next iteration on its own

// ExportBatchAsync must never throw, we are here either because it was cancelled
// or because there are no items left
OpenTelemetrySdkEventSource.Log.ShutdownEvent(this.currentQueueSize);
}
}

public void Dispose()
{
this.Dispose(true);
}

protected virtual void Dispose(bool isDisposing)
{
if (!this.stopping)
{
this.ShutdownAsync(CancellationToken.None).ContinueWith(_ => { }).GetAwaiter().GetResult();
}

if (isDisposing)
{
if (this.exporter is IDisposable disposableExporter)
{
try
{
disposableExporter.Dispose();
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("Dispose", e);
}
}
}
}

private async Task ExportBatchAsync(CancellationToken cancellationToken)
{
try
{
if (cancellationToken.IsCancellationRequested)
{
return;
}

List<Activity> batch = null;
if (this.exportQueue.TryDequeue(out var nextActivity))
{
Interlocked.Decrement(ref this.currentQueueSize);
batch = new List<Activity> { nextActivity };
MikeGoldsmith marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
// nothing in queue
return;
}

while (batch.Count < this.maxExportBatchSize && this.exportQueue.TryDequeue(out nextActivity))
{
Interlocked.Decrement(ref this.currentQueueSize);
batch.Add(nextActivity);
}

var result = await this.exporter.ExportAsync(batch, cancellationToken).ConfigureAwait(false);
if (result != ExportResult.Success)
{
OpenTelemetrySdkEventSource.Log.ExporterErrorResult(result);

// we do not support retries for now and leave it up to exporter
// as only exporter implementation knows how to retry: which items failed
// and what is the reasonable policy for that exporter.
}
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.ExportBatchAsync), ex);
}
}

private async Task Worker(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var sw = Stopwatch.StartNew();
await this.ExportBatchAsync(cancellationToken).ConfigureAwait(false);

if (cancellationToken.IsCancellationRequested)
{
return;
}

var remainingWait = this.scheduleDelay - sw.Elapsed;
if (remainingWait > TimeSpan.Zero)
{
await Task.Delay(remainingWait, cancellationToken).ConfigureAwait(false);
MikeGoldsmith marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// <copyright file="TestActivityExporter.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Trace.Export;
using System.Diagnostics;

namespace OpenTelemetry.Testing.Export
{
public class TestActivityExporter : ActivityExporter
{
private readonly ConcurrentQueue<Activity> activities = new ConcurrentQueue<Activity>();
private readonly Action<IEnumerable<Activity>> onExport;
public TestActivityExporter(Action<IEnumerable<Activity>> onExport)
{
this.onExport = onExport;
}

public Activity[] ExportedActivities => activities.ToArray();

public bool WasShutDown { get; private set; } = false;

public override Task<ExportResult> ExportAsync(IEnumerable<Activity> data, CancellationToken cancellationToken)
{
this.onExport?.Invoke(data);

foreach (var s in data)
{
this.activities.Enqueue(s);
}

return Task.FromResult(ExportResult.Success);
}

public override Task ShutdownAsync(CancellationToken cancellationToken)
{
this.WasShutDown = true;
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// <copyright file="TestExporter.cs" company="OpenTelemetry Authors">
// <copyright file="TestSpanExporter.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -18,16 +18,15 @@
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Trace;
using OpenTelemetry.Trace.Export;

namespace OpenTelemetry.Testing.Export
{
public class TestExporter : SpanExporter
public class TestSpanExporter : SpanExporter
{
private readonly ConcurrentQueue<SpanData> spanDataList = new ConcurrentQueue<SpanData>();
private readonly Action<IEnumerable<SpanData>> onExport;
public TestExporter(Action<IEnumerable<SpanData>> onExport)
public TestSpanExporter(Action<IEnumerable<SpanData>> onExport)
{
this.onExport = onExport;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void PipelineBuilder_AddExporter()
{
var builder = new SpanProcessorPipelineBuilder();

var exporter = new TestExporter(null);
var exporter = new TestSpanExporter(null);
builder.SetExporter(exporter);

Assert.Same(exporter, builder.Exporter);
Expand All @@ -72,7 +72,7 @@ public void PipelineBuilder_AddExporterAndExportingProcessor()
{
var builder = new SpanProcessorPipelineBuilder();

var exporter = new TestExporter(null);
var exporter = new TestSpanExporter(null);
builder.SetExporter(exporter);

bool processorFactoryCalled = false;
Expand Down Expand Up @@ -203,7 +203,7 @@ public void PipelineBuilder_AddProcessorChainWithExporter()
Assert.NotNull(exporter);
return new SimpleSpanProcessor(exporter);
})
.SetExporter(new TestExporter(null));
.SetExporter(new TestSpanExporter(null));

var firstProcessor = (TestProcessor)builder.Build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void TracerBuilder_ValidArgs()
bool instrumentationFactoryCalled = true;

var sampler = new ProbabilitySampler(0.1);
var exporter = new TestExporter(_ => { });
var exporter = new TestSpanExporter(_ => { });
var options = new TracerConfiguration(1, 1, 1);

builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void CreateFactory_BuilderWithArgs()
{
var exporterCalledCount = 0;

var testExporter = new TestExporter(spans =>
var testExporter = new TestSpanExporter(spans =>
{
exporterCalledCount++;
Assert.Single(spans);
Expand Down Expand Up @@ -126,7 +126,7 @@ public void CreateFactory_BuilderWithMultiplePipelines()
{
var exporterCalledCount = 0;

var testExporter = new TestExporter(spans =>
var testExporter = new TestSpanExporter(spans =>
{
exporterCalledCount++;
Assert.Single(spans);
Expand Down
Loading