Skip to content

Commit

Permalink
Delay pipeline disposal when still in use (#1579)
Browse files Browse the repository at this point in the history
  • Loading branch information
martintmk committed Sep 8, 2023
1 parent 6999e7c commit a0e7735
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 15 deletions.
9 changes: 6 additions & 3 deletions src/Polly.Core/Registry/RegistryPipelineComponentBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,17 @@ private Builder CreateBuilder()
builder.InstanceName = _instanceName;
_configure(builder, context);

var timeProvider = builder.TimeProvider;
var telemetry = new ResilienceStrategyTelemetry(
new ResilienceTelemetrySource(builder.Name, builder.InstanceName, null),
builder.TelemetryListener);

return new(
() => PipelineComponentFactory.WithDisposableCallbacks(
builder.BuildPipelineComponent(),
context.DisposeCallbacks),
() =>
{
var innerComponent = PipelineComponentFactory.WithDisposableCallbacks(builder.BuildPipelineComponent(), context.DisposeCallbacks);
return PipelineComponentFactory.WithExecutionTracking(innerComponent, timeProvider);
},
context.ReloadTokens,
telemetry);
}
Expand Down
59 changes: 59 additions & 0 deletions src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
namespace Polly.Utils.Pipeline;

internal sealed class ExecutionTrackingComponent : PipelineComponent
{
public static readonly TimeSpan Timeout = TimeSpan.FromSeconds(30);

public static readonly TimeSpan SleepDelay = TimeSpan.FromSeconds(1);

private readonly TimeProvider _timeProvider;
private int _pendingExecutions;

public ExecutionTrackingComponent(PipelineComponent component, TimeProvider timeProvider)
{
Component = component;
_timeProvider = timeProvider;
}

public PipelineComponent Component { get; }

public bool HasPendingExecutions => Interlocked.CompareExchange(ref _pendingExecutions, 0, 0) > 0;

internal override async ValueTask<Outcome<TResult>> ExecuteCore<TResult, TState>(
Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> callback,
ResilienceContext context,
TState state)
{
Interlocked.Increment(ref _pendingExecutions);

try
{
return await Component.ExecuteCore(callback, context, state).ConfigureAwait(context.ContinueOnCapturedContext);
}
finally
{
Interlocked.Decrement(ref _pendingExecutions);
}
}

public override async ValueTask DisposeAsync()
{
var start = _timeProvider.GetTimestamp();
var stopwatch = Stopwatch.StartNew();

// We don't want to introduce locks or any synchronization primitives to main execution path
// so we will do "dummy" retries until there are no more executions.
while (HasPendingExecutions)
{
await _timeProvider.Delay(SleepDelay).ConfigureAwait(false);

// stryker disable once equality : no means to test this
if (_timeProvider.GetElapsedTime(start) > Timeout)
{
break;
}
}

await Component.DisposeAsync().ConfigureAwait(false);
}
}
2 changes: 2 additions & 0 deletions src/Polly.Core/Utils/Pipeline/PipelineComponentFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public static PipelineComponent WithDisposableCallbacks(PipelineComponent compon
return new ComponentWithDisposeCallbacks(component, callbacks.ToList());
}

public static PipelineComponent WithExecutionTracking(PipelineComponent component, TimeProvider timeProvider) => new ExecutionTrackingComponent(component, timeProvider);

public static PipelineComponent CreateComposite(
IReadOnlyList<PipelineComponent> components,
ResilienceStrategyTelemetry telemetry,
Expand Down
18 changes: 8 additions & 10 deletions src/Polly.Testing/ResiliencePipelineExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ private static ResiliencePipelineDescriptor GetPipelineDescriptorCore<T>(Pipelin
var components = new List<PipelineComponent>();
component.ExpandComponents(components);

var descriptors = components.Select(s => new ResilienceStrategyDescriptor(s.Options, GetStrategyInstance<T>(s))).ToList();
var descriptors = components.OfType<BridgeComponentBase>().Select(s => new ResilienceStrategyDescriptor(s.Options, GetStrategyInstance<T>(s))).ToList().AsReadOnly();

return new ResiliencePipelineDescriptor(
descriptors.Where(s => !ShouldSkip(s.StrategyInstance)).ToList().AsReadOnly(),
descriptors,
isReloadable: components.Exists(s => s is ReloadableComponent));
}

Expand All @@ -54,16 +54,9 @@ private static object GetStrategyInstance<T>(PipelineComponent component)
return reactiveBridge.Strategy;
}

if (component is BridgeComponent nonReactiveBridge)
{
return nonReactiveBridge.Strategy;
}

return component;
return ((BridgeComponent)component).Strategy;
}

private static bool ShouldSkip(object instance) => instance is ReloadableComponent || instance is ComponentWithDisposeCallbacks;

private static void ExpandComponents(this PipelineComponent component, List<PipelineComponent> components)
{
if (component is CompositeComponent pipeline)
Expand All @@ -78,6 +71,11 @@ private static void ExpandComponents(this PipelineComponent component, List<Pipe
components.Add(reloadable);
ExpandComponents(reloadable.Component, components);
}
else if (component is ExecutionTrackingComponent tracking)
{
components.Add(tracking);
ExpandComponents(tracking.Component, components);
}
else if (component is ComponentWithDisposeCallbacks callbacks)
{
ExpandComponents(callbacks.Component, components);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ public void Dispose_WhenScheduledTaskExecuting()
ResilienceContextPool.Shared.Get(),
out var task);

ready.WaitOne(TimeSpan.FromSeconds(2)).Should().BeTrue();
ready.WaitOne(TimeSpan.FromSeconds(10)).Should().BeTrue();
scheduler.Dispose();
disposed.Set();

scheduler.ProcessingTask.Wait(TimeSpan.FromSeconds(2)).Should().BeTrue();
scheduler.ProcessingTask.Wait(TimeSpan.FromSeconds(10)).Should().BeTrue();
}

[Fact]
Expand Down
15 changes: 15 additions & 0 deletions test/Polly.Core.Tests/Registry/ResiliencePipelineRegistryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Polly.Testing;
using Polly.Timeout;
using Polly.Utils;
using Polly.Utils.Pipeline;

namespace Polly.Core.Tests.Registry;

Expand Down Expand Up @@ -380,6 +381,20 @@ public void GetOrAddPipeline_Ok()
called.Should().Be(1);
}

[Fact]
public void GetOrAddPipeline_EnsureCorrectComponents()
{
var id = new StrategyId(typeof(string), "A");

using var registry = CreateRegistry();

var pipeline = registry.GetOrAddPipeline(id, builder => builder.AddTimeout(TimeSpan.FromSeconds(1)));
pipeline.Component.Should().BeOfType<ExecutionTrackingComponent>().Subject.Component.Should().BeOfType<CompositeComponent>();

var genericPipeline = registry.GetOrAddPipeline<string>(id, builder => builder.AddTimeout(TimeSpan.FromSeconds(1)));
pipeline.Component.Should().BeOfType<ExecutionTrackingComponent>().Subject.Component.Should().BeOfType<CompositeComponent>();
}

[Fact]
public void GetOrAddPipeline_Generic_Ok()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Time.Testing;
using Polly.Utils.Pipeline;

namespace Polly.Core.Tests.Utils.Pipeline;

public class ExecutionTrackingComponentTests
{
private readonly FakeTimeProvider _timeProvider = new();

[Fact]
public async Task DisposeAsync_PendingOperations_Delayed()
{
using var assert = new ManualResetEvent(false);
using var executing = new ManualResetEvent(false);

await using var inner = new Inner
{
OnExecute = () =>
{
executing.Set();
assert.WaitOne();
}
};

var component = new ExecutionTrackingComponent(inner, _timeProvider);
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { }));
executing.WaitOne();

var disposeTask = component.DisposeAsync().AsTask();
_timeProvider.Advance(ExecutionTrackingComponent.SleepDelay);
inner.Disposed.Should().BeFalse();
assert.Set();

_timeProvider.Advance(ExecutionTrackingComponent.SleepDelay);
await execution;

_timeProvider.Advance(ExecutionTrackingComponent.SleepDelay);
await disposeTask;

inner.Disposed.Should().BeTrue();
}

[Fact]
public async Task HasPendingExecutions_Ok()
{
using var assert = new ManualResetEvent(false);
using var executing = new ManualResetEvent(false);

await using var inner = new Inner
{
OnExecute = () =>
{
executing.Set();
assert.WaitOne();
}
};

await using var component = new ExecutionTrackingComponent(inner, _timeProvider);
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { }));
executing.WaitOne();

component.HasPendingExecutions.Should().BeTrue();
assert.Set();
await execution;

component.HasPendingExecutions.Should().BeFalse();
}

[Fact]
public async Task DisposeAsync_Timeout_Ok()
{
using var assert = new ManualResetEvent(false);
using var executing = new ManualResetEvent(false);

await using var inner = new Inner
{
OnExecute = () =>
{
executing.Set();
assert.WaitOne();
}
};

var component = new ExecutionTrackingComponent(inner, _timeProvider);
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { }));
executing.WaitOne();

var disposeTask = component.DisposeAsync().AsTask();
inner.Disposed.Should().BeFalse();
_timeProvider.Advance(ExecutionTrackingComponent.Timeout - TimeSpan.FromSeconds(1));
inner.Disposed.Should().BeFalse();
_timeProvider.Advance(TimeSpan.FromSeconds(1));
_timeProvider.Advance(TimeSpan.FromSeconds(1));
await disposeTask;
inner.Disposed.Should().BeTrue();

assert.Set();
await execution;
}

[Fact]
public async Task DisposeAsync_WhenRunningMultipleTasks_Ok()
{
var tasks = new ConcurrentQueue<ManualResetEvent>();
await using var inner = new Inner
{
OnExecute = () =>
{
var ev = new ManualResetEvent(false);
tasks.Enqueue(ev);
ev.WaitOne();
}
};

var component = new ExecutionTrackingComponent(inner, TimeProvider.System);
var pipeline = new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow);

for (int i = 0; i < 10; i++)
{
_ = Task.Run(() => pipeline.Execute(() => { }));
}

while (tasks.Count != 10)
{
await Task.Delay(1);
}

var disposeTask = component.DisposeAsync().AsTask();

while (tasks.Count > 1)
{
tasks.TryDequeue(out var ev).Should().BeTrue();
ev!.Set();
ev.Dispose();
disposeTask.Wait(1).Should().BeFalse();
inner.Disposed.Should().BeFalse();
}

// last one
tasks.TryDequeue(out var last).Should().BeTrue();
last!.Set();
last.Dispose();
await disposeTask;
inner.Disposed.Should().BeTrue();
}

private class Inner : PipelineComponent
{
public bool Disposed { get; private set; }

public override ValueTask DisposeAsync()
{
Disposed = true;
return default;
}

public Action OnExecute { get; set; } = () => { };

internal override async ValueTask<Outcome<TResult>> ExecuteCore<TResult, TState>(Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> callback, ResilienceContext context, TState state)
{
OnExecute();

return await callback(context, state);
}
}
}

0 comments on commit a0e7735

Please sign in to comment.