Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CircuitBreaker transition into next states only when previous state aligns with what has been read #1064

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/Transport/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,13 @@ public Task ChangeConcurrency(PushRuntimeSettings newLimitations, CancellationTo

void UpdateProcessingCapacity(int maxConcurrency)
{
processor.UpdateConcurrency(maxConcurrency);
processor.UpdatePrefetchCount(CalculatePrefetchCount(maxConcurrency));
// Updating the concurrency level and prefetch count is thread safe by design
// We are locking here because the circuit breaker might call this method concurrently
lock (processor)
{
processor.UpdateConcurrency(maxConcurrency);
processor.UpdatePrefetchCount(CalculatePrefetchCount(maxConcurrency));
}
}

public async Task StopReceive(CancellationToken cancellationToken = default)
Expand Down
43 changes: 30 additions & 13 deletions src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,46 +10,61 @@ sealed class RepeatedFailuresOverTimeCircuitBreaker
public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering,
Action<Exception> triggerAction,
Action armedAction,
Action disarmedAction)
Action disarmedAction,
TimeSpan? timeToWaitWhenTriggered = default,
TimeSpan? timeToWaitWhenArmed = default)
{
this.name = name;
this.triggerAction = triggerAction;
this.armedAction = armedAction;
this.disarmedAction = disarmedAction;
this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering;
this.timeToWaitWhenTriggered = timeToWaitWhenTriggered ?? TimeSpan.FromSeconds(10);
this.timeToWaitWhenArmed = timeToWaitWhenArmed ?? TimeSpan.FromSeconds(1);

timer = new Timer(CircuitBreakerTriggered);
}

public void Success()
{
var previousState = Interlocked.CompareExchange(ref circuitBreakerState, Disarmed, Armed);
// Take a snapshot of the current state of the circuit breaker
var previousState = circuitBreakerState;
if (previousState is not (Armed or Triggered))
{
return;
}

// If the circuit breaker was Armed or triggered before, disarm it
if (previousState == Armed || Interlocked.CompareExchange(ref circuitBreakerState, Disarmed, Triggered) == Triggered)
// Try to transition to the disarmed state if the circuit breaker is armed or triggered
// and the previous state is the same that we read before. If that is not the case
// then another thread has already transitioned the circuit breaker to another state.
var originalState = Interlocked.CompareExchange(ref circuitBreakerState, Disarmed, previousState);
if (originalState != previousState)
{
_ = timer.Change(Timeout.Infinite, Timeout.Infinite);
Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
disarmedAction();
return;
}

_ = timer.Change(Timeout.Infinite, Timeout.Infinite);
Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
disarmedAction();
}

public Task Failure(Exception exception, CancellationToken cancellationToken = default)
{
// Atomically store the exception that caused the circuit breaker to trip
_ = Interlocked.Exchange(ref lastException, exception);

// Atomically set state to Armed if it was previously Disarmed
var previousState = Interlocked.CompareExchange(ref circuitBreakerState, Armed, Disarmed);

if (previousState == Disarmed)
// Take a snapshot of the current state of the circuit breaker
var previousState = circuitBreakerState;
// If the circuit breaker is disarmed, try to transition to the armed state but the previous state must be disarmed
// otherwise another thread has already transitioned the circuit breaker to another state
if (previousState == Disarmed && Interlocked.CompareExchange(ref circuitBreakerState, Armed, Disarmed) == Disarmed)
{
armedAction();
_ = timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
Logger.WarnFormat("The circuit breaker for {0} is now in the armed state due to {1}", name, exception);
}

// If the circuit breaker has been triggered, wait for 10 seconds before proceeding to prevent flooding the logs and hammering the ServiceBus
return Task.Delay(previousState == Triggered ? TimeSpan.FromSeconds(10) : TimeSpan.FromSeconds(1), cancellationToken);
return Task.Delay(previousState == Triggered ? timeToWaitWhenTriggered : timeToWaitWhenArmed, cancellationToken);
}

public void Dispose() => timer?.Dispose();
Expand All @@ -74,6 +89,8 @@ void CircuitBreakerTriggered(object state)
readonly Action<Exception> triggerAction;
readonly Action armedAction;
readonly Action disarmedAction;
readonly TimeSpan timeToWaitWhenTriggered;
readonly TimeSpan timeToWaitWhenArmed;

const int Disarmed = 0;
const int Armed = 1;
Expand Down