Skip to content

Commit

Permalink
Add circuit breaker exponential backoff support (#4350)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed authored Aug 22, 2020
1 parent 920497e commit 65f3044
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 76 deletions.
27 changes: 11 additions & 16 deletions docs/articles/utilities/circuit-breaker.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,20 @@ The Akka.NET library provides an implementation of a circuit breaker called `Akk
## What do they do?

* During normal operation, a circuit breaker is in the `Closed` state:
* Exceptions or calls exceeding the configured `СallTimeout` increment a
failure counter
* Successes reset the failure count to zero
* When the failure counter reaches a `MaxFailures` count, the breaker is
tripped into `Open` state
* Exceptions or calls exceeding the configured `СallTimeout` increment a failure counter
* Successes reset the failure count to zero
* When the failure counter reaches a `MaxFailures` count, the breaker is tripped into `Open` state
* While in `Open` state:
* All calls fail-fast with a `OpenCircuitException`
* After the configured `ResetTimeout`, the circuit breaker enters a
`Half-Open` state
* All calls fail-fast with a `OpenCircuitException`
* After the configured `ResetTimeout`, the circuit breaker enters a `Half-Open` state
* In `Half-Open` state:
* The first call attempted is allowed through without failing fast
* All other calls fail-fast with an exception just as in `Open` state
* If the first call succeeds, the breaker is reset back to `Closed` state
* If the first call fails, the breaker is tripped again into the `Open` state
for another full `ResetTimeout`
* The first call attempted is allowed through without failing fast
* All other calls fail-fast with an exception just as in `Open` state
* If the first call succeeds, the breaker is reset back to `Closed` state and the `ResetTimeout` is reset
* If the first call fails, the breaker is tripped again into the `Open` state (as for exponential backoff circuit breaker, the `ResetTimeout` is multiplied by the exponential backoff factor)
* State transition listeners:
* Callbacks can be provided for every state entry via `OnOpen`, `OnClose`,
and `OnHalfOpen`
* These are executed in the `ExecutionContext` provided.
* Callbacks can be provided for every state entry via `OnOpen`, `OnClose`, and `OnHalfOpen`
* These are executed in the `ExecutionContext` provided.

![Circuit breaker states](/images/circuit-breaker-states.png)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ protected BatchingSqlJournal(BatchingSqlJournalSetup setup)
_serialization = Context.System.Serialization;
Log = Context.GetLogger();
_circuitBreaker = CircuitBreaker.Create(
Context.System.Scheduler,
maxFailures: Setup.CircuitBreakerSettings.MaxFailures,
callTimeout: Setup.CircuitBreakerSettings.CallTimeout,
resetTimeout: Setup.CircuitBreakerSettings.ResetTimeout);
Expand Down
15 changes: 12 additions & 3 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3885,23 +3885,28 @@ namespace Akka.Pattern
}
public class CircuitBreaker
{
public CircuitBreaker(int maxFailures, System.TimeSpan callTimeout, System.TimeSpan resetTimeout) { }
public CircuitBreaker(Akka.Actor.IScheduler scheduler, int maxFailures, System.TimeSpan callTimeout, System.TimeSpan resetTimeout) { }
public CircuitBreaker(Akka.Actor.IScheduler scheduler, int maxFailures, System.TimeSpan callTimeout, System.TimeSpan resetTimeout, System.TimeSpan maxResetTimeout, double exponentialBackoffFactor) { }
public System.TimeSpan CallTimeout { get; }
public long CurrentFailureCount { get; }
public double ExponentialBackoffFactor { get; }
public bool IsClosed { get; }
public bool IsHalfOpen { get; }
public bool IsOpen { get; }
public System.Exception LastCaughtException { get; }
public int MaxFailures { get; }
public System.TimeSpan MaxResetTimeout { get; }
public System.TimeSpan ResetTimeout { get; }
public static Akka.Pattern.CircuitBreaker Create(int maxFailures, System.TimeSpan callTimeout, System.TimeSpan resetTimeout) { }
public Akka.Actor.IScheduler Scheduler { get; }
public static Akka.Pattern.CircuitBreaker Create(Akka.Actor.IScheduler scheduler, int maxFailures, System.TimeSpan callTimeout, System.TimeSpan resetTimeout) { }
public void Fail() { }
public Akka.Pattern.CircuitBreaker OnClose(System.Action callback) { }
public Akka.Pattern.CircuitBreaker OnHalfOpen(System.Action callback) { }
public Akka.Pattern.CircuitBreaker OnOpen(System.Action callback) { }
public void Succeed() { }
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T>(System.Func<System.Threading.Tasks.Task<T>> body) { }
public System.Threading.Tasks.Task WithCircuitBreaker(System.Func<System.Threading.Tasks.Task> body) { }
public Akka.Pattern.CircuitBreaker WithExponentialBackoff(System.TimeSpan maxResetTimeout) { }
public void WithSyncCircuitBreaker(System.Action body) { }
public T WithSyncCircuitBreaker<T>(System.Func<T> body) { }
}
Expand All @@ -3918,9 +3923,13 @@ namespace Akka.Pattern
public class OpenCircuitException : Akka.Actor.AkkaException
{
public OpenCircuitException() { }
public OpenCircuitException(System.Exception cause) { }
public OpenCircuitException(string message) { }
public OpenCircuitException(string message, System.TimeSpan remainingDuration) { }
public OpenCircuitException(string message, System.Exception cause) { }
public OpenCircuitException(string message, System.Exception cause, System.TimeSpan remainingDuration) { }
public OpenCircuitException(System.Exception cause) { }
public OpenCircuitException(System.Exception cause, System.TimeSpan remainingDuration) { }
public System.TimeSpan RemainingDuration { get; }
}
public class UserCalledFailException : Akka.Actor.AkkaException
{
Expand Down
19 changes: 10 additions & 9 deletions src/core/Akka.Docs.Tests/Utilities/CircuitBreakerDocSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ public class DangerousActor : ReceiveActor
public DangerousActor()
{
var breaker = new CircuitBreaker(
maxFailures: 5,
callTimeout: TimeSpan.FromSeconds(10),
resetTimeout: TimeSpan.FromMinutes(1))
.OnOpen(NotifyMeOnOpen);
Context.System.Scheduler,
maxFailures: 5,
callTimeout: TimeSpan.FromSeconds(10),
resetTimeout: TimeSpan.FromMinutes(1)).OnOpen(NotifyMeOnOpen);
}

private void NotifyMeOnOpen()
Expand All @@ -42,10 +42,10 @@ public class DangerousActorCallProtection : ReceiveActor
public DangerousActorCallProtection()
{
var breaker = new CircuitBreaker(
maxFailures: 5,
callTimeout: TimeSpan.FromSeconds(10),
resetTimeout: TimeSpan.FromMinutes(1))
.OnOpen(NotifyMeOnOpen);
Context.System.Scheduler,
maxFailures: 5,
callTimeout: TimeSpan.FromSeconds(10),
resetTimeout: TimeSpan.FromMinutes(1)).OnOpen(NotifyMeOnOpen);

var dangerousCall = "This really isn't that dangerous of a call after all";

Expand Down Expand Up @@ -77,7 +77,8 @@ public TellPatternActor(IActorRef recipient )
{
_recipient = recipient;
_breaker = new CircuitBreaker(
maxFailures: 5,
Context.System.Scheduler,
maxFailures: 5,
callTimeout: TimeSpan.FromSeconds(10),
resetTimeout: TimeSpan.FromMinutes(1)).OnOpen(NotifyMeOnOpen);
}
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ protected AsyncWriteJournal()
CanPublish = extension.Settings.Internal.PublishPluginCommands;
var config = extension.ConfigFor(Self);
_breaker = new CircuitBreaker(
Context.System.Scheduler,
config.GetInt("circuit-breaker.max-failures", 0),
config.GetTimeSpan("circuit-breaker.call-timeout", null),
config.GetTimeSpan("circuit-breaker.reset-timeout", null));
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka.Persistence/Snapshot/SnapshotStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ protected SnapshotStore()
_publish = extension.Settings.Internal.PublishPluginCommands;
var config = extension.ConfigFor(Self);
_breaker = CircuitBreaker.Create(
Context.System.Scheduler,
config.GetInt("circuit-breaker.max-failures", 0),
config.GetTimeSpan("circuit-breaker.call-timeout", null),
config.GetTimeSpan("circuit-breaker.reset-timeout", null));
Expand Down
62 changes: 51 additions & 11 deletions src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//-----------------------------------------------------------------------

using System;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Runtime.Serialization;
Expand Down Expand Up @@ -345,6 +346,29 @@ public void Should_Transition_To_Half_Open_When_Reset_Timeout( )
Assert.True( InterceptExceptionType<TestException>( ( ) => breaker.Instance.WithCircuitBreaker( () => Task.Factory.StartNew( ThrowException ) ).Wait( ) ) );
Assert.True( CheckLatch( breaker.HalfOpenLatch ) );
}

[Fact(DisplayName = "An asynchronous circuit breaker that is open should increase the reset timeout after it transits to open again")]
public void Should_Reset_Timeout_After_It_Transits_To_Open_Again()
{
var breaker = NonOneFactorCb();
Assert.True(InterceptExceptionType<TestException>(() => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).Wait()));
Assert.True(CheckLatch(breaker.OpenLatch));

var e1 = InterceptException<OpenCircuitException>(() => breaker.Instance.WithSyncCircuitBreaker(SayTest));
var shortRemainingDuration = e1.RemainingDuration;

Thread.Sleep(1000);
Assert.True(CheckLatch(breaker.HalfOpenLatch));

// transit to open again
Assert.True(InterceptExceptionType<TestException>(() => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).Wait()));
Assert.True(CheckLatch(breaker.OpenLatch));

var e2 = InterceptException<OpenCircuitException>(() => breaker.Instance.WithSyncCircuitBreaker(SayTest));
var longRemainingDuration = e2.RemainingDuration;

Assert.True(shortRemainingDuration < longRemainingDuration);
}
}

public class CircuitBreakerSpecBase : AkkaSpec
Expand All @@ -368,14 +392,25 @@ public async Task<string> DelayedSayTest(TimeSpan delay)
return "Test";
}

public void ThrowException( )
{
throw new TestException( "Test Exception" );
}
[DebuggerStepThrough]
public void ThrowException() => throw new TestException("Test Exception");

public string SayTest( )
public string SayTest( ) => "Test";

protected T InterceptException<T>(Action actionThatThrows) where T : Exception
{
return "Test";
return Assert.Throws<T>(() =>
{
try
{
actionThatThrows();
}
catch (AggregateException ex)
{
foreach (var e in ex.Flatten().InnerExceptions.Where(e => e is T).Select(e => e))
throw e;
}
});
}

[SuppressMessage( "Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter" )]
Expand Down Expand Up @@ -434,27 +469,32 @@ public async Task<bool> InterceptExceptionTypeAsync<T>(Task action) where T : Ex

public TestBreaker ShortCallTimeoutCb( )
{
return new TestBreaker( new CircuitBreaker( 1, TimeSpan.FromMilliseconds( 50 ), TimeSpan.FromMilliseconds( 500 ) ) );
return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds( 50 ), TimeSpan.FromMilliseconds( 500 ) ) );
}

public TestBreaker ShortResetTimeoutCb( )
{
return new TestBreaker( new CircuitBreaker( 1, TimeSpan.FromMilliseconds( 1000 ), TimeSpan.FromMilliseconds( 50 ) ) );
return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds( 1000 ), TimeSpan.FromMilliseconds( 50 ) ) );
}

public TestBreaker LongCallTimeoutCb( )
{
return new TestBreaker( new CircuitBreaker( 1, TimeSpan.FromMilliseconds( 5000 ), TimeSpan.FromMilliseconds( 500 ) ) );
return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds( 5000 ), TimeSpan.FromMilliseconds( 500 ) ) );
}

public TestBreaker LongResetTimeoutCb( )
{
return new TestBreaker( new CircuitBreaker( 1, TimeSpan.FromMilliseconds( 100 ), TimeSpan.FromMilliseconds( 5000 ) ) );
return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds( 100 ), TimeSpan.FromMilliseconds( 5000 ) ) );
}

public TestBreaker MultiFailureCb( )
{
return new TestBreaker( new CircuitBreaker( 5, TimeSpan.FromMilliseconds( 200 ), TimeSpan.FromMilliseconds( 500 ) ) );
return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 5, TimeSpan.FromMilliseconds( 200 ), TimeSpan.FromMilliseconds( 500 ) ) );
}

public TestBreaker NonOneFactorCb()
{
return new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds(2000), TimeSpan.FromMilliseconds(1000), TimeSpan.FromDays(1), 5));
}
}

Expand Down
Loading

0 comments on commit 65f3044

Please sign in to comment.