Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor exporter - step 10 #1135

Merged
merged 6 commits into from
Aug 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/trace/building-your-own-exporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

* To export telemetry to a specific destination, custom exporters must be
written.
* Exporters should inherit from `ActivityExporter` and implement `ExportAsync`
and `ShutdownAsync` methods. `ActivityExporter` is part of the [OpenTelemetry
* Exporters should inherit from `ActivityExporter` and implement `Export` and
`Shutdown` methods. `ActivityExporter` is part of the [OpenTelemetry
Package](https://www.nuget.org/packages/opentelemetry).
* Depending on user's choice and load on the application, `ExportAsync` may get
* Depending on user's choice and load on the application, `Export` may get
called with zero or more activities.
* Exporters will only receive sampled-in and ended activities.
* Exporters must not throw.
Expand Down
11 changes: 5 additions & 6 deletions docs/trace/building-your-own-processor/MyActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,14 @@ public override void OnEnd(Activity activity)
Console.WriteLine($"{this}.OnEnd");
}

public override Task ForceFlushAsync(CancellationToken cancellationToken)
public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
Console.WriteLine($"{this}.ForceFlushAsync");
return Task.CompletedTask;
Console.WriteLine($"{this}.ForceFlush");
return true;
}

public override Task ShutdownAsync(CancellationToken cancellationToken)
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
Console.WriteLine($"{this}.ShutdownAsync");
return Task.CompletedTask;
Console.WriteLine($"{this}.Shutdown");
}
}
6 changes: 2 additions & 4 deletions docs/trace/building-your-own-sampler/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@
critical code path.

```csharp
class MySampler : Sampler
internal class MySampler : Sampler
{
public override SamplingResult ShouldSample(in SamplingParameters samplingParameters)
{
var shouldSample = true;

return new SamplingResult(shouldSample);
return new SamplingResult(SamplingDecision.RecordAndSampled);
}
}
```
3 changes: 2 additions & 1 deletion src/OpenTelemetry.Exporter.Jaeger/JaegerExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using OpenTelemetry.Exporter.Jaeger.Implementation;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
Expand Down Expand Up @@ -61,7 +62,7 @@ public override ExportResult Export(in Batch<Activity> activityBatch)
}

/// <inheritdoc/>
public override void Shutdown()
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
this.JaegerAgentUdpBatcher.FlushAsync(default).GetAwaiter().GetResult();
}
Expand Down
3 changes: 2 additions & 1 deletion src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
[#1094](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1094)
[#1113](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1113)
[#1127](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1127)
[#1129](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1129))
[#1129](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1129)
[#1135](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1135))

## 0.4.0-beta.2

Expand Down
10 changes: 8 additions & 2 deletions src/OpenTelemetry/Trace/ActivityExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System;
using System.Diagnostics;
using System.Threading;

namespace OpenTelemetry.Trace
{
Expand Down Expand Up @@ -48,9 +49,14 @@ public abstract class ActivityExporter : IDisposable
public abstract ExportResult Export(in Batch<Activity> batch);

/// <summary>
/// Shuts down the exporter.
/// Attempts to shutdown the exporter, blocks the current thread until
/// shutdown completed or timed out.
/// </summary>
public virtual void Shutdown()
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
public virtual void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
}

Expand Down
38 changes: 19 additions & 19 deletions src/OpenTelemetry/Trace/ActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,31 +46,31 @@ public virtual void OnEnd(Activity activity)
}

/// <summary>
/// Shuts down Activity processor asynchronously.
/// Flushes the <see cref="ActivityProcessor"/>, blocks the current
/// thread until flush completed, shutdown signaled or timed out.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Returns <see cref="Task"/>.</returns>
public virtual Task ShutdownAsync(CancellationToken cancellationToken)
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush completed; otherwise, <c>false</c>.
/// </returns>
public virtual bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
return true;
}

/// <summary>
/// Flushes all activities that have not yet been processed.
/// Attempts to shutdown the processor, blocks the current thread until
/// shutdown completed or timed out.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Returns <see cref="Task"/>.</returns>
public virtual Task ForceFlushAsync(CancellationToken cancellationToken)
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
public virtual void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}

/// <inheritdoc/>
Expand All @@ -91,7 +91,7 @@ protected virtual void Dispose(bool disposing)
{
try
{
this.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();
this.Shutdown();
}
catch (Exception ex)
{
Expand Down
44 changes: 14 additions & 30 deletions src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,18 @@ public override void OnEnd(Activity activity)
/// the current thread until flush completed, shutdown signaled or
/// timed out.
/// </summary>
/// <param name="timeoutMillis">
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush completed; otherwise, <c>false</c>.
/// </returns>
public bool ForceFlush(int timeoutMillis = Timeout.Infinite)
public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite)
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMillis));
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds));
}

var tail = this.circularBuffer.RemovedCount;
Expand All @@ -167,7 +167,7 @@ public bool ForceFlush(int timeoutMillis = Timeout.Infinite)

this.exportTrigger.Set();

if (timeoutMillis == 0)
if (timeoutMilliseconds == 0)
{
return false;
}
Expand All @@ -182,13 +182,13 @@ public bool ForceFlush(int timeoutMillis = Timeout.Infinite)

while (true)
{
if (timeoutMillis == Timeout.Infinite)
if (timeoutMilliseconds == Timeout.Infinite)
{
WaitHandle.WaitAny(triggers, pollingMillis);
}
else
{
var timeout = (long)timeoutMillis - sw.ElapsedMilliseconds;
var timeout = (long)timeoutMilliseconds - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
Expand All @@ -210,46 +210,30 @@ public bool ForceFlush(int timeoutMillis = Timeout.Infinite)
}
}

/// <inheritdoc/>
/// <exception cref="OperationCanceledException">If the <paramref name="cancellationToken"/> is canceled.</exception>
public override Task ForceFlushAsync(CancellationToken cancellationToken)
{
// TODO
throw new NotImplementedException();
}

/// <summary>
/// Attempt to drain the queue and shutdown the exporter, blocks the
/// Attempts to drain the queue and shutdown the exporter, blocks the
/// current thread until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMillis">
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
public void Shutdown(int timeoutMillis = Timeout.Infinite)
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite)
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMillis));
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds));
}

this.shutdownDrainTarget = this.circularBuffer.AddedCount;
this.shutdownTrigger.Set();

if (timeoutMillis != 0)
if (timeoutMilliseconds != 0)
{
this.exporterThread.Join(timeoutMillis);
this.exporterThread.Join(timeoutMilliseconds);
}
}

/// <inheritdoc/>
/// <exception cref="OperationCanceledException">If the <paramref name="cancellationToken"/> is canceled.</exception>
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
// TODO
throw new NotImplementedException();
}

/// <summary>
/// Releases the unmanaged resources used by this class and optionally releases the managed resources.
/// </summary>
Expand Down
71 changes: 58 additions & 13 deletions src/OpenTelemetry/Trace/CompositeActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public CompositeActivityProcessor AddProcessor(ActivityProcessor processor)
return this;
}

/// <inheritdoc/>
public override void OnEnd(Activity activity)
{
var cur = this.head;
Expand All @@ -80,6 +81,7 @@ public override void OnEnd(Activity activity)
}
}

/// <inheritdoc/>
public override void OnStart(Activity activity)
{
var cur = this.head;
Expand All @@ -91,32 +93,75 @@ public override void OnStart(Activity activity)
}
}

public override Task ShutdownAsync(CancellationToken cancellationToken)
/// <inheritdoc/>
public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds));
}

var cur = this.head;
var task = cur.Value.ShutdownAsync(cancellationToken);

for (cur = cur.Next; cur != null; cur = cur.Next)
var sw = Stopwatch.StartNew();

while (cur != null)
{
var processor = cur.Value;
task = task.ContinueWith(t => processor.ShutdownAsync(cancellationToken));
if (timeoutMilliseconds == Timeout.Infinite)
{
var succeeded = cur.Value.ForceFlush(Timeout.Infinite);
}
else
{
var timeout = (long)timeoutMilliseconds - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return false;
}

var succeeded = cur.Value.ForceFlush((int)timeout);

if (!succeeded)
{
return false;
}
}

cur = cur.Next;
}

return task;
return true;
}

public override Task ForceFlushAsync(CancellationToken cancellationToken)
/// <inheritdoc/>
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds));
}

var cur = this.head;
var task = cur.Value.ForceFlushAsync(cancellationToken);

for (cur = cur.Next; cur != null; cur = cur.Next)
var sw = Stopwatch.StartNew();

while (cur != null)
{
var processor = cur.Value;
task = task.ContinueWith(t => processor.ForceFlushAsync(cancellationToken));
}
if (timeoutMilliseconds == Timeout.Infinite)
{
cur.Value.Shutdown(Timeout.Infinite);
}
else
{
var timeout = (long)timeoutMilliseconds - sw.ElapsedMilliseconds;

return task;
// notify all the processors, even if we run overtime
cur.Value.Shutdown((int)Math.Max(timeout, 0));
}

cur = cur.Next;
}
}

protected override void Dispose(bool disposing)
Expand Down
9 changes: 2 additions & 7 deletions src/OpenTelemetry/Trace/ReentrantExportActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,14 @@ public override void OnEnd(Activity activity)
}

/// <inheritdoc />
public override Task ShutdownAsync(CancellationToken cancellationToken)
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
if (!this.stopped)
{
// TODO: pass down the timeout to exporter
this.exporter.Shutdown();
this.stopped = true;
}

#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}

/// <summary>
Expand Down
Loading