Skip to content

Commit

Permalink
JaegerExporter batching changes (#1732)
Browse files Browse the repository at this point in the history
* Removed the logic in JaegerExporter for generating batches by process.

* CHANGELOG update.

Co-authored-by: Cijo Thomas <cithomas@microsoft.com>
  • Loading branch information
CodeBlanch and cijothomas authored Jan 30, 2021
1 parent d724634 commit 959f443
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 122 deletions.
8 changes: 6 additions & 2 deletions src/OpenTelemetry.Exporter.Jaeger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
* Changed `JaegerExporter` class and constructor from internal to public.
([#1612](https://github.com/open-telemetry/opentelemetry-dotnet/issues/1612))

* In `JaegerExporterOptions`: Exporter options now include a switch for
Batch vs Simple exporter, and settings for batch exporting properties.
* In `JaegerExporterOptions`: Exporter options now include a switch for Batch vs
Simple exporter, and settings for batch exporting properties.

* Jaeger will now set the `error` tag when `otel.status_code` is set to `ERROR`.
([#1579](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1579) &
Expand All @@ -20,6 +20,10 @@
instead of `message`.
([#1609](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1609))

* `JaegerExporter` batch format has changed to be compliant with the spec. This
may impact the way spans are displayed in Jaeger UI.
([#1732](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1732))

## 1.0.0-rc1.1

Released 2020-Nov-17
Expand Down
6 changes: 0 additions & 6 deletions src/OpenTelemetry.Exporter.Jaeger/Implementation/Batch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,5 @@ internal void Clear()
{
PooledList<BufferWriterMemory>.Clear(ref this.spanMessages);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Return()
{
this.spanMessages.Return();
}
}
}
65 changes: 14 additions & 51 deletions src/OpenTelemetry.Exporter.Jaeger/JaegerExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public class JaegerExporter : BaseExporter<Activity>
private readonly JaegerThriftClient thriftClient;
private readonly InMemoryTransport memoryTransport;
private readonly TProtocol memoryProtocol;
private Dictionary<string, Process> processCache;
private int batchByteSize;
private bool disposedValue; // To detect redundant dispose calls

Expand Down Expand Up @@ -64,24 +63,24 @@ internal JaegerExporter(JaegerExporterOptions options, TTransport clientTranspor

internal Process Process { get; set; }

internal Dictionary<string, Batch> CurrentBatches { get; } = new Dictionary<string, Batch>();
internal Batch Batch { get; private set; }

/// <inheritdoc/>
public override ExportResult Export(in Batch<Activity> activityBatch)
{
try
{
if (this.processCache == null)
if (this.Batch == null)
{
this.SetResource(this.ParentProvider.GetResource());
this.SetResourceAndInitializeBatch(this.ParentProvider.GetResource());
}

foreach (var activity in activityBatch)
{
this.AppendSpan(activity.ToJaegerSpan());
}

this.SendCurrentBatches(null);
this.SendCurrentBatch();

return ExportResult.Success;
}
Expand All @@ -93,7 +92,7 @@ public override ExportResult Export(in Batch<Activity> activityBatch)
}
}

internal void SetResource(Resource resource)
internal void SetResourceAndInitializeBatch(Resource resource)
{
if (resource is null)
{
Expand Down Expand Up @@ -142,48 +141,25 @@ internal void SetResource(Resource resource)
}

this.Process.Message = this.BuildThriftMessage(this.Process).ToArray();
this.processCache = new Dictionary<string, Process>
{
[this.Process.ServiceName] = this.Process,
};
this.Batch = new Batch(this.Process);
this.batchByteSize = this.Process.Message.Length;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void AppendSpan(JaegerSpan jaegerSpan)
{
var spanServiceName = jaegerSpan.PeerServiceName ?? this.Process.ServiceName;

if (!this.processCache.TryGetValue(spanServiceName, out var spanProcess))
{
spanProcess = new Process(spanServiceName, this.Process.Tags);
spanProcess.Message = this.BuildThriftMessage(spanProcess).ToArray();
this.processCache.Add(spanServiceName, spanProcess);
}

var spanMessage = this.BuildThriftMessage(jaegerSpan);

jaegerSpan.Return();

var spanTotalBytesNeeded = spanMessage.Count;

if (!this.CurrentBatches.TryGetValue(spanServiceName, out var spanBatch))
{
spanBatch = new Batch(spanProcess);
this.CurrentBatches.Add(spanServiceName, spanBatch);

spanTotalBytesNeeded += spanProcess.Message.Length;
}

if (this.batchByteSize + spanTotalBytesNeeded >= this.maxPayloadSizeInBytes)
{
this.SendCurrentBatches(spanBatch);

// Flushing effectively erases the spanBatch we were working on, so we have to rebuild it.
spanTotalBytesNeeded = spanMessage.Count + spanProcess.Message.Length;
this.CurrentBatches.Add(spanServiceName, spanBatch);
this.SendCurrentBatch();
}

spanBatch.Add(spanMessage);
this.Batch.Add(spanMessage);
this.batchByteSize += spanTotalBytesNeeded;
}

Expand All @@ -206,30 +182,17 @@ protected override void Dispose(bool disposing)
base.Dispose(disposing);
}

private void SendCurrentBatches(Batch workingBatch)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void SendCurrentBatch()
{
try
{
foreach (var batchKvp in this.CurrentBatches)
{
var batch = batchKvp.Value;

this.thriftClient.SendBatch(batch);

if (batch != workingBatch)
{
batch.Return();
}
else
{
batch.Clear();
}
}
this.thriftClient.SendBatch(this.Batch);
}
finally
{
this.CurrentBatches.Clear();
this.batchByteSize = 0;
this.Batch.Clear();
this.batchByteSize = this.Process.Message.Length;
this.memoryTransport.Reset();
}
}
Expand Down
71 changes: 8 additions & 63 deletions test/OpenTelemetry.Exporter.Jaeger.Tests/JaegerExporterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ public void JaegerTraceExporter_SetResource_UpdatesServiceName()

process.ServiceName = "TestService";

jaegerTraceExporter.SetResource(Resource.Empty);
jaegerTraceExporter.SetResourceAndInitializeBatch(Resource.Empty);

Assert.Equal("TestService", process.ServiceName);

jaegerTraceExporter.SetResource(ResourceBuilder.CreateEmpty().AddService("MyService").Build());
jaegerTraceExporter.SetResourceAndInitializeBatch(ResourceBuilder.CreateEmpty().AddService("MyService").Build());

Assert.Equal("MyService", process.ServiceName);

jaegerTraceExporter.SetResource(ResourceBuilder.CreateEmpty().AddService("MyService", "MyNamespace").Build());
jaegerTraceExporter.SetResourceAndInitializeBatch(ResourceBuilder.CreateEmpty().AddService("MyService", "MyNamespace").Build());

Assert.Equal("MyNamespace.MyService", process.ServiceName);
}
Expand All @@ -71,7 +71,7 @@ public void JaegerTraceExporter_SetResource_CreatesTags()
using var jaegerTraceExporter = new JaegerExporter(new JaegerExporterOptions());
var process = jaegerTraceExporter.Process;

jaegerTraceExporter.SetResource(ResourceBuilder.CreateEmpty().AddAttributes(new Dictionary<string, object>
jaegerTraceExporter.SetResourceAndInitializeBatch(ResourceBuilder.CreateEmpty().AddAttributes(new Dictionary<string, object>
{
["Tag"] = "value",
}).Build());
Expand All @@ -89,7 +89,7 @@ public void JaegerTraceExporter_SetResource_CombinesTags()

process.Tags = new Dictionary<string, JaegerTag> { ["Tag1"] = new KeyValuePair<string, object>("Tag1", "value1").ToJaegerTag() };

jaegerTraceExporter.SetResource(ResourceBuilder.CreateEmpty().AddAttributes(new Dictionary<string, object>
jaegerTraceExporter.SetResourceAndInitializeBatch(ResourceBuilder.CreateEmpty().AddAttributes(new Dictionary<string, object>
{
["Tag2"] = "value2",
}).Build());
Expand All @@ -106,7 +106,7 @@ public void JaegerTraceExporter_SetResource_IgnoreServiceResources()
using var jaegerTraceExporter = new JaegerExporter(new JaegerExporterOptions());
var process = jaegerTraceExporter.Process;

jaegerTraceExporter.SetResource(ResourceBuilder.CreateEmpty().AddAttributes(new Dictionary<string, object>
jaegerTraceExporter.SetResourceAndInitializeBatch(ResourceBuilder.CreateEmpty().AddAttributes(new Dictionary<string, object>
{
[ResourceSemanticConventions.AttributeServiceName] = "servicename",
[ResourceSemanticConventions.AttributeServiceNamespace] = "servicenamespace",
Expand All @@ -115,75 +115,20 @@ public void JaegerTraceExporter_SetResource_IgnoreServiceResources()
Assert.Null(process.Tags);
}

[Fact]
public void JaegerTraceExporter_BuildBatchesToTransmit_DefaultBatch()
{
// Arrange
using var jaegerExporter = new JaegerExporter(new JaegerExporterOptions());
jaegerExporter.SetResource(Resource.Empty);

// Act
jaegerExporter.AppendSpan(CreateTestJaegerSpan());
jaegerExporter.AppendSpan(CreateTestJaegerSpan());
jaegerExporter.AppendSpan(CreateTestJaegerSpan());

var batches = jaegerExporter.CurrentBatches.Values;

// Assert
Assert.Single(batches);
Assert.Equal(DefaultServiceName, batches.First().Process.ServiceName);
Assert.Equal(3, batches.First().Count);
}

[Fact]
public void JaegerTraceExporter_BuildBatchesToTransmit_MultipleBatches()
{
// Arrange
using var jaegerExporter = new JaegerExporter(new JaegerExporterOptions());
jaegerExporter.SetResource(Resource.Empty);

// Act
jaegerExporter.AppendSpan(CreateTestJaegerSpan());
jaegerExporter.AppendSpan(
CreateTestJaegerSpan(
additionalAttributes: new Dictionary<string, object>
{
["peer.service"] = "MySQL",
}));
jaegerExporter.AppendSpan(CreateTestJaegerSpan());

var batches = jaegerExporter.CurrentBatches.Values;

// Assert
Assert.Equal(2, batches.Count());

var primaryBatch = batches.Where(b => b.Process.ServiceName == DefaultServiceName);
Assert.Single(primaryBatch);
Assert.Equal(2, primaryBatch.First().Count);

var mySQLBatch = batches.Where(b => b.Process.ServiceName == "MySQL");
Assert.Single(mySQLBatch);
Assert.Equal(1, mySQLBatch.First().Count);
}

[Fact]
public void JaegerTraceExporter_BuildBatchesToTransmit_FlushedBatch()
{
// Arrange
using var jaegerExporter = new JaegerExporter(new JaegerExporterOptions { MaxPayloadSizeInBytes = 1500 });
jaegerExporter.SetResource(Resource.Empty);
jaegerExporter.SetResourceAndInitializeBatch(Resource.Empty);

// Act
jaegerExporter.AppendSpan(CreateTestJaegerSpan());
jaegerExporter.AppendSpan(CreateTestJaegerSpan());
jaegerExporter.AppendSpan(CreateTestJaegerSpan());

var batches = jaegerExporter.CurrentBatches.Values;

// Assert
Assert.Single(batches);
Assert.Equal(DefaultServiceName, batches.First().Process.ServiceName);
Assert.Equal(1, batches.First().Count);
Assert.Equal(1, jaegerExporter.Batch.Count);
}

internal static JaegerSpan CreateTestJaegerSpan(
Expand Down

0 comments on commit 959f443

Please sign in to comment.