Skip to content

Commit

Permalink
Allow to dispose linked resources on pipeline disposal (#1511)
Browse files Browse the repository at this point in the history
  • Loading branch information
martintmk authored Aug 24, 2023
1 parent 32a3390 commit 3c76e05
Show file tree
Hide file tree
Showing 13 changed files with 253 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/Polly.Core/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ Polly.PredicateBuilder<TResult>.PredicateBuilder() -> void
Polly.PredicateResult
Polly.Registry.ConfigureBuilderContext<TKey>
Polly.Registry.ConfigureBuilderContext<TKey>.EnableReloads(System.Func<System.Func<System.Threading.CancellationToken>!>! tokenProducerFactory) -> void
Polly.Registry.ConfigureBuilderContext<TKey>.OnPipelineDisposed(System.Action! callback) -> void
Polly.Registry.ConfigureBuilderContext<TKey>.PipelineKey.get -> TKey
Polly.Registry.ResiliencePipelineProvider<TKey>
Polly.Registry.ResiliencePipelineProvider<TKey>.ResiliencePipelineProvider() -> void
Expand Down
13 changes: 13 additions & 0 deletions src/Polly.Core/Registry/ConfigureBuilderContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ internal ConfigureBuilderContext(TKey strategyKey, string builderName, string? b

internal Func<Func<CancellationToken>>? ReloadTokenProducer { get; private set; }

internal List<Action> DisposeCallbacks { get; } = new();

/// <summary>
/// Enables dynamic reloading of the strategy retrieved from <see cref="ResiliencePipelineRegistry{TKey}"/>.
/// </summary>
Expand All @@ -48,4 +50,15 @@ public void EnableReloads(Func<Func<CancellationToken>> tokenProducerFactory)

ReloadTokenProducer = tokenProducerFactory;
}

/// <summary>
/// Registers a callback that is called when the pipeline instance being configured is disposed.
/// </summary>
/// <param name="callback">The callback delegate.</param>
public void OnPipelineDisposed(Action callback)
{
Guard.NotNull(callback);

DisposeCallbacks.Add(callback);
}
}
6 changes: 5 additions & 1 deletion src/Polly.Core/Registry/RegistryPipelineComponentBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,17 @@ private Builder CreateBuilder()
_configure(builder, context);

return new(
builder.BuildPipelineComponent,
() => PipelineComponentFactory.WithDisposableCallbacks(
builder.BuildPipelineComponent(),
context.DisposeCallbacks),
context.ReloadTokenProducer,
context.DisposeCallbacks,
builder.TelemetryListener);
}

private record Builder(
Func<PipelineComponent> ComponentFactory,
Func<Func<CancellationToken>>? ReloadTokenProducer,
List<Action> DisposeCallbacks,
TelemetryListener? Listener);
}
43 changes: 43 additions & 0 deletions src/Polly.Core/Utils/Pipeline/ComponentWithDisposeCallbacks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
namespace Polly.Utils.Pipeline;

internal class ComponentWithDisposeCallbacks : PipelineComponent
{
private readonly List<Action> _callbacks;

public ComponentWithDisposeCallbacks(PipelineComponent component, List<Action> callbacks)
{
Component = component;
_callbacks = callbacks;
}

internal PipelineComponent Component { get; }

public override void Dispose()
{
ExecuteCallbacks();

Component.Dispose();
}

public override ValueTask DisposeAsync()
{
ExecuteCallbacks();

return Component.DisposeAsync();
}

internal override ValueTask<Outcome<TResult>> ExecuteCore<TResult, TState>(
Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> callback,
ResilienceContext context,
TState state) => Component.ExecuteCore(callback, context, state);

private void ExecuteCallbacks()
{
foreach (var callback in _callbacks)
{
callback();
}

_callbacks.Clear();
}
}
11 changes: 11 additions & 0 deletions src/Polly.Core/Utils/Pipeline/PipelineComponentFactory.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using Polly.Telemetry;

namespace Polly.Utils.Pipeline;

internal static class PipelineComponentFactory
Expand All @@ -13,6 +14,16 @@ internal static class PipelineComponentFactory

public static PipelineComponent FromStrategy<T>(ResilienceStrategy<T> strategy) => new BridgeComponent<T>(strategy);

public static PipelineComponent WithDisposableCallbacks(PipelineComponent component, IEnumerable<Action> callbacks)
{
if (!callbacks.Any())
{
return component;
}

return new ComponentWithDisposeCallbacks(component, callbacks.ToList());
}

public static PipelineComponent CreateComposite(
IReadOnlyList<PipelineComponent> components,
ResilienceStrategyTelemetry telemetry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Polly.Registry;
using Polly.Utils;

namespace Polly.DependencyInjection;

Expand Down Expand Up @@ -63,4 +64,15 @@ internal AddResiliencePipelineContext(ConfigureBuilderContext<TKey> registryCont

return name == null ? monitor.CurrentValue : monitor.Get(name);
}

/// <summary>
/// Registers a callback that is called when the pipeline instance being configured is disposed.
/// </summary>
/// <param name="callback">The callback delegate.</param>
public void OnPipelineDisposed(Action callback)
{
Guard.NotNull(callback);

RegistryContext.OnPipelineDisposed(callback);
}
}
1 change: 1 addition & 0 deletions src/Polly.Extensions/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ abstract Polly.Telemetry.MeteringEnricher.Enrich<TResult, TArgs>(in Polly.Teleme
Polly.DependencyInjection.AddResiliencePipelineContext<TKey>
Polly.DependencyInjection.AddResiliencePipelineContext<TKey>.EnableReloads<TOptions>(string? name = null) -> void
Polly.DependencyInjection.AddResiliencePipelineContext<TKey>.GetOptions<TOptions>(string? name = null) -> TOptions
Polly.DependencyInjection.AddResiliencePipelineContext<TKey>.OnPipelineDisposed(System.Action! callback) -> void
Polly.DependencyInjection.AddResiliencePipelineContext<TKey>.PipelineKey.get -> TKey
Polly.DependencyInjection.AddResiliencePipelineContext<TKey>.ServiceProvider.get -> System.IServiceProvider!
Polly.PollyServiceCollectionExtensions
Expand Down
6 changes: 5 additions & 1 deletion src/Polly.Testing/ResiliencePipelineExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private static object GetStrategyInstance<T>(PipelineComponent component)
return component;
}

private static bool ShouldSkip(object instance) => instance is ReloadableComponent;
private static bool ShouldSkip(object instance) => instance is ReloadableComponent || instance is ComponentWithDisposeCallbacks;

private static void ExpandComponents(this PipelineComponent component, List<PipelineComponent> components)
{
Expand All @@ -78,6 +78,10 @@ private static void ExpandComponents(this PipelineComponent component, List<Pipe
components.Add(reloadable);
ExpandComponents(reloadable.Component, components);
}
else if (component is ComponentWithDisposeCallbacks callbacks)
{
ExpandComponents(callbacks.Component, components);
}
else
{
components.Add(component);
Expand Down
31 changes: 31 additions & 0 deletions test/Polly.Core.Tests/Registry/ResiliencePipelineRegistryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,37 @@ public void EnableReloads_Ok()
tries.Should().Be(retryCount + 1);
}

[Fact]
public void EnableReloads_EnsureDisposedCallbackCalled()
{
// arrange
var registry = new ResiliencePipelineRegistry<string>();
using var changeSource = new CancellationTokenSource();
var disposedCalls = 0;

registry.TryAddBuilder("dummy", (builder, context) =>
{
// this call enables dynamic reloads for the dummy strategy
context.EnableReloads(() => () => changeSource.Token);
context.OnPipelineDisposed(() => disposedCalls++);
builder.AddTimeout(TimeSpan.FromSeconds(1));
});

// act
var strategy = registry.GetPipeline("dummy");

// assert
disposedCalls.Should().Be(0);
strategy.Execute(() => { });

changeSource.Cancel();
disposedCalls.Should().Be(1);
strategy.Execute(() => { });

registry.Dispose();
disposedCalls.Should().Be(2);
}

[Fact]
public void EnableReloads_Generic_Ok()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using NSubstitute;
using Polly.Utils.Pipeline;

namespace Polly.Core.Tests.Utils.Pipeline;

public class ComponentWithDisposeCallbacksTests
{
[InlineData(true)]
[InlineData(false)]
[Theory]
public async Task Dispose_Ok(bool isAsync)
{
// Arrange
var called1 = 0;
var called2 = 0;

var callbacks = new List<Action>
{
() => called1++,
() => called2++
};
var component = Substitute.For<PipelineComponent>();
var sut = new ComponentWithDisposeCallbacks(component, callbacks);

// Act
if (isAsync)
{
await sut.DisposeAsync();
await sut.DisposeAsync();
await component.Received(2).DisposeAsync();
}
else
{
sut.Dispose();
#pragma warning disable S3966 // Objects should not be disposed more than once
sut.Dispose();
#pragma warning restore S3966 // Objects should not be disposed more than once
component.Received(2).Dispose();
}

// Assert
callbacks.Should().BeEmpty();
called1.Should().Be(1);
called2.Should().Be(1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using NSubstitute;
using Polly.Utils.Pipeline;

namespace Polly.Core.Tests.Utils.Pipeline;

public class PipelineComponentFactoryTests
{
[Fact]
public void WithDisposableCallbacks_NoCallbacks_ReturnsOriginalComponent()
{
var component = Substitute.For<PipelineComponent>();
var result = PipelineComponentFactory.WithDisposableCallbacks(component, new List<Action>());
result.Should().BeSameAs(component);
}

[Fact]
public void PipelineComponentFactory_Should_Return_WrapperComponent_With_Callbacks()
{
var component = Substitute.For<PipelineComponent>();
var callbacks = new List<Action> { () => { } };

var result = PipelineComponentFactory.WithDisposableCallbacks(component, callbacks);

result.Should().BeOfType<ComponentWithDisposeCallbacks>();
}
}
58 changes: 58 additions & 0 deletions test/Polly.Extensions.Tests/DisposablePipelineTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System.Threading.RateLimiting;
using Microsoft.Extensions.DependencyInjection;
using Polly.RateLimiting;
using Polly.Registry;

namespace Polly.Extensions.Tests;

public class DisposablePipelineTests
{
[Fact]
public void DisposePipeline_EnsureLinkedResourcesDisposedToo()
{
var limiters = new List<RateLimiter>();

var provider = new ServiceCollection()
.AddResiliencePipeline("my-pipeline", (builder, context) =>
{
var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions
{
PermitLimit = 1,
QueueLimit = 1
});
limiters.Add(limiter);
builder.AddRateLimiter(new RateLimiterStrategyOptions
{
RateLimiter = args => limiter.AcquireAsync(1, args.Context.CancellationToken)
});
// when the pipeline instance is disposed, limiter is disposed too
context.OnPipelineDisposed(() => limiter.Dispose());
})
.BuildServiceProvider();

limiters.Should().HaveCount(0);
provider.GetRequiredService<ResiliencePipelineProvider<string>>().GetPipeline("my-pipeline");
provider.GetRequiredService<ResiliencePipelineProvider<string>>().GetPipeline("my-pipeline");
limiters.Should().HaveCount(1);
IsDisposed(limiters[0]).Should().BeFalse();

provider.Dispose();
limiters.Should().HaveCount(1);
IsDisposed(limiters[0]).Should().BeTrue();
}

private static bool IsDisposed(RateLimiter limiter)
{
try
{
limiter.AcquireAsync(1).AsTask().GetAwaiter().GetResult();
return false;
}
catch (ObjectDisposedException)
{
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void GetPipelineDescriptor_Reloadable_Ok()
var strategy = registry.GetOrAddPipeline("dummy", (builder, context) =>
{
context.EnableReloads(() => () => CancellationToken.None);
context.OnPipelineDisposed(() => { });
builder
.AddConcurrencyLimiter(10)
.AddStrategy(_ => new CustomStrategy(), new TestOptions());
Expand Down

0 comments on commit 3c76e05

Please sign in to comment.