Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1687 - Make ResilienceContextPool settable via DI #1693

Merged
merged 9 commits into from
Oct 17, 2023
2 changes: 2 additions & 0 deletions src/Polly.Core/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
#nullable enable
Polly.ResiliencePipelineBuilderBase.ContextPool.get -> Polly.ResilienceContextPool?
Polly.ResiliencePipelineBuilderBase.ContextPool.set -> void
14 changes: 9 additions & 5 deletions src/Polly.Core/Registry/RegistryPipelineComponentBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,25 @@ public RegistryPipelineComponentBuilder(
_configure = configure;
}

internal PipelineComponent CreateComponent()
internal (ResilienceContextPool? contextPool, PipelineComponent component) CreateComponent()
{
var builder = CreateBuilder();
var component = builder.ComponentFactory();

if (builder.ReloadTokens.Count == 0)
{
return component;
return (builder.Instance.ContextPool, component);
}

return PipelineComponentFactory.CreateReloadable(
component = PipelineComponentFactory.CreateReloadable(
new ReloadableComponent.Entry(component, builder.ReloadTokens, builder.Telemetry),
() =>
{
var builder = CreateBuilder();
return new ReloadableComponent.Entry(builder.ComponentFactory(), builder.ReloadTokens, builder.Telemetry);
});

return (builder.Instance.ContextPool, component);
}

private Builder CreateBuilder()
Expand All @@ -69,11 +71,13 @@ private Builder CreateBuilder()
return PipelineComponentFactory.WithExecutionTracking(innerComponent, timeProvider);
},
context.ReloadTokens,
telemetry);
telemetry,
builder);
}

private record Builder(
Func<PipelineComponent> ComponentFactory,
List<CancellationToken> ReloadTokens,
ResilienceStrategyTelemetry Telemetry);
ResilienceStrategyTelemetry Telemetry,
TBuilder Instance);
}
8 changes: 5 additions & 3 deletions src/Polly.Core/Registry/ResiliencePipelineRegistry.TResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ public ResiliencePipeline<TResult> GetOrAdd(TKey key, Action<ResiliencePipelineB

return _pipelines.GetOrAdd(key, k =>
{
var component = new RegistryPipelineComponentBuilder<ResiliencePipelineBuilder<TResult>, TKey>(
var componentBuilder = new RegistryPipelineComponentBuilder<ResiliencePipelineBuilder<TResult>, TKey>(
_activator,
k,
_builderNameFormatter(k),
_instanceNameFormatter?.Invoke(k),
configure).CreateComponent();
configure);

return new ResiliencePipeline<TResult>(component, DisposeBehavior.Reject);
(var contextPool, var component) = componentBuilder.CreateComponent();

return new ResiliencePipeline<TResult>(component, DisposeBehavior.Reject, contextPool);
});
}

Expand Down
9 changes: 6 additions & 3 deletions src/Polly.Core/Registry/ResiliencePipelineRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,17 @@ public ResiliencePipeline GetOrAddPipeline(TKey key, Action<ResiliencePipelineBu

return _pipelines.GetOrAdd(key, k =>
{
var component = new RegistryPipelineComponentBuilder<ResiliencePipelineBuilder, TKey>(
var componentBuilder = new RegistryPipelineComponentBuilder<ResiliencePipelineBuilder, TKey>(
_activator,
k,
_builderNameFormatter(k),
_instanceNameFormatter?.Invoke(k),
configure).CreateComponent();
configure)
;

return new ResiliencePipeline(component, DisposeBehavior.Reject);
(var contextPool, var component) = componentBuilder.CreateComponent();

return new ResiliencePipeline(component, DisposeBehavior.Reject, contextPool);
});
}

Expand Down
7 changes: 4 additions & 3 deletions src/Polly.Core/ResiliencePipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ public sealed partial class ResiliencePipeline
/// <summary>
/// Resilience pipeline that executes the user-provided callback without any additional logic.
/// </summary>
public static readonly ResiliencePipeline Empty = new(PipelineComponent.Empty, DisposeBehavior.Ignore);
public static readonly ResiliencePipeline Empty = new(PipelineComponent.Empty, DisposeBehavior.Ignore, null);

internal ResiliencePipeline(PipelineComponent component, DisposeBehavior disposeBehavior)
internal ResiliencePipeline(PipelineComponent component, DisposeBehavior disposeBehavior, ResilienceContextPool? pool)
{
Component = component;
DisposeHelper = new ComponentDisposeHelper(component, disposeBehavior);
Pool = pool ?? ResilienceContextPool.Shared;
}

internal static ResilienceContextPool Pool => ResilienceContextPool.Shared;
internal ResilienceContextPool Pool { get; }

internal PipelineComponent Component { get; }

Expand Down
2 changes: 1 addition & 1 deletion src/Polly.Core/ResiliencePipelineBuilder.TResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ internal ResiliencePipelineBuilder(ResiliencePipelineBuilderBase other)
/// </summary>
/// <returns>An instance of <see cref="ResiliencePipeline{TResult}"/>.</returns>
/// <exception cref="ValidationException">Thrown when this builder has invalid configuration.</exception>
public ResiliencePipeline<TResult> Build() => new(BuildPipelineComponent(), DisposeBehavior.Allow);
public ResiliencePipeline<TResult> Build() => new(BuildPipelineComponent(), DisposeBehavior.Allow, ContextPool);
}
2 changes: 1 addition & 1 deletion src/Polly.Core/ResiliencePipelineBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ public sealed class ResiliencePipelineBuilder : ResiliencePipelineBuilderBase
/// </summary>
/// <returns>An instance of <see cref="ResiliencePipeline"/>.</returns>
/// <exception cref="ValidationException">Thrown when this builder has invalid configuration.</exception>
public ResiliencePipeline Build() => new(BuildPipelineComponent(), DisposeBehavior.Allow);
public ResiliencePipeline Build() => new(BuildPipelineComponent(), DisposeBehavior.Allow, ContextPool);
}
16 changes: 14 additions & 2 deletions src/Polly.Core/ResiliencePipelineBuilderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,19 @@ private protected ResiliencePipelineBuilderBase(ResiliencePipelineBuilderBase ot
public string? InstanceName { get; set; }

/// <summary>
/// Gets or sets a <see cref="TimeProvider"/> that is used by strategies that work with time.
/// Gets or sets the <see cref="Polly.ResilienceContextPool"/> associated with the builder.
/// </summary>
/// <remarks>
/// A custom pool can be used to configure custom behavior for creation.
/// This can include setting a default <c>continueOnCapturedContext</c> parameter or custom operation key resolution.
/// </remarks>
/// <value>
/// If the default value of <see langword="null"/> is used, <see cref="ResilienceContextPool.Shared"/> will be used.
/// </value>
public ResilienceContextPool? ContextPool { get; set; }

/// <summary>
/// Gets or sets a <see cref="System.TimeProvider"/> that is used by strategies that work with time.
/// </summary>
/// <remarks>
/// This property is internal until we switch to official System.TimeProvider.
Expand All @@ -67,7 +79,7 @@ private protected ResiliencePipelineBuilderBase(ResiliencePipelineBuilderBase ot
internal TimeProvider TimeProvider { get; set; } = TimeProvider.System;

/// <summary>
/// Gets or sets the <see cref="TelemetryListener"/> that is used by Polly to report resilience events.
/// Gets or sets the <see cref="Polly.Telemetry.TelemetryListener"/> that is used by Polly to report resilience events.
/// </summary>
/// <remarks>
/// This property is used by the telemetry infrastructure and should not be used directly by user code.
Expand Down
6 changes: 3 additions & 3 deletions src/Polly.Core/ResiliencePipelineT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ public sealed partial class ResiliencePipeline<T>
/// <summary>
/// Resilience pipeline that executes the user-provided callback without any additional logic.
/// </summary>
public static readonly ResiliencePipeline<T> Empty = new(PipelineComponent.Empty, DisposeBehavior.Ignore);
public static readonly ResiliencePipeline<T> Empty = new(PipelineComponent.Empty, DisposeBehavior.Ignore, null);

internal ResiliencePipeline(PipelineComponent component, DisposeBehavior disposeBehavior)
internal ResiliencePipeline(PipelineComponent component, DisposeBehavior disposeBehavior, ResilienceContextPool? pool)
{
// instead of re-implementing individual Execute* methods we
// can just re-use the non-generic ResiliencePipeline and
// call it from Execute* methods in this class
Pipeline = new ResiliencePipeline(component, disposeBehavior);
Pipeline = new ResiliencePipeline(component, disposeBehavior, pool);
DisposeHelper = Pipeline.DisposeHelper;
}

Expand Down
2 changes: 1 addition & 1 deletion test/Polly.Core.Tests/ResiliencePipelineTTests.Async.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public async Task ExecuteAsync_GenericStrategy_Ok(Func<ResiliencePipeline<string
c.ResultType.Should().Be(typeof(string));
c.CancellationToken.CanBeCanceled.Should().BeTrue();
},
}), DisposeBehavior.Allow);
}), DisposeBehavior.Allow, null);

await execute(pipeline);
}
Expand Down
2 changes: 1 addition & 1 deletion test/Polly.Core.Tests/ResiliencePipelineTTests.Sync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void Execute_GenericStrategy_Ok(Action<ResiliencePipeline<string>> execut
c.IsSynchronous.Should().BeTrue();
c.ResultType.Should().Be(typeof(string));
},
}), DisposeBehavior.Allow);
}), DisposeBehavior.Allow, null);

execute(pipeline);
}
Expand Down
4 changes: 2 additions & 2 deletions test/Polly.Core.Tests/ResiliencePipelineTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public async Task DisposeAsync_NullGenericPipeline_OK()
public async Task DisposeAsync_Reject_Throws()
{
var component = Substitute.For<PipelineComponent>();
var pipeline = new ResiliencePipeline(component, DisposeBehavior.Reject);
var pipeline = new ResiliencePipeline(component, DisposeBehavior.Reject, null);

(await pipeline.Invoking(p => p.DisposeHelper.DisposeAsync().AsTask())
.Should()
Expand All @@ -44,7 +44,7 @@ public async Task DisposeAsync_Reject_Throws()
public async Task DisposeAsync_Allowed_Disposed()
{
var component = Substitute.For<PipelineComponent>();
var pipeline = new ResiliencePipeline(component, DisposeBehavior.Allow);
var pipeline = new ResiliencePipeline(component, DisposeBehavior.Allow, null);
await pipeline.DisposeHelper.DisposeAsync();
await pipeline.DisposeHelper.DisposeAsync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public void Execute_NonGeneric_Ok()
var pipeline = new ResiliencePipeline(PipelineComponentFactory.FromStrategy(new Strategy<object>(outcome =>
{
values.Add(outcome.Result);
})), DisposeBehavior.Allow);
})), DisposeBehavior.Allow, null);

pipeline.Execute(args => "dummy");
pipeline.Execute(args => 0);
Expand All @@ -41,7 +41,7 @@ public void Execute_Generic_Ok()
var pipeline = new ResiliencePipeline(PipelineComponentFactory.FromStrategy(new Strategy<string>(outcome =>
{
values.Add(outcome.Result);
})), DisposeBehavior.Allow);
})), DisposeBehavior.Allow, null);

pipeline.Execute(args => "dummy");

Expand All @@ -58,7 +58,7 @@ public void Pipeline_TypeCheck_Ok()
{
outcome.Result.Should().Be(-1);
called = true;
})), DisposeBehavior.Allow);
})), DisposeBehavior.Allow, null);

pipeline.Execute(() => -1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public async Task Create_Cancelled_EnsureNoExecution()
PipelineComponentFactory.FromStrategy(new TestResilienceStrategy()),
};

var pipeline = new ResiliencePipeline(CreateSut(strategies, new FakeTimeProvider()), DisposeBehavior.Allow);
var pipeline = new ResiliencePipeline(CreateSut(strategies, new FakeTimeProvider()), DisposeBehavior.Allow, null);
var context = ResilienceContextPool.Shared.Get();
context.CancellationToken = cancellation.Token;

Expand All @@ -104,7 +104,7 @@ public async Task Create_CancelledLater_EnsureNoExecution()
PipelineComponentFactory.FromStrategy(new TestResilienceStrategy { Before = (_, _) => { executed = true; cancellation.Cancel(); } }),
PipelineComponentFactory.FromStrategy(new TestResilienceStrategy()),
};
var pipeline = new ResiliencePipeline(CreateSut(strategies, new FakeTimeProvider()), DisposeBehavior.Allow);
var pipeline = new ResiliencePipeline(CreateSut(strategies, new FakeTimeProvider()), DisposeBehavior.Allow, null);
var context = ResilienceContextPool.Shared.Get();
context.CancellationToken = cancellation.Token;

Expand All @@ -118,7 +118,7 @@ public void ExecutePipeline_EnsureTelemetryArgumentsReported()
{
var timeProvider = new FakeTimeProvider();

var pipeline = new ResiliencePipeline(CreateSut(new[] { Substitute.For<PipelineComponent>() }, timeProvider), DisposeBehavior.Allow);
var pipeline = new ResiliencePipeline(CreateSut(new[] { Substitute.For<PipelineComponent>() }, timeProvider), DisposeBehavior.Allow, null);
pipeline.Execute(() => { timeProvider.Advance(TimeSpan.FromHours(1)); });

_listener.Events.Should().HaveCount(2);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Time.Testing;
using Microsoft.Extensions.Time.Testing;
using Polly.Utils.Pipeline;

namespace Polly.Core.Tests.Utils.Pipeline;
Expand All @@ -25,7 +23,7 @@ public async Task DisposeAsync_PendingOperations_Delayed()
};

var component = new ExecutionTrackingComponent(inner, _timeProvider);
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { }));
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow, null).Execute(() => { }));
executing.WaitOne();

var disposeTask = component.DisposeAsync().AsTask();
Expand Down Expand Up @@ -58,7 +56,7 @@ public async Task HasPendingExecutions_Ok()
};

await using var component = new ExecutionTrackingComponent(inner, _timeProvider);
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { }));
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow, null).Execute(() => { }));
executing.WaitOne();

component.HasPendingExecutions.Should().BeTrue();
Expand All @@ -84,7 +82,7 @@ public async Task DisposeAsync_Timeout_Ok()
};

var component = new ExecutionTrackingComponent(inner, _timeProvider);
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { }));
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow, null).Execute(() => { }));
executing.WaitOne();

var disposeTask = component.DisposeAsync().AsTask();
Expand Down Expand Up @@ -115,7 +113,7 @@ public async Task DisposeAsync_WhenRunningMultipleTasks_Ok()
};

var component = new ExecutionTrackingComponent(inner, TimeProvider.System);
var pipeline = new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow);
var pipeline = new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow, null);

for (int i = 0; i < 10; i++)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
using Microsoft.Extensions.DependencyInjection;
using Polly.Registry;

namespace Polly.Extensions.Tests.Issues;

public partial class IssuesTests
{
private class CustomResilienceContextPool : ResilienceContextPool
{
public override ResilienceContext Get(ResilienceContextCreationArguments arguments)
{
if (arguments.ContinueOnCapturedContext is null)
{
arguments = new ResilienceContextCreationArguments(arguments.OperationKey, continueOnCapturedContext: true, arguments.CancellationToken);
}

return Shared.Get(arguments);
}

public override void Return(ResilienceContext context) => Shared.Return(context);
}

private class ContextCreationTestStrategy : ResilienceStrategy
{
public int HitCount { get; private set; }

protected override async ValueTask<Outcome<TResult>> ExecuteCore<TResult, TState>(Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> callback,
ResilienceContext context,
TState state)
{
context.ContinueOnCapturedContext.Should().BeTrue();

HitCount++;

return await callback(context, state);
}
}

[Fact]
public async Task DynamicContextPool_1687()
{
var pool = new CustomResilienceContextPool();
var strategy = new ContextCreationTestStrategy();
var services = new ServiceCollection();
string key = "my-key";

services.AddResiliencePipelineRegistry<string>(options => options.BuilderFactory = () => new ResiliencePipelineBuilder
{
ContextPool = pool,
});

services.AddResiliencePipeline(key, builder =>
{
builder.ContextPool.Should().Be(pool);
builder.AddStrategy(strategy);
});

// create the pipeline provider
var provider = services.BuildServiceProvider().GetRequiredService<ResiliencePipelineProvider<string>>();

var pipeline = provider.GetPipeline(key);

await pipeline.ExecuteAsync(async ct => await default(ValueTask));

strategy.HitCount.Should().BeGreaterThan(0);
}
}