Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of circuit breaker fixes involved in bug in later versions #1062

Merged
merged 3 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 222 additions & 0 deletions src/Tests/Receiving/RepeatedFailuresOverTimeCircuitBreakerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
namespace NServiceBus.Transport.AzureServiceBus.Tests.Receiving
{
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;

// Ideally the circuit breaker would use a time provider to allow for easier testing but that would require a significant refactor
// and we want keep the changes to a minimum for now to allow backporting to older versions.
[TestFixture]
public class RepeatedFailuresOverTimeCircuitBreakerTests
{
[Test]
public async Task Should_disarm_on_success()
{
var armedActionCalled = false;
var disarmedActionCalled = false;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(100),
ex => { },
() => armedActionCalled = true,
() => disarmedActionCalled = true,
TimeSpan.Zero,
TimeSpan.Zero
);

await circuitBreaker.Failure(new Exception("Test Exception"));
circuitBreaker.Success();

Assert.That(armedActionCalled, Is.True, "The armed action should be called.");
Assert.That(disarmedActionCalled, Is.True, "The disarmed action should be called.");
}

[Test]
public async Task Should_rethrow_exception_on_success()
{
var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(100),
x => { },
() => { },
() => throw new Exception("Exception from disarmed action"),
timeToWaitWhenTriggered: TimeSpan.Zero,
timeToWaitWhenArmed: TimeSpan.Zero
);

await circuitBreaker.Failure(new Exception("Test Exception"));

var ex = Assert.Throws<Exception>(() => circuitBreaker.Success());
Assert.That(ex.Message, Is.EqualTo("Exception from disarmed action"));
}

[Test]
public async Task Should_trigger_after_failure_timeout()
{
var triggerActionCalled = false;
Exception lastTriggerException = null;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.Zero,
ex => { triggerActionCalled = true; lastTriggerException = ex; },
timeToWaitWhenTriggered: TimeSpan.Zero,
timeToWaitWhenArmed: TimeSpan.FromMilliseconds(100)
);

await circuitBreaker.Failure(new Exception("Test Exception"));

Assert.That(triggerActionCalled, Is.True, "The trigger action should be called after timeout.");
Assert.That(lastTriggerException, Is.Not.Null, "The exception passed to the trigger action should not be null.");
}

[Test]
public void Should_rethrow_exception_on_failure()
{
var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(100),
x => { },
() => throw new Exception("Exception from armed action"),
() => { },
timeToWaitWhenTriggered: TimeSpan.Zero,
timeToWaitWhenArmed: TimeSpan.Zero
);

var ex = Assert.ThrowsAsync<Exception>(async () => await circuitBreaker.Failure(new Exception("Test Exception")));
Assert.That(ex.Message, Is.EqualTo("Exception from armed action"));
}

[Test]
public async Task Should_delay_after_trigger_failure()
{
var timeToWaitWhenTriggered = TimeSpan.FromMilliseconds(50);
var timeToWaitWhenArmed = TimeSpan.FromMilliseconds(100);

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.Zero,
_ => { },
timeToWaitWhenTriggered: timeToWaitWhenTriggered,
timeToWaitWhenArmed: timeToWaitWhenArmed
);

var stopWatch = Stopwatch.StartNew();

await circuitBreaker.Failure(new Exception("Test Exception"));
await circuitBreaker.Failure(new Exception("Test Exception After Trigger"));

stopWatch.Stop();

Assert.That(stopWatch.ElapsedMilliseconds, Is.GreaterThanOrEqualTo(timeToWaitWhenTriggered.Add(timeToWaitWhenArmed).TotalMilliseconds).Within(20), "The circuit breaker should delay after a triggered failure.");
}

[Test]
public async Task Should_not_trigger_if_disarmed_before_timeout()
{
var triggerActionCalled = false;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(100),
ex => triggerActionCalled = true,
timeToWaitWhenTriggered: TimeSpan.Zero,
timeToWaitWhenArmed: TimeSpan.Zero
);

await circuitBreaker.Failure(new Exception("Test Exception"));
circuitBreaker.Success();

Assert.That(triggerActionCalled, Is.False, "The trigger action should not be called if the circuit breaker was disarmed.");
}

[Test]
public async Task Should_handle_concurrent_failure_and_success()
{
var armedActionCalled = false;
var disarmedActionCalled = false;
var triggerActionCalled = false;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(100),
ex => triggerActionCalled = true,
() => armedActionCalled = true,
() => disarmedActionCalled = true,
TimeSpan.Zero,
TimeSpan.Zero
);

var failureTask = circuitBreaker.Failure(new Exception("Test Exception"));
var successTask = Task.Run(() =>
{
Thread.Sleep(50); // Simulate some delay before success
circuitBreaker.Success();
});

await Task.WhenAll(failureTask, successTask);

Assert.That(armedActionCalled, Is.True, "The armed action should be called.");
Assert.That(disarmedActionCalled, Is.True, "The disarmed action should be called.");
Assert.That(triggerActionCalled, Is.False, "The trigger action should not be called if success occurred before timeout.");
}

[Test]
public async Task Should_handle_high_concurrent_failure_and_success()
{
var armedActionCalled = 0;
var disarmedActionCalled = 0;
var triggerActionCalled = 0;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromSeconds(5),
ex => Interlocked.Increment(ref triggerActionCalled),
() => Interlocked.Increment(ref armedActionCalled),
() => Interlocked.Increment(ref disarmedActionCalled),
TimeSpan.Zero,
TimeSpan.FromMilliseconds(25)
);

var tasks = Enumerable.Range(0, 1000)
.Select(
i => i % 2 == 0 ?
circuitBreaker.Failure(new Exception($"Test Exception {i}")) :
Task.Run(() =>
{
Thread.Sleep(25); // Simulate some delay before success
circuitBreaker.Success();
})
).ToArray();

await Task.WhenAll(tasks);

Assert.That(armedActionCalled, Is.EqualTo(1), "The armed action should be called.");
Assert.That(disarmedActionCalled, Is.EqualTo(1), "The disarmed action should be called.");
Assert.That(triggerActionCalled, Is.Zero, "The trigger action should not be called if success occurred before timeout.");
}

[Test]
public async Task Should_trigger_after_multiple_failures_and_timeout()
{
var triggerActionCalled = false;

var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"TestCircuitBreaker",
TimeSpan.FromMilliseconds(50),
ex => triggerActionCalled = true,
timeToWaitWhenTriggered: TimeSpan.FromMilliseconds(50),
timeToWaitWhenArmed: TimeSpan.FromMilliseconds(50)
);

await circuitBreaker.Failure(new Exception("Test Exception"));
await circuitBreaker.Failure(new Exception("Another Exception After Trigger"));

Assert.That(triggerActionCalled, Is.True, "The trigger action should be called after repeated failures and timeout.");
}
}
}
4 changes: 3 additions & 1 deletion src/Transport/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ public Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext, Task<E
this.criticalError = criticalError;
pushSettings = settings;

Action noOp = () => { };

circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker($"'{settings.InputQueue}'",
timeToWaitBeforeTriggeringCircuitBreaker, ex =>
{
criticalError.Raise("Failed to receive message from Azure Service Bus.", ex);
});
}, noOp, noOp);

return Task.CompletedTask;
}
Expand Down
Loading