Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Stateful methods for circuitbreaker #5650

Merged
merged 7 commits into from
Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4045,7 +4045,10 @@ namespace Akka.Pattern
public Akka.Pattern.CircuitBreaker OnOpen(System.Action callback) { }
public void Succeed() { }
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T>(System.Func<System.Threading.Tasks.Task<T>> body) { }
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T>(System.Func<System.Threading.Tasks.Task<T>> body) { }
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T, TState>(TState state, System.Func<TState, System.Threading.Tasks.Task<T>> body) { }
public System.Threading.Tasks.Task WithCircuitBreaker(System.Func<System.Threading.Tasks.Task> body) { }
public System.Threading.Tasks.Task WithCircuitBreakerTState>(TState state, System.Func<TState, System.Threading.Tasks.Task body) { }
public Akka.Pattern.CircuitBreaker WithExponentialBackoff(System.TimeSpan maxResetTimeout) { }
public Akka.Pattern.CircuitBreaker WithRandomFactor(double randomFactor) { }
public void WithSyncCircuitBreaker(System.Action body) { }
Expand Down
14 changes: 12 additions & 2 deletions src/core/Akka/Pattern/CircuitBreaker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,12 @@ public Task<T> WithCircuitBreaker<T>(Func<Task<T>> body)
return CurrentState.Invoke(body);
}

public Task<T> WithCircuitBreaker<T, TState>(TState state,
Func<TState, Task<T>> body)
{
return CurrentState.InvokeState(state, body);
}

/// <summary>
/// Wraps invocation of asynchronous calls that need to be protected
/// </summary>
Expand All @@ -241,14 +247,18 @@ public Task WithCircuitBreaker(Func<Task> body)
{
return CurrentState.Invoke(body);
}
public Task WithCircuitBreaker<TState>(TState state, Func<TState, Task> body)
{
return CurrentState.InvokeState(state, body);
}

/// <summary>
/// The failure will be recorded farther down.
/// </summary>
/// <param name="body">TBD</param>
public void WithSyncCircuitBreaker(Action body)
{
var cbTask = WithCircuitBreaker(() => Task.Factory.StartNew(body));
var cbTask = WithCircuitBreaker(body,(b) => Task.Factory.StartNew(b));
if (!cbTask.Wait(CallTimeout))
{
//throw new TimeoutException( string.Format( "Execution did not complete within the time allotted {0} ms", CallTimeout.TotalMilliseconds ) );
Expand All @@ -275,7 +285,7 @@ public void WithSyncCircuitBreaker(Action body)
/// <returns><typeparamref name="T"/> or default(<typeparamref name="T"/>)</returns>
public T WithSyncCircuitBreaker<T>(Func<T> body)
{
var cbTask = WithCircuitBreaker(() => Task.Factory.StartNew(body));
var cbTask = WithCircuitBreaker(body,(b) => Task.Factory.StartNew(b));
return cbTask.Wait(CallTimeout) ? cbTask.Result : default(T);
}

Expand Down
52 changes: 44 additions & 8 deletions src/core/Akka/Pattern/CircuitBreakerState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ private TimeSpan RemainingDuration()
public override Task<T> Invoke<T>(Func<Task<T>> body) =>
Task.FromException<T>(new OpenCircuitException(_breaker.LastCaughtException, RemainingDuration()));

public override Task
InvokeState<TState>(TState state, Func<TState, Task> body) =>
Task.FromException(
new OpenCircuitException(_breaker.LastCaughtException,
RemainingDuration()));

public override Task<T> InvokeState<T, TState>(TState state,
Func<TState, Task<T>> body) => Task.FromException<T>(
new OpenCircuitException(_breaker.LastCaughtException,
RemainingDuration()));

/// <summary>
/// Fail-fast on any invocation
/// </summary>
Expand Down Expand Up @@ -121,6 +132,14 @@ public HalfOpen(CircuitBreaker breaker)
_lock = new AtomicBoolean();
}

private void CheckState()
{
if (!_lock.CompareAndSet(true, false))
{
throw new OpenCircuitException("Circuit breaker is half open, only one call is allowed; this call is failing fast.", _breaker.LastCaughtException, TimeSpan.Zero);
}
}

/// <summary>
/// Allows a single call through, during which all other callers fail-fast. If the call fails, the breaker reopens.
/// If the call succeeds, the breaker closes.
Expand All @@ -130,12 +149,15 @@ public HalfOpen(CircuitBreaker breaker)
/// <returns><see cref="Task"/> containing result of protected call</returns>
public override async Task<T> Invoke<T>(Func<Task<T>> body)
{
if (!_lock.CompareAndSet(true, false))
{
throw new OpenCircuitException("Circuit breaker is half open, only one call is allowed; this call is failing fast.", _breaker.LastCaughtException, TimeSpan.Zero);
}
CheckState();
return await CallThrough(body);
}

public override async Task<T> InvokeState<T,TState>(TState state, Func<TState, Task<T>> body)
{
CheckState();
return await CallThrough(state,body);
}

/// <summary>
/// Allows a single call through, during which all other callers fail-fast. If the call fails, the breaker reopens.
Expand All @@ -145,13 +167,17 @@ public override async Task<T> Invoke<T>(Func<Task<T>> body)
/// <returns><see cref="Task"/> containing result of protected call</returns>
public override async Task Invoke(Func<Task> body)
{
if (!_lock.CompareAndSet(true, false))
{
throw new OpenCircuitException("Circuit breaker is half open, only one call is allowed; this call is failing fast.", _breaker.LastCaughtException, TimeSpan.Zero);
}
CheckState();
await CallThrough(body);
}

public override async Task InvokeState<TState>(TState state,
Func<TState, Task> body)
{
CheckState();
await CallThrough(state,body);
}

/// <summary>
/// Reopen breaker on failed call.
/// </summary>
Expand Down Expand Up @@ -216,6 +242,11 @@ public override Task<T> Invoke<T>(Func<Task<T>> body)
return CallThrough(body);
}

public override Task<T> InvokeState<T, TState>(TState state, Func<TState, Task<T>> body)
{
return CallThrough(state, body);
}

/// <summary>
/// Implementation of invoke, which simply attempts the call
/// </summary>
Expand All @@ -226,6 +257,11 @@ public override Task Invoke(Func<Task> body)
return CallThrough(body);
}

public override Task InvokeState<TState>(TState state, Func<TState, Task> body)
{
return CallThrough(state, body);
}

/// <summary>
/// On failed call, the failure count is incremented. The count is checked against the configured maxFailures, and
/// the breaker is tripped if we have reached maxFailures.
Expand Down
68 changes: 68 additions & 0 deletions src/core/Akka/Util/Internal/AtomicState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,38 @@ public async Task<T> CallThrough<T>(Func<Task<T>> task)
}
return result;
}

public async Task<T> CallThrough<T,TState>(TState state, Func<TState,Task<T>> task)
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
{
var deadline = DateTime.UtcNow.Add(_callTimeout);
ExceptionDispatchInfo capturedException = null;
T result = default(T);
try
{
result = await task(state).ConfigureAwait(false);
}
catch (Exception ex)
{
capturedException = ExceptionDispatchInfo.Capture(ex);
}

// Need to make sure that timeouts are reported as timeouts
if (capturedException != null)
{
CallFails(capturedException.SourceException);
capturedException.Throw();
}
else if (DateTime.UtcNow.CompareTo(deadline) >= 0)
{
CallFails(new TimeoutException(
$"Execution did not complete within the time allotted {_callTimeout.TotalMilliseconds} ms"));
}
else
{
CallSucceeds();
}
return result;
}

/// <summary>
/// Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed
Expand Down Expand Up @@ -154,8 +186,37 @@ public async Task CallThrough(Func<Task> task)
{
CallSucceeds();
}
}

public async Task CallThrough<TState>(TState state, Func<TState, Task> task)
{
var deadline = DateTime.UtcNow.Add(_callTimeout);
ExceptionDispatchInfo capturedException = null;

try
{
await task(state).ConfigureAwait(false);
}
catch (Exception ex)
{
capturedException = ExceptionDispatchInfo.Capture(ex);
}

// Need to make sure that timeouts are reported as timeouts
if (capturedException != null)
{
CallFails(capturedException?.SourceException);
capturedException.Throw();
}
else if (DateTime.UtcNow.CompareTo(deadline) >= 0)
{
CallFails(new TimeoutException(
$"Execution did not complete within the time allotted {_callTimeout.TotalMilliseconds} ms"));
}
else
{
CallSucceeds();
}
}

/// <summary>
Expand All @@ -166,13 +227,20 @@ public async Task CallThrough(Func<Task> task)
/// <returns><see cref="Task"/> containing result of protected call</returns>
public abstract Task<T> Invoke<T>(Func<Task<T>> body);

public abstract Task<T> InvokeState<T, TState>(TState state,
Func<TState, Task<T>> body);

/// <summary>
/// Abstract entry point for all states
/// </summary>
/// <param name="body">Implementation of the call that needs protected</param>
/// <returns><see cref="Task"/> containing result of protected call</returns>
public abstract Task Invoke(Func<Task> body);

public abstract Task InvokeState<TState>(TState state,
Func<TState, Task> body);


/// <summary>
/// Invoked when call fails
/// </summary>
Expand Down