Skip to content

Commit

Permalink
ZipkinExporter Stuck Batch Fix (#1726)
Browse files Browse the repository at this point in the history
* Fix batches growing indefinitely when ZipkinExporter can't make a connection. Fixed null activity events spamming when using SuppressInstrumentation.

* Tweaked DiagnosticSourceListener code.

* Warning fixup.

* Added Dispose to Batch<T>. BatchExportProcessor calls dispose on the Batch it creates.

* Code review.

* Code review.

* CHANGELOG updates.

Co-authored-by: Cijo Thomas <cithomas@microsoft.com>
  • Loading branch information
CodeBlanch and cijothomas authored Jan 28, 2021
1 parent cc93a78 commit 4f39a58
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 15 deletions.
4 changes: 4 additions & 0 deletions src/OpenTelemetry.Exporter.Zipkin/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
* Span tags will no longer be populated with Resource Attributes.
([#1663](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1663))

* Spans will no longer be held in memory indefinitely when `ZipkinExporter`
cannot connect to the configured endpoint.
([#1726](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1726))

## 1.0.0-rc1.1

Released 2020-Nov-17
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ OpenTelemetry.BaseProcessor<T>.ParentProvider.get -> OpenTelemetry.BaseProvider
OpenTelemetry.BaseProcessor<T>.Shutdown(int timeoutMilliseconds = -1) -> bool
OpenTelemetry.Batch<T>
OpenTelemetry.Batch<T>.Batch() -> void
OpenTelemetry.Batch<T>.Dispose() -> void
OpenTelemetry.Batch<T>.Enumerator
OpenTelemetry.Batch<T>.Enumerator.Current.get -> T
OpenTelemetry.Batch<T>.Enumerator.Dispose() -> void
Expand Down
1 change: 1 addition & 0 deletions src/OpenTelemetry/.publicApi/net46/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ OpenTelemetry.BaseProcessor<T>.ParentProvider.get -> OpenTelemetry.BaseProvider
OpenTelemetry.BaseProcessor<T>.Shutdown(int timeoutMilliseconds = -1) -> bool
OpenTelemetry.Batch<T>
OpenTelemetry.Batch<T>.Batch() -> void
OpenTelemetry.Batch<T>.Dispose() -> void
OpenTelemetry.Batch<T>.Enumerator
OpenTelemetry.Batch<T>.Enumerator.Current.get -> T
OpenTelemetry.Batch<T>.Enumerator.Dispose() -> void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ OpenTelemetry.BaseProcessor<T>.ParentProvider.get -> OpenTelemetry.BaseProvider
OpenTelemetry.BaseProcessor<T>.Shutdown(int timeoutMilliseconds = -1) -> bool
OpenTelemetry.Batch<T>
OpenTelemetry.Batch<T>.Batch() -> void
OpenTelemetry.Batch<T>.Dispose() -> void
OpenTelemetry.Batch<T>.Enumerator
OpenTelemetry.Batch<T>.Enumerator.Current.get -> T
OpenTelemetry.Batch<T>.Enumerator.Dispose() -> void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ OpenTelemetry.BaseProcessor<T>.ParentProvider.get -> OpenTelemetry.BaseProvider
OpenTelemetry.BaseProcessor<T>.Shutdown(int timeoutMilliseconds = -1) -> bool
OpenTelemetry.Batch<T>
OpenTelemetry.Batch<T>.Batch() -> void
OpenTelemetry.Batch<T>.Dispose() -> void
OpenTelemetry.Batch<T>.Enumerator
OpenTelemetry.Batch<T>.Enumerator.Current.get -> T
OpenTelemetry.Batch<T>.Enumerator.Dispose() -> void
Expand Down
38 changes: 25 additions & 13 deletions src/OpenTelemetry/Batch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ namespace OpenTelemetry
/// Stores a batch of completed <typeparamref name="T"/> objects to be exported.
/// </summary>
/// <typeparam name="T">The type of object in the <see cref="Batch{T}"/>.</typeparam>
public readonly struct Batch<T>
public readonly struct Batch<T> : IDisposable
where T : class
{
private readonly T item;
private readonly CircularBuffer<T> circularBuffer;
private readonly int maxSize;
private readonly long targetCount;

internal Batch(T item)
{
this.item = item ?? throw new ArgumentNullException(nameof(item));
this.circularBuffer = null;
this.maxSize = 1;
this.targetCount = 1;
}

internal Batch(CircularBuffer<T> circularBuffer, int maxSize)
Expand All @@ -46,7 +46,20 @@ internal Batch(CircularBuffer<T> circularBuffer, int maxSize)

this.item = null;
this.circularBuffer = circularBuffer ?? throw new ArgumentNullException(nameof(circularBuffer));
this.maxSize = maxSize;
this.targetCount = circularBuffer.RemovedCount + Math.Min(maxSize, circularBuffer.Count);
}

/// <inheritdoc/>
public void Dispose()
{
if (this.circularBuffer != null)
{
// Drain anything left in the batch.
while (this.circularBuffer.RemovedCount < this.targetCount)
{
this.circularBuffer.Read();
}
}
}

/// <summary>
Expand All @@ -56,7 +69,7 @@ internal Batch(CircularBuffer<T> circularBuffer, int maxSize)
public Enumerator GetEnumerator()
{
return this.circularBuffer != null
? new Enumerator(this.circularBuffer, this.maxSize)
? new Enumerator(this.circularBuffer, this.targetCount)
: new Enumerator(this.item);
}

Expand All @@ -66,20 +79,20 @@ public Enumerator GetEnumerator()
public struct Enumerator : IEnumerator<T>
{
private readonly CircularBuffer<T> circularBuffer;
private int count;
private long targetCount;

internal Enumerator(T item)
{
this.Current = item;
this.circularBuffer = null;
this.count = -1;
this.targetCount = -1;
}

internal Enumerator(CircularBuffer<T> circularBuffer, int maxSize)
internal Enumerator(CircularBuffer<T> circularBuffer, long targetCount)
{
this.Current = null;
this.circularBuffer = circularBuffer;
this.count = Math.Min(maxSize, circularBuffer.Count);
this.targetCount = targetCount;
}

/// <inheritdoc/>
Expand All @@ -100,20 +113,19 @@ public bool MoveNext()

if (circularBuffer == null)
{
if (this.count >= 0)
if (this.targetCount >= 0)
{
this.Current = null;
return false;
}

this.count++;
this.targetCount++;
return true;
}

if (this.count > 0)
if (circularBuffer.RemovedCount < this.targetCount)
{
this.Current = circularBuffer.Read();
this.count--;
return true;
}

Expand Down
5 changes: 4 additions & 1 deletion src/OpenTelemetry/BatchExportProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@ private void ExporterProc()

if (this.circularBuffer.Count > 0)
{
this.exporter.Export(new Batch<T>(this.circularBuffer, this.maxExportBatchSize));
using (var batch = new Batch<T>(this.circularBuffer, this.maxExportBatchSize))
{
this.exporter.Export(batch);
}

this.dataExportedNotification.Set();
this.dataExportedNotification.Reset();
Expand Down
3 changes: 3 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
resource's attributes in a conflict. We've rectified to follow a recent
change to the spec. We previously prioritized "this" resource's tags.
([#1728](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1728))
* `BatchExportProcessor` will now flush any remaining spans left in a `Batch`
after the export operation has completed.
([#1726](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1726))

## 1.0.0-rc1.1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ public void OnNext(KeyValuePair<string, object> value)
{
if (!this.handler.SupportsNullActivity && Activity.Current == null)
{
InstrumentationEventSource.Log.NullActivity(value.Key);
if (!Sdk.SuppressInstrumentation)
{
InstrumentationEventSource.Log.NullActivity(value.Key);
}

return;
}

Expand Down
20 changes: 20 additions & 0 deletions test/OpenTelemetry.Tests/Trace/BatchExportActivityProcessorTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,5 +176,25 @@ public void CheckExportForRecordingButNotSampledActivity()
Assert.Empty(exportedItems);
Assert.Equal(0, processor.ProcessedCount);
}

[Fact]
public void CheckExportDrainsBatchOnFailure()
{
using var exporter = new InMemoryExporter<Activity>(null);
using var processor = new BatchActivityExportProcessor(
exporter,
maxQueueSize: 3,
maxExportBatchSize: 3);

var activity = new Activity("start");
activity.ActivityTraceFlags = ActivityTraceFlags.Recorded;

processor.OnEnd(activity);
processor.OnEnd(activity);
processor.OnEnd(activity);
processor.Shutdown();

Assert.Equal(3, processor.ProcessedCount); // Verify batch was drained even though nothing was exported.
}
}
}
59 changes: 59 additions & 0 deletions test/OpenTelemetry.Tests/Trace/BatchTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,38 @@ public void CheckValidConstructors()
}
}

[Fact]
public void CheckDispose()
{
var value = "a";
var batch = new Batch<string>(value);
batch.Dispose(); // A test to make sure it doesn't bomb on a null CircularBuffer.

var circularBuffer = new CircularBuffer<string>(10);
circularBuffer.Add(value);
circularBuffer.Add(value);
circularBuffer.Add(value);
batch = new Batch<string>(circularBuffer, 10); // Max size = 10
batch.GetEnumerator().MoveNext();
Assert.Equal(3, circularBuffer.AddedCount);
Assert.Equal(1, circularBuffer.RemovedCount);
batch.Dispose(); // Test anything remaining in the batch is drained when disposed.
Assert.Equal(3, circularBuffer.AddedCount);
Assert.Equal(3, circularBuffer.RemovedCount);
batch.Dispose(); // Verify we don't go into an infinite loop or thrown when empty.

circularBuffer = new CircularBuffer<string>(10);
circularBuffer.Add(value);
circularBuffer.Add(value);
circularBuffer.Add(value);
batch = new Batch<string>(circularBuffer, 2); // Max size = 2
Assert.Equal(3, circularBuffer.AddedCount);
Assert.Equal(0, circularBuffer.RemovedCount);
batch.Dispose(); // Test the batch is drained up to max size.
Assert.Equal(3, circularBuffer.AddedCount);
Assert.Equal(2, circularBuffer.RemovedCount);
}

[Fact]
public void CheckEnumerator()
{
Expand All @@ -63,6 +95,33 @@ public void CheckEnumerator()
this.ValidateEnumerator(enumerator, value);
}

[Fact]
public void CheckMultipleEnumerator()
{
var value = "a";
var circularBuffer = new CircularBuffer<string>(10);
circularBuffer.Add(value);
circularBuffer.Add(value);
circularBuffer.Add(value);
var batch = new Batch<string>(circularBuffer, 10);

int itemsProcessed = 0;
foreach (var item in batch)
{
itemsProcessed++;
}

Assert.Equal(3, itemsProcessed);

itemsProcessed = 0;
foreach (var item in batch)
{
itemsProcessed++;
}

Assert.Equal(0, itemsProcessed);
}

[Fact]
public void CheckEnumeratorResetException()
{
Expand Down

0 comments on commit 4f39a58

Please sign in to comment.