Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suppress instrumentation upon calling ActivityExporter.ExportAsync #977

15 changes: 10 additions & 5 deletions docs/trace/building-your-own-processor/MyActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ public override string ToString()
return $"{this.GetType()}({this.name})";
}

public override void OnEnd(Activity activity)
{
Console.WriteLine($"{this}.OnEnd");
}

public override Task ForceFlushAsync(CancellationToken cancellationToken)
{
Console.WriteLine($"{this}.ForceFlushAsync");
Expand All @@ -50,4 +45,14 @@ public override Task ShutdownAsync(CancellationToken cancellationToken)
Console.WriteLine($"{this}.ShutdownAsync");
return Task.CompletedTask;
}

protected override void OnStartInternal(Activity activity)
{
Console.WriteLine($"{this}.OnStart");
}

protected override void OnEndInternal(Activity activity)
{
Console.WriteLine($"{this}.OnEnd");
}
}
6 changes: 5 additions & 1 deletion examples/Console/TestConsoleExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ internal static object Run(ConsoleOptions options)

internal class MyProcessor : ActivityProcessor
{
public override void OnStart(Activity activity)
protected override void OnStartInternal(Activity activity)
{
if (activity.IsAllDataRequested)
{
Expand All @@ -115,6 +115,10 @@ public override void OnStart(Activity activity)
}
}
}

protected override void OnEndInternal(Activity activity)
{
}
}
}
}
4 changes: 2 additions & 2 deletions src/OpenTelemetry.Exporter.ZPages/ZPagesProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public ZPagesProcessor(ZPagesExporter exporter)
}

/// <inheritdoc />
public override void OnStart(Activity activity)
protected override void OnStartInternal(Activity activity)
{
if (!ZPagesActivityTracker.ProcessingList.ContainsKey(activity.DisplayName))
{
Expand All @@ -64,7 +64,7 @@ public override void OnStart(Activity activity)
}

/// <inheritdoc />
public override void OnEnd(Activity activity)
protected override void OnEndInternal(Activity activity)
{
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,6 @@ internal static bool IsInternalUrl(Uri requestUri)
{
var originalString = requestUri.OriginalString;

// zipkin
if (originalString.Contains(":9411/api/v2/spans"))
{
return true;
}

// applicationinsights
if (originalString.StartsWith("https://dc.services.visualstudio") ||
originalString.StartsWith("https://rt.services.visualstudio") ||
Expand Down
24 changes: 22 additions & 2 deletions src/OpenTelemetry/Trace/ActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,28 @@ public abstract class ActivityProcessor : IDisposable
/// Activity start hook.
/// </summary>
/// <param name="activity">Instance of activity to process.</param>
public virtual void OnStart(Activity activity)
public void OnStart(Activity activity)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you open to renaming these? I think the more typical .NET-y style for this would be:

public void Start(Activity activity);
protected abstract void OnStart(Activity activity);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If others are open, I'm open to this. It does mean a changing the public API and would affect anyone out there that has created a custom processor.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both ways are breaking, no?

Today:

public abstract void OnStart(Activity activity);

First version:

public void OnStart(Activity activity);
protected abstract void OnStartInternal(Activity activity);

Proposed version:

public void Start(Activity activity);
protected abstract void OnStart(Activity activity);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duh. 🤦 Yes, you're right.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could move this logic up into ActivityListener to avoid the break. Like what's I'm doing on #969. But I think this is a better long-term design. We don't want a lot of stuff in ActivityListener IMO. I was thinking once this is available, I would re-do how that is done to be in the Start or Stop implementation of base ActivityProcessor.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally I would struggle with naming things. This time it is easier for me as the spec said we should use OnStart and OnEnd 😄.

Copy link
Member Author

@alanwest alanwest Aug 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ha! Yes I wondered about that right after submitting this question. Great! We won't touch it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alanwest Separate PR works for me 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@CodeBlanch no PR necessary 😄. As Reiley pointed out OnStart and OnEnd are the names specified in the spec for the public API.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Sorry I didn't realize you changed them to Start/OnStart already. That works for me. I think if there wasn't a spec involved something like StartActivity/OnActivityStarted would be better (what you were getting at), but good to stay as close to the spec as possible.

{
if (Sdk.SuppressInstrumentation)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering out loud here... should the SuppressInstrumentation check go in ActivityProcessor or should it be part of the ActivitySampler? If SuppressInstrumentation == true forced a lower sampling decision (None or PropagationOnly) that would be better for perf I think.

{
return;
}

this.OnStartInternal(activity);
}

/// <summary>
/// Activity end hook.
/// </summary>
/// <param name="activity">Instance of activity to process.</param>
public virtual void OnEnd(Activity activity)
public void OnEnd(Activity activity)
{
if (Sdk.SuppressInstrumentation)
{
return;
}

this.OnEndInternal(activity);
}

/// <summary>
Expand Down Expand Up @@ -101,5 +113,13 @@ protected virtual void Dispose(bool disposing)

this.disposed = true;
}

protected virtual void OnStartInternal(Activity activity)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alanwest The "Internal" in the name is kind of non-standard. I think we were talking about this before but I lost track. Could we not do?

public void Start(Activity activity)
protected virtual void OnStart(Activity activity)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't looked through the changes yet, imagine we have a chain of processors, checking SuppressInstrumentation on every single processor seems to be a big perf hit.

Copy link
Member Author

@alanwest alanwest Aug 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alanwest The "Internal" in the name is kind of non-standard. I think we were talking about this before but I lost track. Could we not do?

Agreed. I actually have this change staged up, just haven't pushed yet. Mostly just picked things up on the PR today and resolved the various conflicts.

I haven't looked through the changes yet, imagine we have a chain of processors, checking SuppressInstrumentation on every single processor seems to be a big perf hit.

Yes, I haven't spent enough time analyzing this yet. Earlier there was an idea from @CodeBlanch to see if this fits better in the sampler. Will consider this soon.

Copy link
Member

@reyang reyang Aug 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given SuppressInstrumentation is used for folks to express whether they want the telemetry to be suppressed or not, how about the following approach:

  • If the code (app/lib) is using new ActivitySource.StartActivity, it will be responsible to checking the SuppressInstrumentation flag and not creating the activity if the flag is set.
    • In this way the sampler/processor/exporter won't receive the activity at all since it is not even created.
    • If the code is not performing the check, the code is faulty and we are noting doing another round of check in the processor - due to perf reason.
  • If the code is using legacy Activity, the adapter will be responsible to drop the activity before sending it to the sampler/processor.
    • In this way we gain perf since the adapter is only taking legacy activities.

Copy link
Member Author

@alanwest alanwest Aug 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the code (app/lib) is using new ActivitySource.StartActivity, it will be responsible to checking the SuppressInstrumentation flag and not creating the activity if the flag is set.

This would require taking an OTel dependency. We want to avoid this need for library authors, no?

If the code is using legacy Activity, the adapter will be responsible to drop the activity before sending it to the sampler/processor.

Yes, this makes sense for the legacy route.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would worry a bit relying on others to do it perfectly. I'm envisioning a bunch of difficult-to-triage issues being opened when applications go into infinite loops. We could move the check into the ActivityListener? If SuppressInstrumentation is triggered, we return ActivityDataRequest.None from GetRequestedDataUsingContext (or whatever it ends up being called). If we do that, Activity is never created.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could move the check into the ActivityListener?

If perf looks good, putting the logic in the ActivityListener will work.

This would require taking an OTel dependency. We want to avoid this need for library authors, no?

Definitely yes. I think most library authors don't need to care about this (infinite loops). Only library authors who build libraries that can be used for exporting task will have to worry about it. Probably on for HTTP / TCP / UDP / Unix domain socket.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More data points:

  • In OpenTelemetry Python it seems to be fine to have only HTTP instrumentation doing the loop detection using SuppressInstrumentation, check this.
  • If only 5% of the scenario would use it, making it too generic and ask everyone to pay the perf tax could be a hard-sell (if the perf tax is low, for example 0.1%, I guess it'll be fine).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try on this on for size tomorrow:

  • Check in ActivityListener for the new ActivitySource way
  • And check in the adapter for the legacy

Then try to get some perf numbers. If we're not happy, then I'm definitely ok entertaining scoping this only to certain instrumentation HttpClient/HttpWebRequest.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alanwest FYI - I've shared a mini benchmark project which I used for refactor the provider/processor/pipeline #1039.

{
}

protected virtual void OnEndInternal(Activity activity)
{
}
}
}
98 changes: 51 additions & 47 deletions src/OpenTelemetry/Trace/BatchingActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,47 +136,6 @@ public BatchingActivityProcessor(ActivityExporter exporter, int maxQueueSize, Ti
};
}

/// <inheritdoc/>
public override void OnEnd(Activity activity)
{
// because of race-condition between checking the size and enqueueing,
// we might end up with a bit more activities than maxQueueSize.
// Let's just tolerate it to avoid extra synchronization.
if (this.currentQueueSize >= this.maxQueueSize)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorQueueIsExhausted();
return;
}

var size = Interlocked.Increment(ref this.currentQueueSize);

this.exportQueue.Enqueue(activity);

if (size >= this.maxExportBatchSize)
{
bool lockTaken = this.flushLock.Wait(0);
if (lockTaken)
{
Task.Run(async () =>
{
try
{
await this.FlushAsyncInternal(drain: false, lockAlreadyHeld: true, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.OnEnd), ex);
}
finally
{
this.flushLock.Release();
}
});
return;
}
}
}

/// <inheritdoc/>
/// <exception cref="OperationCanceledException">If the <paramref name="cancellationToken"/> is canceled.</exception>
public override async Task ShutdownAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -225,6 +184,47 @@ protected override void Dispose(bool disposing)
}
}

/// <inheritdoc/>
protected override void OnEndInternal(Activity activity)
{
// because of race-condition between checking the size and enqueueing,
// we might end up with a bit more activities than maxQueueSize.
// Let's just tolerate it to avoid extra synchronization.
if (this.currentQueueSize >= this.maxQueueSize)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorQueueIsExhausted();
return;
}

var size = Interlocked.Increment(ref this.currentQueueSize);

this.exportQueue.Enqueue(activity);

if (size >= this.maxExportBatchSize)
{
bool lockTaken = this.flushLock.Wait(0);
if (lockTaken)
{
Task.Run(async () =>
{
try
{
await this.FlushAsyncInternal(drain: false, lockAlreadyHeld: true, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.OnEnd), ex);
}
finally
{
this.flushLock.Release();
}
});
return;
}
}
}

private async Task FlushAsyncInternal(bool drain, bool lockAlreadyHeld, CancellationToken cancellationToken)
{
if (!lockAlreadyHeld)
Expand Down Expand Up @@ -320,14 +320,18 @@ private async Task<int> ExportBatchAsync(CancellationToken cancellationToken)
this.batch.Add(nextActivity);
}

var result = await this.exporter.ExportAsync(this.batch, cancellationToken).ConfigureAwait(false);
if (result != ExportResult.Success)
using (Sdk.SuppressInstrumentation)
Copy link
Member Author

@alanwest alanwest Aug 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@CodeBlanch @reyang

Uh oh this is a terrible bug waiting to happen. This should be:

using (Sdk.SuppressInstrumentation.Begin())

This bug would result in the static instance getting disposed. I think we need to rethink things a bit to prevent this from being possible.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+100

{
OpenTelemetrySdkEventSource.Log.ExporterErrorResult(result);
var result = await this.exporter.ExportAsync(this.batch, cancellationToken).ConfigureAwait(false);

// we do not support retries for now and leave it up to exporter
// as only exporter implementation knows how to retry: which items failed
// and what is the reasonable policy for that exporter.
if (result != ExportResult.Success)
{
OpenTelemetrySdkEventSource.Log.ExporterErrorResult(result);

// we do not support retries for now and leave it up to exporter
// as only exporter implementation knows how to retry: which items failed
// and what is the reasonable policy for that exporter.
}
}

return this.batch.Count;
Expand Down
44 changes: 22 additions & 22 deletions src/OpenTelemetry/Trace/CompositeActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,28 +69,6 @@ public CompositeActivityProcessor AddProcessor(ActivityProcessor processor)
return this;
}

public override void OnEnd(Activity activity)
{
var cur = this.head;

while (cur != null)
{
cur.Value.OnEnd(activity);
cur = cur.Next;
}
}

public override void OnStart(Activity activity)
{
var cur = this.head;

while (cur != null)
{
cur.Value.OnStart(activity);
cur = cur.Next;
}
}

public override Task ShutdownAsync(CancellationToken cancellationToken)
{
var cur = this.head;
Expand Down Expand Up @@ -148,6 +126,28 @@ protected override void Dispose(bool disposing)
this.disposed = true;
}

protected override void OnStartInternal(Activity activity)
{
var cur = this.head;

while (cur != null)
{
cur.Value.OnStart(activity);
cur = cur.Next;
}
}

protected override void OnEndInternal(Activity activity)
{
var cur = this.head;

while (cur != null)
{
cur.Value.OnEnd(activity);
cur = cur.Next;
}
}

private class DoublyLinkedListNode<T>
{
public readonly T Value;
Expand Down
65 changes: 35 additions & 30 deletions src/OpenTelemetry/Trace/Internal/FanOutActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,36 +44,6 @@ public FanOutActivityProcessor(IEnumerable<ActivityProcessor> processors)
this.processors = new List<ActivityProcessor>(processors);
}

public override void OnEnd(Activity activity)
{
foreach (var processor in this.processors)
{
try
{
processor.OnEnd(activity);
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("OnEnd", e);
}
}
}

public override void OnStart(Activity activity)
{
foreach (var processor in this.processors)
{
try
{
processor.OnStart(activity);
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("OnStart", e);
}
}
}

public override Task ShutdownAsync(CancellationToken cancellationToken)
{
var tasks = new List<Task>();
Expand Down Expand Up @@ -120,5 +90,40 @@ protected override void Dispose(bool disposing)
this.disposed = true;
}
}

protected override void OnEndInternal(Activity activity)
{
foreach (var processor in this.processors)
{
try
{
processor.OnEnd(activity);
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("OnEnd", e);
}
}
}

protected override void OnStartInternal(Activity activity)
{
if (Sdk.SuppressInstrumentation)
{
return;
}

foreach (var processor in this.processors)
{
try
{
processor.OnStart(activity);
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("OnStart", e);
}
}
}
}
}
Loading