Skip to content

Commit

Permalink
Fix CircuitBreaker handling of Half-Open (#1991)
Browse files Browse the repository at this point in the history
* Rename methods to clarify intention

* Ensure that an unhandled exception in half open state closes the circuit instead of blocking any further transition from half open state

* Fix mutation survived in StrategyHelper; add explicit test to replace no longer existing implicit test in CircuitBreakerResilienceStrategyTests
  • Loading branch information
DominicUllmann authored Feb 24, 2024
1 parent 5f726d3 commit de6eb20
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ protected internal override async ValueTask<Outcome<T>> ExecuteCore<TState>(Func
var args = new CircuitBreakerPredicateArguments<T>(context, outcome);
if (await _handler(args).ConfigureAwait(context.ContinueOnCapturedContext))
{
await _controller.OnActionFailureAsync(outcome, context).ConfigureAwait(context.ContinueOnCapturedContext);
await _controller.OnHandledOutcomeAsync(outcome, context).ConfigureAwait(context.ContinueOnCapturedContext);
}
else if (outcome.Exception is null)
else
{
await _controller.OnActionSuccessAsync(outcome, context).ConfigureAwait(context.ContinueOnCapturedContext);
await _controller.OnUnhandledOutcomeAsync(outcome, context).ConfigureAwait(context.ContinueOnCapturedContext);
}

return outcome;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public ValueTask CloseCircuitAsync(ResilienceContext context)
return null;
}

public ValueTask OnActionSuccessAsync(Outcome<T> outcome, ResilienceContext context)
public ValueTask OnUnhandledOutcomeAsync(Outcome<T> outcome, ResilienceContext context)
{
EnsureNotDisposed();

Expand All @@ -189,7 +189,7 @@ public ValueTask OnActionSuccessAsync(Outcome<T> outcome, ResilienceContext cont
return ExecuteScheduledTaskAsync(task, context);
}

public ValueTask OnActionFailureAsync(Outcome<T> outcome, ResilienceContext context)
public ValueTask OnHandledOutcomeAsync(Outcome<T> outcome, ResilienceContext context)
{
EnsureNotDisposed();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,14 @@ public void Execute_HandledException_OnFailureCalled()
}

[Fact]
public void Execute_UnhandledException_NoCalls()
public void Execute_UnhandledException_OnActionSuccess()
{
_options.ShouldHandle = args => new ValueTask<bool>(args.Outcome.Exception is InvalidOperationException);
var strategy = Create();

strategy.Invoking(s => s.Execute<int>(_ => throw new ArgumentException())).Should().Throw<ArgumentException>();

_behavior.DidNotReceiveWithAnyArgs().OnActionFailure(default, out Arg.Any<bool>());
_behavior.DidNotReceiveWithAnyArgs().OnActionSuccess(default);
_behavior.DidNotReceiveWithAnyArgs().OnCircuitClosed();
_behavior.Received(1).OnActionSuccess(CircuitState.Closed);
}

public void Dispose() => _controller.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ public async Task Disposed_EnsureThrows()
await Assert.ThrowsAsync<ObjectDisposedException>(async () => await controller.CloseCircuitAsync(ResilienceContextPool.Shared.Get()));
await Assert.ThrowsAsync<ObjectDisposedException>(async () => await controller.IsolateCircuitAsync(ResilienceContextPool.Shared.Get()));
await Assert.ThrowsAsync<ObjectDisposedException>(async () => await controller.OnActionPreExecuteAsync(ResilienceContextPool.Shared.Get()));
await Assert.ThrowsAsync<ObjectDisposedException>(async () => await controller.OnActionSuccessAsync(Outcome.FromResult(10), ResilienceContextPool.Shared.Get()));
await Assert.ThrowsAsync<ObjectDisposedException>(async () => await controller.OnActionFailureAsync(Outcome.FromResult(10), ResilienceContextPool.Shared.Get()));
await Assert.ThrowsAsync<ObjectDisposedException>(async () => await controller.OnUnhandledOutcomeAsync(Outcome.FromResult(10), ResilienceContextPool.Shared.Get()));
await Assert.ThrowsAsync<ObjectDisposedException>(async () => await controller.OnHandledOutcomeAsync(Outcome.FromResult(10), ResilienceContextPool.Shared.Get()));
}

[Fact]
Expand Down Expand Up @@ -182,15 +182,15 @@ public async Task HalfOpen_EnsureCorrectStateTransitionAfterExecution(bool succe

if (success)
{
await controller.OnActionSuccessAsync(Outcome.FromResult(0), ResilienceContextPool.Shared.Get());
await controller.OnUnhandledOutcomeAsync(Outcome.FromResult(0), ResilienceContextPool.Shared.Get());
controller.CircuitState.Should().Be(CircuitState.Closed);

_circuitBehavior.Received().OnActionSuccess(CircuitState.HalfOpen);
_circuitBehavior.Received().OnCircuitClosed();
}
else
{
await controller.OnActionFailureAsync(Outcome.FromResult(0), ResilienceContextPool.Shared.Get());
await controller.OnHandledOutcomeAsync(Outcome.FromResult(0), ResilienceContextPool.Shared.Get());
controller.CircuitState.Should().Be(CircuitState.Open);

_circuitBehavior.DidNotReceiveWithAnyArgs().OnActionSuccess(default);
Expand Down Expand Up @@ -226,9 +226,9 @@ public async Task OnActionFailure_EnsureLock()
using var controller = CreateController();

// act
var executeAction = Task.Run(() => controller.OnActionFailureAsync(Outcome.FromResult(0), ResilienceContextPool.Shared.Get()));
var executeAction = Task.Run(() => controller.OnHandledOutcomeAsync(Outcome.FromResult(0), ResilienceContextPool.Shared.Get()));
executing.WaitOne();
var executeAction2 = Task.Run(() => controller.OnActionFailureAsync(Outcome.FromResult(0), ResilienceContextPool.Shared.Get()));
var executeAction2 = Task.Run(() => controller.OnHandledOutcomeAsync(Outcome.FromResult(0), ResilienceContextPool.Shared.Get()));

// assert
#pragma warning disable xUnit1031 // Do not use blocking task operations in test method
Expand Down Expand Up @@ -285,7 +285,7 @@ public async Task OnActionSuccess_EnsureCorrectBehavior(CircuitState state, Circ
await TransitionToState(controller, state);

// act
await controller.OnActionSuccessAsync(Outcome.FromResult(10), ResilienceContextPool.Shared.Get());
await controller.OnUnhandledOutcomeAsync(Outcome.FromResult(10), ResilienceContextPool.Shared.Get());

// assert
controller.CircuitState.Should().Be(expectedState);
Expand Down Expand Up @@ -330,7 +330,7 @@ public async Task OnActionFailureAsync_EnsureCorrectBehavior(CircuitState state,
.Do(x => x[1] = shouldBreak);

// act
await controller.OnActionFailureAsync(Outcome.FromResult(99), ResilienceContextPool.Shared.Get());
await controller.OnHandledOutcomeAsync(Outcome.FromResult(99), ResilienceContextPool.Shared.Get());

// assert
controller.LastHandledOutcome!.Value.Result.Should().Be(99);
Expand Down Expand Up @@ -367,7 +367,7 @@ public async Task OnActionFailureAsync_EnsureBreakDurationGeneration()
.Do(x => x[1] = true);

// act
await controller.OnActionFailureAsync(Outcome.FromResult(99), ResilienceContextPool.Shared.Get());
await controller.OnHandledOutcomeAsync(Outcome.FromResult(99), ResilienceContextPool.Shared.Get());

// assert
var blockedTill = GetBlockedTill(controller);
Expand Down Expand Up @@ -430,7 +430,7 @@ public async Task OnActionFailureAsync_EnsureBreakDurationNotOverflow(bool overf
.Do(x => x[1] = shouldBreak);

// act
await controller.OnActionFailureAsync(Outcome.FromResult(99), ResilienceContextPool.Shared.Get());
await controller.OnHandledOutcomeAsync(Outcome.FromResult(99), ResilienceContextPool.Shared.Get());

// assert
var blockedTill = GetBlockedTill(controller);
Expand All @@ -457,7 +457,7 @@ public async Task OnActionFailureAsync_VoidResult_EnsureBreakingExceptionNotSet(
.Do(x => x[1] = shouldBreak);

// act
await controller.OnActionFailureAsync(Outcome.FromResult(99), ResilienceContextPool.Shared.Get());
await controller.OnHandledOutcomeAsync(Outcome.FromResult(99), ResilienceContextPool.Shared.Get());

// assert
controller.LastException.Should().BeNull();
Expand All @@ -472,7 +472,7 @@ public async Task Flow_Closed_HalfOpen_Closed()

await TransitionToState(controller, CircuitState.HalfOpen);

await controller.OnActionSuccessAsync(Outcome.FromResult(0), ResilienceContextPool.Shared.Get());
await controller.OnUnhandledOutcomeAsync(Outcome.FromResult(0), ResilienceContextPool.Shared.Get());
controller.CircuitState.Should().Be(CircuitState.Closed);

_circuitBehavior.Received().OnActionSuccess(CircuitState.HalfOpen);
Expand All @@ -491,7 +491,7 @@ public async Task Flow_Closed_HalfOpen_Open_HalfOpen_Closed()
_circuitBehavior.When(v => v.OnActionFailure(CircuitState.HalfOpen, out Arg.Any<bool>()))
.Do(x => x[1] = shouldBreak);

await controller.OnActionFailureAsync(Outcome.FromResult(0), context);
await controller.OnHandledOutcomeAsync(Outcome.FromResult(0), context);
controller.CircuitState.Should().Be(CircuitState.Open);

// execution rejected
Expand All @@ -505,7 +505,7 @@ public async Task Flow_Closed_HalfOpen_Open_HalfOpen_Closed()
controller.CircuitState.Should().Be(CircuitState.HalfOpen);

// close circuit
await controller.OnActionSuccessAsync(Outcome.FromResult(0), ResilienceContextPool.Shared.Get());
await controller.OnUnhandledOutcomeAsync(Outcome.FromResult(0), ResilienceContextPool.Shared.Get());
controller.CircuitState.Should().Be(CircuitState.Closed);

_circuitBehavior.Received().OnActionSuccess(CircuitState.HalfOpen);
Expand Down Expand Up @@ -562,7 +562,7 @@ private async Task OpenCircuit(CircuitStateController<int> controller, Outcome<i
_circuitBehavior.When(v => v.OnActionFailure(CircuitState.Closed, out Arg.Any<bool>()))
.Do(x => x[1] = breakCircuit);

await controller.OnActionFailureAsync(outcome ?? Outcome.FromResult(10), ResilienceContextPool.Shared.Get().Initialize<int>(true));
await controller.OnHandledOutcomeAsync(outcome ?? Outcome.FromResult(10), ResilienceContextPool.Shared.Get().Initialize<int>(true));
}

private void AdvanceTime(TimeSpan timespan) => _timeProvider.Advance(timespan);
Expand Down
23 changes: 23 additions & 0 deletions test/Polly.Core.Tests/Utils/StrategyHelperTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,27 @@ await TestUtilities.AssertWithTimeoutAsync(async () =>

outcome.Exception.Should().BeOfType<InvalidOperationException>();
});

[InlineData(true)]
[InlineData(false)]
[Theory]
public async Task ExecuteCallbackSafeAsync_AsyncCallback_CompletedOk(bool isAsync) =>
await TestUtilities.AssertWithTimeoutAsync(async () =>
{
var outcomeTask = StrategyHelper.ExecuteCallbackSafeAsync<string, string>(
async (_, _) =>
{
if (isAsync)
{
await Task.Delay(15);
}

return Outcome.FromResult("success");
},
ResilienceContextPool.Shared.Get(),
"dummy");

outcomeTask.IsCompleted.Should().Be(!isAsync);
(await outcomeTask).Result.Should().Be("success");
});
}

0 comments on commit de6eb20

Please sign in to comment.