Skip to content

Commit

Permalink
harden BackoffSupervisorSpecs (#7420)
Browse files Browse the repository at this point in the history
* BackoffSupervisorSpec: fix `Watch` calls to `WatchAsync`

* prevent race condition with competing `Termination` messages

* add `AwaitAssert`

* fixed more race conditions
  • Loading branch information
Aaronontheweb authored Dec 19, 2024
1 parent 2a82323 commit d4aa3fa
Showing 1 changed file with 76 additions and 56 deletions.
132 changes: 76 additions & 56 deletions src/core/Akka.Tests/Pattern/BackoffSupervisorSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public async Task BackoffSupervisor_must_start_child_again_when_it_stops_when_us
var supervisor = Create(OnStopOptions());
supervisor.Tell(BackoffSupervisor.GetCurrentChild.Instance);
var c1 = (await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref;
Watch(c1);
await WatchAsync(c1);
c1.Tell(PoisonPill.Instance);
await ExpectTerminatedAsync(c1);
await AwaitAssertAsync(async() =>
Expand All @@ -125,21 +125,6 @@ public async Task BackoffSupervisor_must_forward_messages_to_the_child()
[Fact]
public async Task BackoffSupervisor_must_support_custom_supervision_strategy()
{
// Helper delegate: asks the given supervisor for its current child, sends that child "boom",
// expects the child to terminate, and then asserts the supervisor reports a different child reference.
Func<IActorRef, Task> assertCustomStrategy = async supervisor =>
{
supervisor.Tell(BackoffSupervisor.GetCurrentChild.Instance);
var c1 = (await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref;
// NOTE(review): synchronous Watch; the commit message for this change describes replacing `Watch` calls with `WatchAsync`.
Watch(c1);
c1.Tell("boom");
await ExpectTerminatedAsync(c1);
// The current-child query and its assertion both run inside AwaitAssertAsync
// (presumably retried until the assertion passes — confirm against the TestKit API).
await AwaitAssertAsync(async () =>
{
supervisor.Tell(BackoffSupervisor.GetCurrentChild.Instance);
// new instance
(await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref.Should().NotBeSameAs(c1);
});
};

// TODO: use FilterException
await EventFilter.Exception<TestException>().ExpectAsync(2, async () =>
{
Expand All @@ -163,9 +148,25 @@ await EventFilter.Exception<TestException>().ExpectAsync(2, async () =>
return Directive.Escalate;
});

await assertCustomStrategy(Create(OnStopOptions().WithSupervisorStrategy(stoppingStrategy)));
await assertCustomStrategy(Create(OnFailureOptions().WithSupervisorStrategy(restartingStrategy)));
await AssertCustomStrategy(Create(OnStopOptions().WithSupervisorStrategy(stoppingStrategy)));
await AssertCustomStrategy(Create(OnFailureOptions().WithSupervisorStrategy(restartingStrategy)));
});
return;

// Local helper: asks the given supervisor for its current child, sends that child "boom",
// expects the child to terminate, and then asserts the supervisor reports a different child reference.
async Task AssertCustomStrategy(IActorRef supervisor)
{
supervisor.Tell(BackoffSupervisor.GetCurrentChild.Instance);
var c1 = (await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref;
// The watch is awaited before "boom" is sent.
await WatchAsync(c1);
c1.Tell("boom");
await ExpectTerminatedAsync(c1);
// The current-child query and its assertion both run inside AwaitAssertAsync
// (presumably retried until the assertion passes — confirm against the TestKit API).
await AwaitAssertAsync(async () =>
{
supervisor.Tell(BackoffSupervisor.GetCurrentChild.Instance);
// new instance
(await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref.Should().NotBeSameAs(c1);
});
}
}

[Fact]
Expand All @@ -177,7 +178,7 @@ await EventFilter.Exception<TestException>().ExpectAsync(1, async () =>
var supervisor = Create(OnStopOptions().WithDefaultStoppingStrategy().WithManualReset());
supervisor.Tell(BackoffSupervisor.GetCurrentChild.Instance);
var c1 = (await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref;
Watch(c1);
await WatchAsync(c1);
supervisor.Tell(BackoffSupervisor.GetRestartCount.Instance);
(await ExpectMsgAsync<BackoffSupervisor.RestartCount>()).Count.Should().Be(0);

Expand Down Expand Up @@ -279,7 +280,7 @@ await EventFilter.Exception<TestException>().ExpectAsync(1, async() =>
supervisor.Tell(BackoffSupervisor.GetCurrentChild.Instance);

var c1 = (await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref;
Watch(c1);
await WatchAsync(c1);
supervisor.Tell(BackoffSupervisor.GetRestartCount.Instance);
(await ExpectMsgAsync<BackoffSupervisor.RestartCount>()).Count.Should().Be(0);

Expand All @@ -306,7 +307,7 @@ await EventFilter.Exception<TestException>().ExpectAsync(1, async() =>
supervisor.Tell(BackoffSupervisor.GetCurrentChild.Instance);

var c1 = (await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref;
Watch(c1);
await WatchAsync(c1);
supervisor.Tell(BackoffSupervisor.GetRestartCount.Instance);
(await ExpectMsgAsync<BackoffSupervisor.RestartCount>()).Count.Should().Be(0);

Expand Down Expand Up @@ -365,33 +366,42 @@ await AwaitConditionAsync(async() =>
return (await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref;
}

Watch(supervisor);
await WatchAsync(supervisor);

supervisor.Tell(BackoffSupervisor.GetRestartCount.Instance);
(await ExpectMsgAsync<BackoffSupervisor.RestartCount>()).Count.Should().Be(0);

supervisor.Tell(BackoffSupervisor.GetCurrentChild.Instance);
var c1 = (await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref;
Watch(c1);
await WatchAsync(c1);
c1.Tell(PoisonPill.Instance);
await ExpectTerminatedAsync(c1);

supervisor.Tell(BackoffSupervisor.GetRestartCount.Instance);
(await ExpectMsgAsync<BackoffSupervisor.RestartCount>()).Count.Should().Be(1);
// have to spin here because our message might get processed first, before the BackoffSupervisor can do its work
await AwaitAssertAsync(async () =>
{
supervisor.Tell(BackoffSupervisor.GetRestartCount.Instance);
(await ExpectMsgAsync<BackoffSupervisor.RestartCount>()).Count.Should().Be(1);
});


// This code looks suspicious; it might be the cause of the raciness.
var c2 = await WaitForChild();
await AwaitAssertAsync(() => c2.ShouldNotBe(c1));
Watch(c2);
await WatchAsync(c2);
c2.Tell(PoisonPill.Instance);
await ExpectTerminatedAsync(c2);

supervisor.Tell(BackoffSupervisor.GetRestartCount.Instance);
(await ExpectMsgAsync<BackoffSupervisor.RestartCount>()).Count.Should().Be(2);
// have to spin here because our message might get processed first, before the BackoffSupervisor can do its work
await AwaitAssertAsync(async () =>
{
supervisor.Tell(BackoffSupervisor.GetRestartCount.Instance);
(await ExpectMsgAsync<BackoffSupervisor.RestartCount>()).Count.Should().Be(2);
});

var c3 = await WaitForChild();
await AwaitAssertAsync(() => c3.ShouldNotBe(c2));
Watch(c3);
await WatchAsync(c3);
c3.Tell(PoisonPill.Instance);
await ExpectTerminatedAsync(c3);
await ExpectTerminatedAsync(supervisor);
Expand All @@ -403,50 +413,60 @@ public async Task BackoffSupervisor_must_stop_restarting_the_child_after_reachin
await EventFilter.Exception<TestException>().ExpectAsync(3, async() =>
{
var supervisor = Create(OnFailureOptions(maxNrOfRetries: 2));

// Local helper: polls the supervisor until it reports a current child that is not Nobody,
// then queries the supervisor once more and returns the reference from that second reply.
async Task<IActorRef> WaitForChild()
{
// The two TimeSpan arguments are presumably the maximum wait (1 s) and the polling interval (50 ms) — confirm against AwaitConditionAsync.
await AwaitConditionAsync(async () =>
{
supervisor.Tell(BackoffSupervisor.GetCurrentChild.Instance);
var c = (await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref;
return !c.IsNobody();
}, TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(50));

// NOTE(review): the returned reference comes from this second query, not from the reply the
// condition above inspected, so it is not itself checked with IsNobody() — confirm this is acceptable.
supervisor.Tell(BackoffSupervisor.GetCurrentChild.Instance);
return (await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref;
}

Watch(supervisor);
var supervisorProbe = CreateTestProbe();
await supervisorProbe.WatchAsync(supervisor);

supervisor.Tell(BackoffSupervisor.GetRestartCount.Instance);
(await ExpectMsgAsync<BackoffSupervisor.RestartCount>()).Count.Should().Be(0);

supervisor.Tell(BackoffSupervisor.GetCurrentChild.Instance);
var c1 = (await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref;
Watch(c1);
await WatchAsync(c1);
c1.Tell("boom");
await ExpectTerminatedAsync(c1);

supervisor.Tell(BackoffSupervisor.GetRestartCount.Instance);
(await ExpectMsgAsync<BackoffSupervisor.RestartCount>()).Count.Should().Be(1);
// have to spin here because our message might get processed first, before the BackoffSupervisor can do its work
await AwaitAssertAsync(async () =>
{
supervisor.Tell(BackoffSupervisor.GetRestartCount.Instance);
(await ExpectMsgAsync<BackoffSupervisor.RestartCount>()).Count.Should().Be(1);
});

// This code looks suspicious; it might be the cause of the raciness.
var c2 = await WaitForChild();
await AwaitAssertAsync(() => c2.ShouldNotBe(c1));
Watch(c2);
await WatchAsync(c2);
c2.Tell("boom");
await ExpectTerminatedAsync(c2);

supervisor.Tell(BackoffSupervisor.GetRestartCount.Instance);
(await ExpectMsgAsync<BackoffSupervisor.RestartCount>()).Count.Should().Be(2);

// have to spin here because our message might get processed first, before the BackoffSupervisor can do its work
await AwaitAssertAsync(async () =>
{
supervisor.Tell(BackoffSupervisor.GetRestartCount.Instance);
(await ExpectMsgAsync<BackoffSupervisor.RestartCount>()).Count.Should().Be(2);
});

var c3 = await WaitForChild();
await AwaitAssertAsync(() => c3.ShouldNotBe(c2));
Watch(c3);
await WatchAsync(c3);
c3.Tell("boom");
await ExpectTerminatedAsync(c3);
await ExpectTerminatedAsync(supervisor);
await supervisorProbe.ExpectTerminatedAsync(supervisor);
return;

// Local helper: polls the supervisor until it reports a current child that is not Nobody,
// then queries the supervisor once more and returns the reference from that second reply.
async Task<IActorRef> WaitForChild()
{
// The two TimeSpan arguments are presumably the maximum wait (1 s) and the polling interval (50 ms) — confirm against AwaitConditionAsync.
await AwaitConditionAsync(async () =>
{
supervisor.Tell(BackoffSupervisor.GetCurrentChild.Instance);
var c = (await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref;
return !c.IsNobody();
}, TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(50));

// NOTE(review): the returned reference comes from this second query, not from the reply the
// condition above inspected, so it is not itself checked with IsNobody() — confirm this is acceptable.
supervisor.Tell(BackoffSupervisor.GetCurrentChild.Instance);
return (await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref;
}
});
}

Expand All @@ -458,8 +478,8 @@ public async Task BackoffSupervisor_must_stop_restarting_the_child_if_final_stop
supervisor.Tell(BackoffSupervisor.GetCurrentChild.Instance);
var c1 = (await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref;
var parentSupervisor = CreateTestProbe();
Watch(c1);
parentSupervisor.Watch(supervisor);
await WatchAsync(c1);
await parentSupervisor.WatchAsync(supervisor);

supervisor.Tell(stopMessage);
await ExpectMsgAsync("stop");
Expand All @@ -476,8 +496,8 @@ public async Task BackoffSupervisor_must_not_stop_when_final_stop_message_has_no
var supervisor = Create(OnStopOptions(maxNrOfRetries: 100).WithFinalStopMessage(message => ReferenceEquals(message, stopMessage)));
supervisor.Tell(BackoffSupervisor.GetCurrentChild.Instance);
var c1 = (await ExpectMsgAsync<BackoffSupervisor.CurrentChild>()).Ref;
Watch(c1);
supervisorWatcher.Watch(supervisor);
await WatchAsync(c1);
await supervisorWatcher.WatchAsync(supervisor);

c1.Tell(PoisonPill.Instance);
await ExpectTerminatedAsync(c1);
Expand Down

0 comments on commit d4aa3fa

Please sign in to comment.