Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify and enhance the pipeline reloads #1512

Merged
merged 3 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Polly.Core/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ Polly.PredicateBuilder<TResult>.HandleResult(TResult result, System.Collections.
Polly.PredicateBuilder<TResult>.PredicateBuilder() -> void
Polly.PredicateResult
Polly.Registry.ConfigureBuilderContext<TKey>
Polly.Registry.ConfigureBuilderContext<TKey>.EnableReloads(System.Func<System.Func<System.Threading.CancellationToken>!>! tokenProducerFactory) -> void
Polly.Registry.ConfigureBuilderContext<TKey>.AddReloadToken(System.Threading.CancellationToken cancellationToken) -> void
Polly.Registry.ConfigureBuilderContext<TKey>.OnPipelineDisposed(System.Action! callback) -> void
Polly.Registry.ConfigureBuilderContext<TKey>.PipelineKey.get -> TKey
Polly.Registry.ResiliencePipelineProvider<TKey>
Expand Down
18 changes: 10 additions & 8 deletions src/Polly.Core/Registry/ConfigureBuilderContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,26 @@ internal ConfigureBuilderContext(TKey strategyKey, string builderName, string? b
/// </summary>
internal string? BuilderInstanceName { get; }

internal Func<Func<CancellationToken>>? ReloadTokenProducer { get; private set; }
internal List<CancellationToken> ReloadTokens { get; } = new();

internal List<Action> DisposeCallbacks { get; } = new();

/// <summary>
/// Enables dynamic reloading of the strategy retrieved from <see cref="ResiliencePipelineRegistry{TKey}"/>.
/// Reloads the pipeline when <paramref name="cancellationToken"/> is canceled.
/// </summary>
/// <param name="tokenProducerFactory">The producer of <see cref="CancellationToken"/> that is triggered when change occurs.</param>
/// <param name="cancellationToken">The cancellation token that triggers the pipeline reload when cancelled.</param>
/// <remarks>
/// The <paramref name="tokenProducerFactory"/> should always return function that returns a new <see cref="CancellationToken"/> instance when invoked otherwise
/// the reload infrastructure will stop listening for changes. The <paramref name="tokenProducerFactory"/> is called only once for each strategy.
/// You can add multiple reload tokens to the context. Non-cancelable or already canceled tokens are ignored.
/// </remarks>
[EditorBrowsable(EditorBrowsableState.Never)]
public void EnableReloads(Func<Func<CancellationToken>> tokenProducerFactory)
public void AddReloadToken(CancellationToken cancellationToken)
{
Guard.NotNull(tokenProducerFactory);
if (!cancellationToken.CanBeCanceled || cancellationToken.IsCancellationRequested)
{
return;
}

ReloadTokenProducer = tokenProducerFactory;
ReloadTokens.Add(cancellationToken);
}

/// <summary>
Expand Down
19 changes: 11 additions & 8 deletions src/Polly.Core/Registry/RegistryPipelineComponentBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,20 @@ internal PipelineComponent CreateComponent()
new ResilienceTelemetrySource(_builderName, _instanceName, null),
builder.Listener);

var initialPipeline = builder.ComponentFactory();
var component = builder.ComponentFactory();

if (builder.ReloadTokenProducer is null)
if (builder.ReloadTokens.Count == 0)
{
return initialPipeline;
return component;
}

return PipelineComponentFactory.CreateReloadable(
initialPipeline,
builder.ReloadTokenProducer(),
() => CreateBuilder().ComponentFactory(),
new ReloadableComponent.Entry(component, builder.ReloadTokens),
() =>
{
var builder = CreateBuilder();
return new ReloadableComponent.Entry(builder.ComponentFactory(), builder.ReloadTokens);
},
telemetry);
}

Expand All @@ -63,14 +66,14 @@ private Builder CreateBuilder()
() => PipelineComponentFactory.WithDisposableCallbacks(
builder.BuildPipelineComponent(),
context.DisposeCallbacks),
context.ReloadTokenProducer,
context.ReloadTokens,
context.DisposeCallbacks,
builder.TelemetryListener);
}

private record Builder(
Func<PipelineComponent> ComponentFactory,
Func<Func<CancellationToken>>? ReloadTokenProducer,
List<CancellationToken> ReloadTokens,
List<Action> DisposeCallbacks,
TelemetryListener? Listener);
}
7 changes: 3 additions & 4 deletions src/Polly.Core/Utils/Pipeline/PipelineComponentFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ public static PipelineComponent CreateComposite(
TimeProvider timeProvider) => CompositeComponent.Create(components, telemetry, timeProvider);

public static PipelineComponent CreateReloadable(
PipelineComponent initialComponent,
Func<CancellationToken> onReload,
Func<PipelineComponent> factory,
ResilienceStrategyTelemetry telemetry) => new ReloadableComponent(initialComponent, onReload, factory, telemetry);
ReloadableComponent.Entry initial,
Func<ReloadableComponent.Entry> factory,
ResilienceStrategyTelemetry telemetry) => new ReloadableComponent(initial, factory, telemetry);
}
42 changes: 23 additions & 19 deletions src/Polly.Core/Utils/Pipeline/ReloadableComponent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,21 @@ internal sealed class ReloadableComponent : PipelineComponent

public const string OnReloadEvent = "OnReload";

private readonly Func<CancellationToken> _onReload;
private readonly Func<PipelineComponent> _factory;
private readonly Func<Entry> _factory;
private readonly ResilienceStrategyTelemetry _telemetry;
private CancellationTokenSource _tokenSource = null!;
private CancellationTokenRegistration _registration;
private List<CancellationToken> _reloadTokens;

public ReloadableComponent(
PipelineComponent initialComponent,
Func<CancellationToken> onReload,
Func<PipelineComponent> factory,
ResilienceStrategyTelemetry telemetry)
public ReloadableComponent(Entry entry, Func<Entry> factory, ResilienceStrategyTelemetry telemetry)
{
Component = initialComponent;
Component = entry.Component;

_onReload = onReload;
_reloadTokens = entry.ReloadTokens;
_factory = factory;
_telemetry = telemetry;

RegisterOnReload(default);
TryRegisterOnReload();
}

public PipelineComponent Component { get; private set; }
Expand All @@ -52,43 +49,50 @@ public override ValueTask DisposeAsync()
return Component.DisposeAsync();
}

private void RegisterOnReload(CancellationToken previousToken)
private void TryRegisterOnReload()
{
var token = _onReload();
if (token == previousToken)
if (_reloadTokens.Count == 0)
{
return;
}

_registration = token.Register(() =>
_tokenSource = CancellationTokenSource.CreateLinkedTokenSource(_reloadTokens.ToArray());
_registration = _tokenSource.Token.Register(() =>
{
var context = ResilienceContextPool.Shared.Get().Initialize<VoidResult>(isSynchronous: true);
PipelineComponent previousComponent = Component;

try
{
_telemetry.Report(new(ResilienceEventSeverity.Information, OnReloadEvent), context, new OnReloadArguments());
Component = _factory();
(Component, _reloadTokens) = _factory();

previousComponent.Dispose();
}
catch (Exception e)
{
_reloadTokens = new List<CancellationToken>();
var args = new OutcomeArguments<VoidResult, ReloadFailedArguments>(context, Outcome.FromException(e), new ReloadFailedArguments(e));
_telemetry.Report(new(ResilienceEventSeverity.Error, ReloadFailedEvent), args);
ResilienceContextPool.Shared.Return(context);
}

DisposeRegistration();
RegisterOnReload(token);
TryRegisterOnReload();
});
}

#pragma warning disable S2952 // Classes should "Dispose" of members from the classes' own "Dispose" methods
private void DisposeRegistration() => _registration.Dispose();
private void DisposeRegistration()
{
_registration.Dispose();
_tokenSource.Dispose();
}
#pragma warning restore S2952 // Classes should "Dispose" of members from the classes' own "Dispose" methods

internal readonly record struct ReloadFailedArguments(Exception Exception);
internal record ReloadFailedArguments(Exception Exception);

internal record OnReloadArguments();

internal readonly record struct OnReloadArguments();
internal record Entry(PipelineComponent Component, List<CancellationToken> ReloadTokens);
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using Microsoft.Extensions.Options;
using Polly.Utils;

Expand Down Expand Up @@ -33,6 +34,24 @@ public static class ConfigureBuilderContextExtensions
Guard.NotNull(context);
Guard.NotNull(optionsMonitor);

context.EnableReloads(() => new OptionsReloadHelper<TOptions>(optionsMonitor, name ?? Options.DefaultName).GetCancellationToken);
name ??= string.Empty;

#pragma warning disable CA2000 // Dispose objects before losing scope
var source = new CancellationTokenSource();
#pragma warning restore CA2000 // Dispose objects before losing scope
var registration = optionsMonitor.OnChange((_, changedNamed) =>
{
if (name == changedNamed)
{
source.Cancel();
}
});

context.AddReloadToken(source.Token);
context.OnPipelineDisposed(() =>
{
registration?.Dispose();
source.Dispose();
});
}
}
30 changes: 0 additions & 30 deletions src/Polly.Extensions/Utils/OptionsReloadHelper.cs

This file was deleted.

21 changes: 21 additions & 0 deletions test/Polly.Core.Tests/Registry/ConfigureBuilderContextTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Polly.Registry;

namespace Polly.Core.Tests.Registry;

public class ConfigureBuilderContextTests
{
[Fact]
public void AddReloadToken_Ok()
{
var context = new ConfigureBuilderContext<int>(0, "dummy", "dummy");
using var source = new CancellationTokenSource();

context.AddReloadToken(CancellationToken.None);
context.AddReloadToken(source.Token);

source.Cancel();
context.AddReloadToken(source.Token);

context.ReloadTokens.Should().HaveCount(1);
}
}
27 changes: 20 additions & 7 deletions test/Polly.Core.Tests/Registry/ResiliencePipelineRegistryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,18 +249,22 @@ public void TryGet_GenericNoBuilder_Null()
strategy.Should().BeNull();
}

[Fact]
public void EnableReloads_Ok()
[InlineData(true)]
[InlineData(false)]
[Theory]
public void EnableReloads_Ok(bool firstOne)
{
// arrange
var retryCount = 2;
using var registry = new ResiliencePipelineRegistry<string>();
using var changeSource = new CancellationTokenSource();
using var token1 = new CancellationTokenSource();
using var token2 = new CancellationTokenSource();

registry.TryAddBuilder("dummy", (builder, context) =>
{
// this call enables dynamic reloads for the dummy strategy
context.EnableReloads(() => () => changeSource.Token);
context.AddReloadToken(token1.Token);
context.AddReloadToken(token2.Token);

builder.AddRetry(new RetryStrategyOptions
{
Expand All @@ -280,7 +284,16 @@ public void EnableReloads_Ok()

tries = 0;
retryCount = 5;
changeSource.Cancel();

if (firstOne)
{
token1.Cancel();
}
else
{
token2.Cancel();
}

strategy.Execute(() => tries++);
tries.Should().Be(retryCount + 1);
}
Expand All @@ -296,7 +309,7 @@ public void EnableReloads_EnsureDisposedCallbackCalled()
registry.TryAddBuilder("dummy", (builder, context) =>
{
// this call enables dynamic reloads for the dummy strategy
context.EnableReloads(() => () => changeSource.Token);
context.AddReloadToken(changeSource.Token);
context.OnPipelineDisposed(() => disposedCalls++);
builder.AddTimeout(TimeSpan.FromSeconds(1));
});
Expand Down Expand Up @@ -327,7 +340,7 @@ public void EnableReloads_Generic_Ok()
registry.TryAddBuilder<string>("dummy", (builder, context) =>
{
// this call enables dynamic reloads for the dummy strategy
context.EnableReloads(() => () => changeSource.Token);
context.AddReloadToken(changeSource.Token);

builder.AddRetry(new RetryStrategyOptions<string>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,19 @@ public void ChangeTriggered_StrategyReloaded()
var component = Substitute.For<PipelineComponent>();
using var sut = CreateSut(component);

for (var i = 0; i < 10; i++)
{
var src = _cancellationTokenSource;
_cancellationTokenSource = new CancellationTokenSource();
src.Cancel();

sut.Component.Should().NotBe(component);
}
sut.Component.Should().Be(component);
_cancellationTokenSource.Cancel();
sut.Component.Should().NotBe(component);

_events.Where(e => e.Event.EventName == "ReloadFailed").Should().HaveCount(0);
_events.Where(e => e.Event.EventName == "OnReload").Should().HaveCount(10);
_events.Where(e => e.Event.EventName == "OnReload").Should().HaveCount(1);
}

[Fact]
public void ChangeTriggered_EnsureOldStrategyDisposed()
{
var component = Substitute.For<PipelineComponent>();
using var sut = CreateSut(component, () => Substitute.For<PipelineComponent>());
using var sut = CreateSut(component, () => new(Substitute.For<PipelineComponent>(), new List<CancellationToken>()));

for (var i = 0; i < 10; i++)
{
Expand Down Expand Up @@ -109,12 +104,12 @@ public void ChangeTriggered_FactoryError_LastStrategyUsedAndErrorReported()
args.Exception.Should().BeOfType<InvalidOperationException>();
}

private ReloadableComponent CreateSut(PipelineComponent? initial = null, Func<PipelineComponent>? factory = null)
private ReloadableComponent CreateSut(PipelineComponent? initial = null, Func<ReloadableComponent.Entry>? factory = null)
{
factory ??= () => PipelineComponent.Empty;
factory ??= () => new ReloadableComponent.Entry(PipelineComponent.Empty, new List<CancellationToken>());

return (ReloadableComponent)PipelineComponentFactory.CreateReloadable(initial ?? PipelineComponent.Empty,
() => _cancellationTokenSource.Token,
return (ReloadableComponent)PipelineComponentFactory.CreateReloadable(
new ReloadableComponent.Entry(initial ?? PipelineComponent.Empty, new List<CancellationToken> { _cancellationTokenSource.Token }),
factory,
_telemetry);
}
Expand Down
Loading