Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZipkinExporter Stuck Batch Fix #1726

Merged
merged 10 commits into from
Jan 28, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ OpenTelemetry.BaseProcessor<T>.ParentProvider.get -> OpenTelemetry.BaseProvider
OpenTelemetry.BaseProcessor<T>.Shutdown(int timeoutMilliseconds = -1) -> bool
OpenTelemetry.Batch<T>
OpenTelemetry.Batch<T>.Batch() -> void
OpenTelemetry.Batch<T>.Dispose() -> void
OpenTelemetry.Batch<T>.Enumerator
OpenTelemetry.Batch<T>.Enumerator.Current.get -> T
OpenTelemetry.Batch<T>.Enumerator.Dispose() -> void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ OpenTelemetry.BaseProcessor<T>.ParentProvider.get -> OpenTelemetry.BaseProvider
OpenTelemetry.BaseProcessor<T>.Shutdown(int timeoutMilliseconds = -1) -> bool
OpenTelemetry.Batch<T>
OpenTelemetry.Batch<T>.Batch() -> void
OpenTelemetry.Batch<T>.Dispose() -> void
OpenTelemetry.Batch<T>.Enumerator
OpenTelemetry.Batch<T>.Enumerator.Current.get -> T
OpenTelemetry.Batch<T>.Enumerator.Dispose() -> void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ OpenTelemetry.BaseProcessor<T>.ParentProvider.get -> OpenTelemetry.BaseProvider
OpenTelemetry.BaseProcessor<T>.Shutdown(int timeoutMilliseconds = -1) -> bool
OpenTelemetry.Batch<T>
OpenTelemetry.Batch<T>.Batch() -> void
OpenTelemetry.Batch<T>.Dispose() -> void
OpenTelemetry.Batch<T>.Enumerator
OpenTelemetry.Batch<T>.Enumerator.Current.get -> T
OpenTelemetry.Batch<T>.Enumerator.Dispose() -> void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ OpenTelemetry.BaseProcessor<T>.ParentProvider.get -> OpenTelemetry.BaseProvider
OpenTelemetry.BaseProcessor<T>.Shutdown(int timeoutMilliseconds = -1) -> bool
OpenTelemetry.Batch<T>
OpenTelemetry.Batch<T>.Batch() -> void
OpenTelemetry.Batch<T>.Dispose() -> void
OpenTelemetry.Batch<T>.Enumerator
OpenTelemetry.Batch<T>.Enumerator.Current.get -> T
OpenTelemetry.Batch<T>.Enumerator.Dispose() -> void
Expand Down
38 changes: 25 additions & 13 deletions src/OpenTelemetry/Batch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ namespace OpenTelemetry
/// Stores a batch of completed <typeparamref name="T"/> objects to be exported.
/// </summary>
/// <typeparam name="T">The type of object in the <see cref="Batch{T}"/>.</typeparam>
public readonly struct Batch<T>
public readonly struct Batch<T> : IDisposable
where T : class
{
private readonly T item;
private readonly CircularBuffer<T> circularBuffer;
private readonly int maxSize;
private readonly long targetCount;

internal Batch(T item)
{
this.item = item ?? throw new ArgumentNullException(nameof(item));
this.circularBuffer = null;
this.maxSize = 1;
this.targetCount = 1;
}

internal Batch(CircularBuffer<T> circularBuffer, int maxSize)
Expand All @@ -46,7 +46,20 @@ internal Batch(CircularBuffer<T> circularBuffer, int maxSize)

this.item = null;
this.circularBuffer = circularBuffer ?? throw new ArgumentNullException(nameof(circularBuffer));
this.maxSize = maxSize;
this.targetCount = circularBuffer.RemovedCount + Math.Min(maxSize, circularBuffer.Count);
}

/// <inheritdoc/>
public void Dispose()
{
if (this.circularBuffer != null)
{
// Drain anything left in the batch.
while (this.circularBuffer.RemovedCount < this.targetCount)
{
this.circularBuffer.Read();
}
}
}

/// <summary>
Expand All @@ -56,7 +69,7 @@ internal Batch(CircularBuffer<T> circularBuffer, int maxSize)
public Enumerator GetEnumerator()
{
return this.circularBuffer != null
? new Enumerator(this.circularBuffer, this.maxSize)
? new Enumerator(this.circularBuffer, this.targetCount)
: new Enumerator(this.item);
}

Expand All @@ -66,20 +79,20 @@ public Enumerator GetEnumerator()
public struct Enumerator : IEnumerator<T>
{
private readonly CircularBuffer<T> circularBuffer;
private int count;
private long targetCount;

internal Enumerator(T item)
{
this.Current = item;
this.circularBuffer = null;
this.count = -1;
this.targetCount = -1;
}

internal Enumerator(CircularBuffer<T> circularBuffer, int maxSize)
internal Enumerator(CircularBuffer<T> circularBuffer, long targetCount)
{
this.Current = null;
this.circularBuffer = circularBuffer;
this.count = Math.Min(maxSize, circularBuffer.Count);
this.targetCount = targetCount;
}

/// <inheritdoc/>
Expand All @@ -100,20 +113,19 @@ public bool MoveNext()

if (circularBuffer == null)
{
if (this.count >= 0)
if (this.targetCount >= 0)
{
this.Current = null;
return false;
}

this.count++;
this.targetCount++;
return true;
}

if (this.count > 0)
if (circularBuffer.RemovedCount < this.targetCount)
{
this.Current = circularBuffer.Read();
this.count--;
return true;
}

Expand Down
5 changes: 4 additions & 1 deletion src/OpenTelemetry/BatchExportProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@ private void ExporterProc()

if (this.circularBuffer.Count > 0)
{
this.exporter.Export(new Batch<T>(this.circularBuffer, this.maxExportBatchSize));
using (var batch = new Batch<T>(this.circularBuffer, this.maxExportBatchSize))
{
this.exporter.Export(batch);
}

this.dataExportedNotification.Set();
this.dataExportedNotification.Reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ public void OnNext(KeyValuePair<string, object> value)
{
if (!this.handler.SupportsNullActivity && Activity.Current == null)
{
InstrumentationEventSource.Log.NullActivity(value.Key);
if (!Sdk.SuppressInstrumentation)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add this check inside the instrumentationEventSource? because we can forget if someone adds a new Log, no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. If you look at this method there are 4 or 5 SuppressInstrumentation checks inside it. Smells like it could use a bit of cleanup. /cc @alanwest

{
InstrumentationEventSource.Log.NullActivity(value.Key);
}

return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,5 +176,25 @@ public void CheckExportForRecordingButNotSampledActivity()
Assert.Empty(exportedItems);
Assert.Equal(0, processor.ProcessedCount);
}

[Fact]
public void CheckExportDrainsBatchOnFailure()
{
using var exporter = new InMemoryExporter<Activity>(null);
using var processor = new BatchActivityExportProcessor(
exporter,
maxQueueSize: 3,
maxExportBatchSize: 3);

var activity = new Activity("start");
activity.ActivityTraceFlags = ActivityTraceFlags.Recorded;

processor.OnEnd(activity);
processor.OnEnd(activity);
processor.OnEnd(activity);
processor.Shutdown();

Assert.Equal(3, processor.ProcessedCount); // Verify batch was drained even though nothing was exported.
}
}
}
59 changes: 59 additions & 0 deletions test/OpenTelemetry.Tests/Trace/BatchTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,38 @@ public void CheckValidConstructors()
}
}

[Fact]
public void CheckDispose()
{
var value = "a";
var batch = new Batch<string>(value);
batch.Dispose(); // A test to make sure it doesn't bomb on a null CircularBuffer.

var circularBuffer = new CircularBuffer<string>(10);
circularBuffer.Add(value);
circularBuffer.Add(value);
circularBuffer.Add(value);
batch = new Batch<string>(circularBuffer, 10); // Max size = 10
batch.GetEnumerator().MoveNext();
Assert.Equal(3, circularBuffer.AddedCount);
Assert.Equal(1, circularBuffer.RemovedCount);
batch.Dispose(); // Test anything remaining in the batch is drained when disposed.
Assert.Equal(3, circularBuffer.AddedCount);
Assert.Equal(3, circularBuffer.RemovedCount);
batch.Dispose(); // Verify we don't go into an infinite loop or thrown when empty.

circularBuffer = new CircularBuffer<string>(10);
circularBuffer.Add(value);
circularBuffer.Add(value);
circularBuffer.Add(value);
batch = new Batch<string>(circularBuffer, 2); // Max size = 2
Assert.Equal(3, circularBuffer.AddedCount);
Assert.Equal(0, circularBuffer.RemovedCount);
batch.Dispose(); // Test the batch is drained up to max size.
Assert.Equal(3, circularBuffer.AddedCount);
Assert.Equal(2, circularBuffer.RemovedCount);
}

[Fact]
public void CheckEnumerator()
{
Expand All @@ -63,6 +95,33 @@ public void CheckEnumerator()
this.ValidateEnumerator(enumerator, value);
}

[Fact]
public void CheckMultipleEnumerator()
{
var value = "a";
var circularBuffer = new CircularBuffer<string>(10);
circularBuffer.Add(value);
circularBuffer.Add(value);
circularBuffer.Add(value);
var batch = new Batch<string>(circularBuffer, 10);

int itemsProcessed = 0;
foreach (var item in batch)
{
itemsProcessed++;
}

Assert.Equal(3, itemsProcessed);

itemsProcessed = 0;
foreach (var item in batch)
{
itemsProcessed++;
}

Assert.Equal(0, itemsProcessed);
}

[Fact]
public void CheckEnumeratorResetException()
{
Expand Down