Skip to content

Commit

Permalink
Receive failures lock transport into single concurrency mode (#1060)
Browse files Browse the repository at this point in the history
* Address races in the repeated failure circuit breaker by introducing explicit state transitions (#1057)

* Try address races in the repeated failure circuit breaker

* Address races in the repeated failure circuit breaker

* Improve log message

---------

Co-authored-by: danielmarbach <danielmarbach@users.noreply.github.com>

* Update the processing capacity consistently together with the prefetch count to make sure during throttling the number of prefetched messages is kept aligned with the settings of the user (#1058)

Co-authored-by: danielmarbach <danielmarbach@users.noreply.github.com>

* Use locks to serialize arm/disarm actions (#1066)

* Use locks to serialize arm/disarm actions

* Explicit state lock, Volatile read of state outside lock, actually reduce nesting where possible

* Small cosmetics for better readability

* Better logging

* Verbose comment

* Even better logging

* Basic test coverage

---------

Co-authored-by: danielmarbach <danielmarbach@users.noreply.github.com>

---------

Co-authored-by: Daniel Marbach <daniel.marbach@openplace.net>
Co-authored-by: danielmarbach <danielmarbach@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 2, 2024
1 parent 56eb7df commit a85aba0
Show file tree
Hide file tree
Showing 3 changed files with 392 additions and 48 deletions.
222 changes: 222 additions & 0 deletions src/Tests/Receiving/RepeatedFailuresOverTimeCircuitBreakerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
namespace NServiceBus.Transport.AzureServiceBus.Tests.Receiving
{
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;

// Ideally the circuit breaker would use a time provider to allow for easier testing but that would require a significant refactor
// and we want keep the changes to a minimum for now to allow backporting to older versions.
[TestFixture]
public class RepeatedFailuresOverTimeCircuitBreakerTests
{
[Test]
public async Task Should_disarm_on_success()
{
var armedActionCalled = false;
var disarmedActionCalled = false;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(100),
ex => { },
() => armedActionCalled = true,
() => disarmedActionCalled = true,
TimeSpan.Zero,
TimeSpan.Zero
);

await circuitBreaker.Failure(new Exception("Test Exception"));
circuitBreaker.Success();

Assert.That(armedActionCalled, Is.True, "The armed action should be called.");
Assert.That(disarmedActionCalled, Is.True, "The disarmed action should be called.");
}

[Test]
public async Task Should_rethrow_exception_on_success()
{
var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(100),
ex => { },
() => { },
() => throw new Exception("Exception from disarmed action"),
timeToWaitWhenTriggered: TimeSpan.Zero,
timeToWaitWhenArmed: TimeSpan.Zero
);

await circuitBreaker.Failure(new Exception("Test Exception"));

var ex = Assert.Throws<Exception>(() => circuitBreaker.Success());
Assert.That(ex.Message, Is.EqualTo("Exception from disarmed action"));
}

[Test]
public async Task Should_trigger_after_failure_timeout()
{
var triggerActionCalled = false;
Exception lastTriggerException = null;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.Zero,
ex => { triggerActionCalled = true; lastTriggerException = ex; },
timeToWaitWhenTriggered: TimeSpan.Zero,
timeToWaitWhenArmed: TimeSpan.FromMilliseconds(100)
);

await circuitBreaker.Failure(new Exception("Test Exception"));

Assert.That(triggerActionCalled, Is.True, "The trigger action should be called after timeout.");
Assert.That(lastTriggerException, Is.Not.Null, "The exception passed to the trigger action should not be null.");
}

[Test]
public void Should_rethrow_exception_on_failure()
{
var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(100),
ex => { },
() => throw new Exception("Exception from armed action"),
() => { },
timeToWaitWhenTriggered: TimeSpan.Zero,
timeToWaitWhenArmed: TimeSpan.Zero
);

var ex = Assert.ThrowsAsync<Exception>(async () => await circuitBreaker.Failure(new Exception("Test Exception")));
Assert.That(ex.Message, Is.EqualTo("Exception from armed action"));
}

[Test]
public async Task Should_delay_after_trigger_failure()
{
var timeToWaitWhenTriggered = TimeSpan.FromMilliseconds(50);
var timeToWaitWhenArmed = TimeSpan.FromMilliseconds(100);

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.Zero,
_ => { },
timeToWaitWhenTriggered: timeToWaitWhenTriggered,
timeToWaitWhenArmed: timeToWaitWhenArmed
);

var stopWatch = Stopwatch.StartNew();

await circuitBreaker.Failure(new Exception("Test Exception"));
await circuitBreaker.Failure(new Exception("Test Exception After Trigger"));

stopWatch.Stop();

Assert.That(stopWatch.ElapsedMilliseconds, Is.GreaterThanOrEqualTo(timeToWaitWhenTriggered.Add(timeToWaitWhenArmed).TotalMilliseconds).Within(20), "The circuit breaker should delay after a triggered failure.");
}

[Test]
public async Task Should_not_trigger_if_disarmed_before_timeout()
{
var triggerActionCalled = false;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(100),
ex => triggerActionCalled = true,
timeToWaitWhenTriggered: TimeSpan.Zero,
timeToWaitWhenArmed: TimeSpan.Zero
);

await circuitBreaker.Failure(new Exception("Test Exception"));
circuitBreaker.Success();

Assert.That(triggerActionCalled, Is.False, "The trigger action should not be called if the circuit breaker was disarmed.");
}

[Test]
public async Task Should_handle_concurrent_failure_and_success()
{
var armedActionCalled = false;
var disarmedActionCalled = false;
var triggerActionCalled = false;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(100),
ex => triggerActionCalled = true,
() => armedActionCalled = true,
() => disarmedActionCalled = true,
TimeSpan.Zero,
TimeSpan.Zero
);

var failureTask = circuitBreaker.Failure(new Exception("Test Exception"));
var successTask = Task.Run(() =>
{
Thread.Sleep(50); // Simulate some delay before success
circuitBreaker.Success();
});

await Task.WhenAll(failureTask, successTask);

Assert.That(armedActionCalled, Is.True, "The armed action should be called.");
Assert.That(disarmedActionCalled, Is.True, "The disarmed action should be called.");
Assert.That(triggerActionCalled, Is.False, "The trigger action should not be called if success occurred before timeout.");
}

[Test]
public async Task Should_handle_high_concurrent_failure_and_success()
{
var armedActionCalled = 0;
var disarmedActionCalled = 0;
var triggerActionCalled = 0;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromSeconds(5),
ex => Interlocked.Increment(ref triggerActionCalled),
() => Interlocked.Increment(ref armedActionCalled),
() => Interlocked.Increment(ref disarmedActionCalled),
TimeSpan.Zero,
TimeSpan.FromMilliseconds(25)
);

var tasks = Enumerable.Range(0, 1000)
.Select(
i => i % 2 == 0 ?
circuitBreaker.Failure(new Exception($"Test Exception {i}")) :
Task.Run(() =>
{
Thread.Sleep(25); // Simulate some delay before success
circuitBreaker.Success();
})
).ToArray();

await Task.WhenAll(tasks);

Assert.That(armedActionCalled, Is.EqualTo(1), "The armed action should be called.");
Assert.That(disarmedActionCalled, Is.EqualTo(1), "The disarmed action should be called.");
Assert.That(triggerActionCalled, Is.Zero, "The trigger action should not be called if success occurred before timeout.");
}

[Test]
public async Task Should_trigger_after_multiple_failures_and_timeout()
{
var triggerActionCalled = false;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(50),
ex => triggerActionCalled = true,
timeToWaitWhenTriggered: TimeSpan.FromMilliseconds(50),
timeToWaitWhenArmed: TimeSpan.FromMilliseconds(50)
);

await circuitBreaker.Failure(new Exception("Test Exception"));
await circuitBreaker.Failure(new Exception("Another Exception After Trigger"));

Assert.That(triggerActionCalled, Is.True, "The trigger action should be called after repeated failures and timeout.");
}
}
}
25 changes: 12 additions & 13 deletions src/Transport/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,9 @@ public Task Initialize(

public async Task StartReceive(CancellationToken cancellationToken = default)
{
int prefetchCount = CalculatePrefetchCount();

var receiveOptions = new ServiceBusProcessorOptions
{
PrefetchCount = prefetchCount,
PrefetchCount = CalculatePrefetchCount(limitations.MaxConcurrency),
ReceiveMode = TransactionMode == TransportTransactionMode.None
? ServiceBusReceiveMode.ReceiveAndDelete
: ServiceBusReceiveMode.PeekLock,
Expand All @@ -96,20 +94,18 @@ public async Task StartReceive(CancellationToken cancellationToken = default)
{
criticalErrorAction("Failed to receive message from Azure Service Bus.", ex,
messageProcessingCancellationTokenSource.Token);
}, () =>
//We don't have to update the prefetch count since we are failing to receive anyway
processor.UpdateConcurrency(1),
() => processor.UpdateConcurrency(limitations.MaxConcurrency));
}, () => UpdateProcessingCapacity(1),
() => UpdateProcessingCapacity(limitations.MaxConcurrency));

await processor.StartProcessingAsync(cancellationToken)
.ConfigureAwait(false);
}

TransportTransactionMode TransactionMode => transportSettings.TransportTransactionMode;

int CalculatePrefetchCount()
int CalculatePrefetchCount(int maxConcurrency)
{
var prefetchCount = limitations.MaxConcurrency * transportSettings.PrefetchMultiplier;
var prefetchCount = maxConcurrency * transportSettings.PrefetchMultiplier;

if (transportSettings.PrefetchCount.HasValue)
{
Expand Down Expand Up @@ -190,14 +186,17 @@ public Task ChangeConcurrency(PushRuntimeSettings newLimitations, CancellationTo
{
limitations = newLimitations;

processor.UpdateConcurrency(limitations.MaxConcurrency);

int prefetchCount = CalculatePrefetchCount();
processor.UpdatePrefetchCount(prefetchCount);
UpdateProcessingCapacity(limitations.MaxConcurrency);

return Task.CompletedTask;
}

void UpdateProcessingCapacity(int maxConcurrency)
{
processor.UpdateConcurrency(maxConcurrency);
processor.UpdatePrefetchCount(CalculatePrefetchCount(maxConcurrency));
}

public async Task StopReceive(CancellationToken cancellationToken = default)
{
// Wiring up the stop token to trigger the cancellation token that is being
Expand Down
Loading

0 comments on commit a85aba0

Please sign in to comment.