Skip to content

Commit

Permalink
Refactor exporter - step 5 (#1087)
Browse files Browse the repository at this point in the history
* add a circular buffer

* add memory barrier

* ++

* clean up

* integrate the circular buffer into BatchExportActivityProcessor

* integrate the circular buffer into BatchExportActivityProcessor

* better naming

* better naming
  • Loading branch information
reyang authored Aug 16, 2020
1 parent 90c370f commit ec5683a
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 4 deletions.
22 changes: 22 additions & 0 deletions src/OpenTelemetry/Internal/CircularBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,28 @@ public int Count
}
}

/// <summary>
/// Gets the number of items added to the <see cref="CircularBuffer{T}"/>.
/// </summary>
public long AddedCount
{
get
{
return this.head;
}
}

/// <summary>
/// Gets the number of items removed from the <see cref="CircularBuffer{T}"/>.
/// </summary>
public long RemovedCount
{
get
{
return this.tail;
}
}

/// <summary>
/// Attempts to add the specified item to the buffer.
/// </summary>
Expand Down
52 changes: 48 additions & 4 deletions src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ namespace OpenTelemetry.Trace
public class BatchExportActivityProcessor : ActivityProcessor
{
private readonly ActivityExporterSync exporter;
private readonly int maxQueueSize;
private readonly CircularBuffer<Activity> queue;
private readonly TimeSpan scheduledDelay;
private readonly TimeSpan exporterTimeout;
private readonly int maxExportBatchSize;
private bool disposed;
private long droppedCount = 0;

/// <summary>
/// Initializes a new instance of the <see cref="BatchExportActivityProcessor"/> class with custom settings.
Expand Down Expand Up @@ -70,17 +71,60 @@ public BatchExportActivityProcessor(
}

this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter));
this.maxQueueSize = maxQueueSize;
this.queue = new CircularBuffer<Activity>(maxQueueSize);
this.scheduledDelay = TimeSpan.FromMilliseconds(scheduledDelayMillis);
this.exporterTimeout = TimeSpan.FromMilliseconds(exporterTimeoutMillis);
this.maxExportBatchSize = maxExportBatchSize;
}

/// <summary>
/// Gets the number of <see cref="Activity"/> dropped (when the queue is full).
/// </summary>
internal long DroppedCount
{
get
{
return this.droppedCount;
}
}

/// <summary>
/// Gets the number of <see cref="Activity"/> received by the processor.
/// </summary>
internal long ReceivedCount
{
get
{
return this.queue.AddedCount + this.DroppedCount;
}
}

/// <summary>
/// Gets the number of <see cref="Activity"/> processed by the underlying exporter.
/// </summary>
internal long ProcessedCount
{
get
{
return this.queue.RemovedCount;
}
}

/// <inheritdoc/>
public override void OnEnd(Activity activity)
{
// TODO
throw new NotImplementedException();
if (this.queue.TryAdd(activity))
{
if (this.queue.Count >= this.maxExportBatchSize)
{
// TODO: signal the exporter
}

return; // enqueue succeeded
}

// drop item on the floor
Interlocked.Increment(ref this.droppedCount);
}

/// <inheritdoc/>
Expand Down

0 comments on commit ec5683a

Please sign in to comment.