Skip to content

Commit

Permalink
Refactor exporter - step 10 (#1135)
Browse files Browse the repository at this point in the history
* make Shutdown/Flush sync

* calculate remaining time

* fix doc

* update changelog

* s/timeoutMillis/timeoutMilliseconds/g

Co-authored-by: Mikel Blanchard <mblanchard@macrosssoftware.com>
  • Loading branch information
reyang and CodeBlanch authored Aug 22, 2020
1 parent f5bffdb commit 74232b8
Show file tree
Hide file tree
Showing 16 changed files with 142 additions and 158 deletions.
6 changes: 3 additions & 3 deletions docs/trace/building-your-own-exporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

* To export telemetry to a specific destination, custom exporters must be
written.
* Exporters should inherit from `ActivityExporter` and implement `ExportAsync`
and `ShutdownAsync` methods. `ActivityExporter` is part of the [OpenTelemetry
* Exporters should inherit from `ActivityExporter` and implement `Export` and
`Shutdown` methods. `ActivityExporter` is part of the [OpenTelemetry
Package](https://www.nuget.org/packages/opentelemetry).
* Depending on user's choice and load on the application, `ExportAsync` may get
* Depending on user's choice and load on the application, `Export` may get
called with zero or more activities.
* Exporters will only receive sampled-in and ended activities.
* Exporters must not throw.
Expand Down
11 changes: 5 additions & 6 deletions docs/trace/building-your-own-processor/MyActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,14 @@ public override void OnEnd(Activity activity)
Console.WriteLine($"{this}.OnEnd");
}

public override Task ForceFlushAsync(CancellationToken cancellationToken)
public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
Console.WriteLine($"{this}.ForceFlushAsync");
return Task.CompletedTask;
Console.WriteLine($"{this}.ForceFlush");
return true;
}

public override Task ShutdownAsync(CancellationToken cancellationToken)
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
Console.WriteLine($"{this}.ShutdownAsync");
return Task.CompletedTask;
Console.WriteLine($"{this}.Shutdown");
}
}
6 changes: 2 additions & 4 deletions docs/trace/building-your-own-sampler/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@
critical code path.

```csharp
class MySampler : Sampler
internal class MySampler : Sampler
{
public override SamplingResult ShouldSample(in SamplingParameters samplingParameters)
{
var shouldSample = true;

return new SamplingResult(shouldSample);
return new SamplingResult(SamplingDecision.RecordAndSampled);
}
}
```
3 changes: 2 additions & 1 deletion src/OpenTelemetry.Exporter.Jaeger/JaegerExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using OpenTelemetry.Exporter.Jaeger.Implementation;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
Expand Down Expand Up @@ -61,7 +62,7 @@ public override ExportResult Export(in Batch<Activity> activityBatch)
}

/// <inheritdoc/>
public override void Shutdown()
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
this.JaegerAgentUdpBatcher.FlushAsync(default).GetAwaiter().GetResult();
}
Expand Down
3 changes: 2 additions & 1 deletion src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
[#1094](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1094)
[#1113](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1113)
[#1127](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1127)
[#1129](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1129))
[#1129](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1129)
[#1135](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1135))

## 0.4.0-beta.2

Expand Down
10 changes: 8 additions & 2 deletions src/OpenTelemetry/Trace/ActivityExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System;
using System.Diagnostics;
using System.Threading;

namespace OpenTelemetry.Trace
{
Expand Down Expand Up @@ -48,9 +49,14 @@ public abstract class ActivityExporter : IDisposable
public abstract ExportResult Export(in Batch<Activity> batch);

/// <summary>
/// Shuts down the exporter.
/// Attempts to shutdown the exporter, blocks the current thread until
/// shutdown completed or timed out.
/// </summary>
public virtual void Shutdown()
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
public virtual void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
}

Expand Down
38 changes: 19 additions & 19 deletions src/OpenTelemetry/Trace/ActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,31 +46,31 @@ public virtual void OnEnd(Activity activity)
}

/// <summary>
/// Shuts down Activity processor asynchronously.
/// Flushes the <see cref="ActivityProcessor"/>, blocks the current
/// thread until flush completed, shutdown signaled or timed out.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Returns <see cref="Task"/>.</returns>
public virtual Task ShutdownAsync(CancellationToken cancellationToken)
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush completed; otherwise, <c>false</c>.
/// </returns>
public virtual bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
return true;
}

/// <summary>
/// Flushes all activities that have not yet been processed.
/// Attempts to shutdown the processor, blocks the current thread until
/// shutdown completed or timed out.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Returns <see cref="Task"/>.</returns>
public virtual Task ForceFlushAsync(CancellationToken cancellationToken)
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
public virtual void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}

/// <inheritdoc/>
Expand All @@ -91,7 +91,7 @@ protected virtual void Dispose(bool disposing)
{
try
{
this.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();
this.Shutdown();
}
catch (Exception ex)
{
Expand Down
44 changes: 14 additions & 30 deletions src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,18 @@ public override void OnEnd(Activity activity)
/// the current thread until flush completed, shutdown signaled or
/// timed out.
/// </summary>
/// <param name="timeoutMillis">
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush completed; otherwise, <c>false</c>.
/// </returns>
public bool ForceFlush(int timeoutMillis = Timeout.Infinite)
public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite)
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMillis));
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds));
}

var tail = this.circularBuffer.RemovedCount;
Expand All @@ -167,7 +167,7 @@ public bool ForceFlush(int timeoutMillis = Timeout.Infinite)

this.exportTrigger.Set();

if (timeoutMillis == 0)
if (timeoutMilliseconds == 0)
{
return false;
}
Expand All @@ -182,13 +182,13 @@ public bool ForceFlush(int timeoutMillis = Timeout.Infinite)

while (true)
{
if (timeoutMillis == Timeout.Infinite)
if (timeoutMilliseconds == Timeout.Infinite)
{
WaitHandle.WaitAny(triggers, pollingMillis);
}
else
{
var timeout = (long)timeoutMillis - sw.ElapsedMilliseconds;
var timeout = (long)timeoutMilliseconds - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
Expand All @@ -210,46 +210,30 @@ public bool ForceFlush(int timeoutMillis = Timeout.Infinite)
}
}

/// <inheritdoc/>
/// <exception cref="OperationCanceledException">If the <paramref name="cancellationToken"/> is canceled.</exception>
public override Task ForceFlushAsync(CancellationToken cancellationToken)
{
// TODO
throw new NotImplementedException();
}

/// <summary>
/// Attempt to drain the queue and shutdown the exporter, blocks the
/// Attempts to drain the queue and shutdown the exporter, blocks the
/// current thread until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMillis">
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
public void Shutdown(int timeoutMillis = Timeout.Infinite)
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite)
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMillis));
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds));
}

this.shutdownDrainTarget = this.circularBuffer.AddedCount;
this.shutdownTrigger.Set();

if (timeoutMillis != 0)
if (timeoutMilliseconds != 0)
{
this.exporterThread.Join(timeoutMillis);
this.exporterThread.Join(timeoutMilliseconds);
}
}

/// <inheritdoc/>
/// <exception cref="OperationCanceledException">If the <paramref name="cancellationToken"/> is canceled.</exception>
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
// TODO
throw new NotImplementedException();
}

/// <summary>
/// Releases the unmanaged resources used by this class and optionally releases the managed resources.
/// </summary>
Expand Down
71 changes: 58 additions & 13 deletions src/OpenTelemetry/Trace/CompositeActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public CompositeActivityProcessor AddProcessor(ActivityProcessor processor)
return this;
}

/// <inheritdoc/>
public override void OnEnd(Activity activity)
{
var cur = this.head;
Expand All @@ -80,6 +81,7 @@ public override void OnEnd(Activity activity)
}
}

/// <inheritdoc/>
public override void OnStart(Activity activity)
{
var cur = this.head;
Expand All @@ -91,32 +93,75 @@ public override void OnStart(Activity activity)
}
}

public override Task ShutdownAsync(CancellationToken cancellationToken)
/// <inheritdoc/>
public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds));
}

var cur = this.head;
var task = cur.Value.ShutdownAsync(cancellationToken);

for (cur = cur.Next; cur != null; cur = cur.Next)
var sw = Stopwatch.StartNew();

while (cur != null)
{
var processor = cur.Value;
task = task.ContinueWith(t => processor.ShutdownAsync(cancellationToken));
if (timeoutMilliseconds == Timeout.Infinite)
{
var succeeded = cur.Value.ForceFlush(Timeout.Infinite);
}
else
{
var timeout = (long)timeoutMilliseconds - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return false;
}

var succeeded = cur.Value.ForceFlush((int)timeout);

if (!succeeded)
{
return false;
}
}

cur = cur.Next;
}

return task;
return true;
}

public override Task ForceFlushAsync(CancellationToken cancellationToken)
/// <inheritdoc/>
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds));
}

var cur = this.head;
var task = cur.Value.ForceFlushAsync(cancellationToken);

for (cur = cur.Next; cur != null; cur = cur.Next)
var sw = Stopwatch.StartNew();

while (cur != null)
{
var processor = cur.Value;
task = task.ContinueWith(t => processor.ForceFlushAsync(cancellationToken));
}
if (timeoutMilliseconds == Timeout.Infinite)
{
cur.Value.Shutdown(Timeout.Infinite);
}
else
{
var timeout = (long)timeoutMilliseconds - sw.ElapsedMilliseconds;

return task;
// notify all the processors, even if we run overtime
cur.Value.Shutdown((int)Math.Max(timeout, 0));
}

cur = cur.Next;
}
}

protected override void Dispose(bool disposing)
Expand Down
9 changes: 2 additions & 7 deletions src/OpenTelemetry/Trace/ReentrantExportActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,14 @@ public override void OnEnd(Activity activity)
}

/// <inheritdoc />
public override Task ShutdownAsync(CancellationToken cancellationToken)
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
if (!this.stopped)
{
// TODO: pass down the timeout to exporter
this.exporter.Shutdown();
this.stopped = true;
}

#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}

/// <summary>
Expand Down
Loading

0 comments on commit 74232b8

Please sign in to comment.