From a81c094bfd7d6143d7c37e1519f2fe5b318eb764 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Tue, 1 Oct 2024 18:38:01 +0200 Subject: [PATCH 1/4] Doing extra checks for more safety --- .../RepeatedFailuresOverTimeCircuitBreaker.cs | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs b/src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs index 80ae22d4..bf519bb4 100644 --- a/src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs +++ b/src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs @@ -23,25 +23,29 @@ public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBe public void Success() { - var previousState = Interlocked.CompareExchange(ref circuitBreakerState, Disarmed, Armed); + var previousState = circuitBreakerState; + if (previousState is not (Armed or Triggered)) + { + return; + } - // If the circuit breaker was Armed or triggered before, disarm it - if (previousState == Armed || Interlocked.CompareExchange(ref circuitBreakerState, Disarmed, Triggered) == Triggered) + var originalState = Interlocked.CompareExchange(ref circuitBreakerState, Disarmed, previousState); + if (originalState != previousState) { - _ = timer.Change(Timeout.Infinite, Timeout.Infinite); - Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name); - disarmedAction(); + return; } + + _ = timer.Change(Timeout.Infinite, Timeout.Infinite); + Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name); + disarmedAction(); } public Task Failure(Exception exception, CancellationToken cancellationToken = default) { _ = Interlocked.Exchange(ref lastException, exception); - // Atomically set state to Armed if it was previously Disarmed - var previousState = Interlocked.CompareExchange(ref circuitBreakerState, Armed, Disarmed); - - if (previousState == Disarmed) + var previousState = circuitBreakerState; + if (previousState == Disarmed && Interlocked.CompareExchange(ref circuitBreakerState, Armed, Disarmed) == Disarmed) { armedAction(); _ = timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering); From d06a39eea5c274523c91c305bf64082b600f6a21 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Tue, 1 Oct 2024 21:22:06 +0200 Subject: [PATCH 2/4] Comments for clarity --- .../Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs b/src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs index bf519bb4..17094c07 100644 --- a/src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs +++ b/src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs @@ -23,12 +23,16 @@ public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBe public void Success() { + // Take a snapshot of the current state of the circuit breaker var previousState = circuitBreakerState; if (previousState is not (Armed or Triggered)) { return; } + // Try to transition to the disarmed state if the circuit breaker is armed or triggered + // and the previous state is the same that we read before. If that is not the case + // then another thread has already transitioned the circuit breaker to another state. var originalState = Interlocked.CompareExchange(ref circuitBreakerState, Disarmed, previousState); if (originalState != previousState) { @@ -42,9 +46,13 @@ public void Success() public Task Failure(Exception exception, CancellationToken cancellationToken = default) { + // Atomically store the exception that caused the circuit breaker to trip _ = Interlocked.Exchange(ref lastException, exception); + // Take a snapshot of the current state of the circuit breaker var previousState = circuitBreakerState; + // If the circuit breaker is disarmed, try to transition to the armed state but the previous state must be disarmed + // otherwise another thread has already transitioned the circuit breaker to another state if (previousState == Disarmed && Interlocked.CompareExchange(ref circuitBreakerState, Armed, Disarmed) == Disarmed) { armedAction(); From de9b1c2c02a7efb3db27b080ced1a1d71f579850 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Tue, 1 Oct 2024 21:27:57 +0200 Subject: [PATCH 3/4] Configurable timeouts --- .../RepeatedFailuresOverTimeCircuitBreaker.cs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs b/src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs index 17094c07..1b385dda 100644 --- a/src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs +++ b/src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs @@ -10,13 +10,17 @@ sealed class RepeatedFailuresOverTimeCircuitBreaker public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering, Action triggerAction, Action armedAction, - Action disarmedAction) + Action disarmedAction, + TimeSpan? timeToWaitWhenTriggered = default, + TimeSpan? timeToWaitWhenArmed = default) { this.name = name; this.triggerAction = triggerAction; this.armedAction = armedAction; this.disarmedAction = disarmedAction; this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering; + this.timeToWaitWhenTriggered = timeToWaitWhenTriggered ?? TimeSpan.FromSeconds(10); + this.timeToWaitWhenArmed = timeToWaitWhenArmed ?? TimeSpan.FromSeconds(1); timer = new Timer(CircuitBreakerTriggered); } @@ -60,8 +64,7 @@ public Task Failure(Exception exception, CancellationToken cancellationToken = d Logger.WarnFormat("The circuit breaker for {0} is now in the armed state due to {1}", name, exception); } - // If the circuit breaker has been triggered, wait for 10 seconds before proceeding to prevent flooding the logs and hammering the ServiceBus - return Task.Delay(previousState == Triggered ? TimeSpan.FromSeconds(10) : TimeSpan.FromSeconds(1), cancellationToken); + return Task.Delay(previousState == Triggered ? timeToWaitWhenTriggered : timeToWaitWhenArmed, cancellationToken); } public void Dispose() => timer?.Dispose(); @@ -86,6 +89,8 @@ void CircuitBreakerTriggered(object state) readonly Action triggerAction; readonly Action armedAction; readonly Action disarmedAction; + readonly TimeSpan timeToWaitWhenTriggered; + readonly TimeSpan timeToWaitWhenArmed; const int Disarmed = 0; const int Armed = 1; From 4c78142fe12f44f7da3f5e0aae58ff4695dbf1bf Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Tue, 1 Oct 2024 22:06:26 +0200 Subject: [PATCH 4/4] Updating the processing capacity sequentially --- src/Transport/Receiving/MessagePump.cs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/Transport/Receiving/MessagePump.cs b/src/Transport/Receiving/MessagePump.cs index 07b84833..b73fcaa8 100644 --- a/src/Transport/Receiving/MessagePump.cs +++ b/src/Transport/Receiving/MessagePump.cs @@ -193,8 +193,13 @@ public Task ChangeConcurrency(PushRuntimeSettings newLimitations, CancellationTo void UpdateProcessingCapacity(int maxConcurrency) { - processor.UpdateConcurrency(maxConcurrency); - processor.UpdatePrefetchCount(CalculatePrefetchCount(maxConcurrency)); + // Updating the concurrency level and prefetch count is thread safe by design + // We are locking here because the circuit breaker might call this method concurrently + lock (processor) + { + processor.UpdateConcurrency(maxConcurrency); + processor.UpdatePrefetchCount(CalculatePrefetchCount(maxConcurrency)); + } } public async Task StopReceive(CancellationToken cancellationToken = default)