Skip to content

Commit

Permalink
Address races in the repeated failure circuit breaker by introducing …
Browse files Browse the repository at this point in the history
…explicit state transitions (#1057)
  • Loading branch information
danielmarbach authored and DavidBoike committed Oct 1, 2024
1 parent 7984165 commit d75a7a0
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 29 deletions.
4 changes: 3 additions & 1 deletion src/Transport/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ public Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext, Task<E
this.criticalError = criticalError;
pushSettings = settings;

Action noOp = () => { };

circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker($"'{settings.InputQueue}'",
timeToWaitBeforeTriggeringCircuitBreaker, ex =>
{
criticalError.Raise("Failed to receive message from Azure Service Bus.", ex);
});
}, noOp, noOp);

return Task.CompletedTask;
}
Expand Down
63 changes: 35 additions & 28 deletions src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,72 +5,79 @@
using System.Threading.Tasks;
using Logging;

class RepeatedFailuresOverTimeCircuitBreaker
sealed class RepeatedFailuresOverTimeCircuitBreaker
{
public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering,
Action<Exception> triggerAction)
Action<Exception> triggerAction,
Action armedAction,
Action disarmedAction)
{
this.name = name;
this.triggerAction = triggerAction;
this.armedAction = armedAction;
this.disarmedAction = disarmedAction;
this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering;

timer = new Timer(CircuitBreakerTriggered);
}

public void Success()
{
var oldValue = Interlocked.Exchange(ref failureCount, 0);
var previousState = Interlocked.CompareExchange(ref circuitBreakerState, Disarmed, Armed);

if (oldValue == 0)
// If the circuit breaker was Armed or triggered before, disarm it
if (previousState == Armed || Interlocked.CompareExchange(ref circuitBreakerState, Disarmed, Triggered) == Triggered)
{
return;
_ = timer.Change(Timeout.Infinite, Timeout.Infinite);
Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
disarmedAction();
}

timer.Change(Timeout.Infinite, Timeout.Infinite);
Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
triggered = false;
}

public Task Failure(Exception exception, CancellationToken cancellationToken = default)
{
lastException = exception;
var newValue = Interlocked.Increment(ref failureCount);
_ = Interlocked.Exchange(ref lastException, exception);

// Atomically set state to Armed if it was previously Disarmed
var previousState = Interlocked.CompareExchange(ref circuitBreakerState, Armed, Disarmed);

if (newValue == 1)
if (previousState == Disarmed)
{
timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
Logger.WarnFormat("The circuit breaker for {0} is now in the armed state", name);
armedAction();
_ = timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
Logger.WarnFormat("The circuit breaker for {0} is now in the armed state due to {1}", name, exception);
}

//If the circuit breaker has been triggered, wait for 10 seconds before proceeding to prevent flooding the logs and hammering the ServiceBus
var delay = triggered ? TimeSpan.FromSeconds(10) : TimeSpan.FromSeconds(1);

return Task.Delay(delay, cancellationToken);
// If the circuit breaker has been triggered, wait for 10 seconds before proceeding to prevent flooding the logs and hammering the ServiceBus
return Task.Delay(previousState == Triggered ? TimeSpan.FromSeconds(10) : TimeSpan.FromSeconds(1), cancellationToken);
}

public void Dispose()
{
timer?.Dispose();
}
public void Dispose() => timer?.Dispose();

void CircuitBreakerTriggered(object state)
{
if (Interlocked.Read(ref failureCount) > 0)
if (Interlocked.CompareExchange(ref circuitBreakerState, Triggered, Armed) != Armed)
{
Logger.WarnFormat("The circuit breaker for {0} will now be triggered", name);
triggered = true;
triggerAction(lastException);
return;
}

Logger.WarnFormat("The circuit breaker for {0} will now be triggered with exception {1}", name, lastException);
triggerAction(lastException);
}

long failureCount;
volatile bool triggered;
int circuitBreakerState = Disarmed;
Exception lastException;

readonly string name;
readonly Timer timer;
readonly TimeSpan timeToWaitBeforeTriggering;
readonly Action<Exception> triggerAction;
readonly Action armedAction;
readonly Action disarmedAction;

const int Disarmed = 0;
const int Armed = 1;
const int Triggered = 2;

static readonly TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1);
static readonly ILog Logger = LogManager.GetLogger<RepeatedFailuresOverTimeCircuitBreaker>();
Expand Down

0 comments on commit d75a7a0

Please sign in to comment.