Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZipkinExporter Stuck Batch Fix #1726

Merged
merged 10 commits into from
Jan 28, 2021
46 changes: 39 additions & 7 deletions src/OpenTelemetry.Exporter.Zipkin/ZipkinExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,14 @@ public override ExportResult Export(in Batch<Activity> batch)
// Prevent Zipkin's HTTP operations from being instrumented.
using var scope = SuppressInstrumentationScope.Begin();

using var content = new JsonContent(this, batch);
try
{
var requestUri = this.options.Endpoint;

using var request = new HttpRequestMessage(HttpMethod.Post, requestUri)
{
Content = new JsonContent(this, batch),
Content = content,
};

using var response = this.httpClient.SendAsync(request, CancellationToken.None).GetAwaiter().GetResult();
Expand All @@ -89,6 +90,14 @@ public override ExportResult Export(in Batch<Activity> batch)
}
catch (Exception ex)
{
if (!content.BatchFlushed)
{
foreach (Activity activity in batch)
CodeBlanch marked this conversation as resolved.
Show resolved Hide resolved
{
// Drain the batch in the event of a connection failure.
}
}

ZipkinExporterEventSource.Log.FailedExport(ex);

return ExportResult.Failure;
Expand Down Expand Up @@ -221,6 +230,8 @@ public JsonContent(ZipkinExporter exporter, in Batch<Activity> batch)
this.Headers.ContentType = JsonHeader;
}

public bool BatchFlushed { get; private set; }

protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
#if NET452
Expand All @@ -239,19 +250,40 @@ protected override Task SerializeToStreamAsync(Stream stream, TransportContext c

this.writer.WriteStartArray();

Exception writeException = null;

foreach (var activity in this.batch)
{
var zipkinSpan = activity.ToZipkinSpan(this.exporter.LocalEndpoint, this.exporter.options.UseShortTraceIds);
if (writeException != null)
{
continue;
}

zipkinSpan.Write(this.writer);
try
{
var zipkinSpan = activity.ToZipkinSpan(this.exporter.LocalEndpoint, this.exporter.options.UseShortTraceIds);

zipkinSpan.Return();
zipkinSpan.Write(this.writer);

zipkinSpan.Return();
#if !NET452
if (this.writer.BytesPending >= this.exporter.maxPayloadSizeInBytes)
if (this.writer.BytesPending >= this.exporter.maxPayloadSizeInBytes)
{
this.writer.Flush();
}
#endif
}
catch (Exception exception)
{
this.writer.Flush();
writeException = exception;
}
#endif
}

this.BatchFlushed = true;

if (writeException != null)
{
throw writeException;
}

this.writer.WriteEndArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ public void OnNext(KeyValuePair<string, object> value)
{
if (!this.handler.SupportsNullActivity && Activity.Current == null)
{
InstrumentationEventSource.Log.NullActivity(value.Key);
if (!Sdk.SuppressInstrumentation)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add this check inside the instrumentationEventSource? because we can forget if someone adds a new Log, no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. If you look at this method there are 4 or 5 SuppressInstrumentation checks inside it. Smells like it could use a bit of cleanup. /cc @alanwest

{
InstrumentationEventSource.Log.NullActivity(value.Key);
}

return;
}

Expand Down