Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka.Cluster.Tools.Tests: disable auto-downing on restart specs #7214

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
using FluentAssertions;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tools.Tests.Singleton
{
Expand Down Expand Up @@ -77,14 +78,14 @@ protected TestException(SerializationInfo info, StreamingContext context)
}
}

private Cluster cluster;
private TestLeaseExt testLeaseExt;
private readonly Cluster _cluster;
private readonly TestLeaseExt _testLeaseExt;

private AtomicCounter counter = new(0);
private TimeSpan shortDuration = TimeSpan.FromMilliseconds(50);
private string leaseOwner;
private readonly AtomicCounter _counter = new(0);
private readonly TimeSpan _shortDuration = TimeSpan.FromMilliseconds(50);
private readonly string _leaseOwner;

public ClusterSingletonLeaseSpec() : base(ConfigurationFactory.ParseString(@"
public ClusterSingletonLeaseSpec(ITestOutputHelper output) : base(ConfigurationFactory.ParseString(@"
#akka.loglevel = INFO
akka.loglevel = DEBUG
akka.actor.provider = ""cluster""
Expand All @@ -99,22 +100,22 @@ public ClusterSingletonLeaseSpec() : base(ConfigurationFactory.ParseString(@"
hostname = ""127.0.0.1""
port = 0
}
}").WithFallback(TestLease.Configuration))
}").WithFallback(TestLease.Configuration), output)
{

cluster = Cluster.Get(Sys);
testLeaseExt = TestLeaseExt.Get(Sys);
_cluster = Cluster.Get(Sys);
_testLeaseExt = TestLeaseExt.Get(Sys);

leaseOwner = cluster.SelfMember.Address.HostPort();
_leaseOwner = _cluster.SelfMember.Address.HostPort();

cluster.Join(cluster.SelfAddress);
_cluster.Join(_cluster.SelfAddress);
AwaitAssert(() =>
{
cluster.SelfMember.Status.ShouldBe(MemberStatus.Up);
_cluster.SelfMember.Status.ShouldBe(MemberStatus.Up);
});
}

private string NextName() => $"important-{counter.GetAndIncrement()}";
private string NextName() => $"important-{_counter.GetAndIncrement()}";

private ClusterSingletonManagerSettings NextSettings() => ClusterSingletonManagerSettings.Create(Sys).WithSingletonName(NextName());

Expand All @@ -133,10 +134,10 @@ public void ClusterSingleton_with_lease_should_not_start_until_lease_is_availabl
TestLease testLease = null;
AwaitAssert(() =>
{
testLease = testLeaseExt.GetTestLease(LeaseNameFor(settings));
testLease = _testLeaseExt.GetTestLease(LeaseNameFor(settings));
}); // allow singleton manager to create the lease

probe.ExpectNoMsg(shortDuration);
probe.ExpectNoMsg(_shortDuration);
testLease.InitialPromise.SetResult(true);
probe.ExpectMsg("preStart");
}
Expand All @@ -154,12 +155,12 @@ public void ClusterSingleton_with_lease_should_do_not_start_if_lease_acquire_ret
TestLease testLease = null;
AwaitAssert(() =>
{
testLease = testLeaseExt.GetTestLease(LeaseNameFor(settings));
testLease = _testLeaseExt.GetTestLease(LeaseNameFor(settings));
}); // allow singleton manager to create the lease

probe.ExpectNoMsg(shortDuration);
probe.ExpectNoMsg(_shortDuration);
testLease.InitialPromise.SetResult(false);
probe.ExpectNoMsg(shortDuration);
probe.ExpectNoMsg(_shortDuration);
}

[Fact]
Expand All @@ -175,17 +176,17 @@ public void ClusterSingleton_with_lease_should_retry_trying_to_get_lease_if_acqu
TestLease testLease = null;
AwaitAssert(() =>
{
testLease = testLeaseExt.GetTestLease(LeaseNameFor(settings));
testLease = _testLeaseExt.GetTestLease(LeaseNameFor(settings));
}); // allow singleton manager to create the lease

testLease.Probe.ExpectMsg(new TestLease.AcquireReq(leaseOwner));
singletonProbe.ExpectNoMsg(shortDuration);
TaskCompletionSource<bool> nextResponse = new TaskCompletionSource<bool>();
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(_leaseOwner));
singletonProbe.ExpectNoMsg(_shortDuration);
var nextResponse = new TaskCompletionSource<bool>();

testLease.SetNextAcquireResult(nextResponse.Task);
testLease.InitialPromise.SetResult(false);
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(leaseOwner));
singletonProbe.ExpectNoMsg(shortDuration);
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(_leaseOwner));
singletonProbe.ExpectNoMsg(_shortDuration);
nextResponse.SetResult(true);
singletonProbe.ExpectMsg("preStart");
}
Expand All @@ -203,13 +204,13 @@ public void ClusterSingleton_with_lease_should_do_not_start_if_lease_acquire_fai
TestLease testLease = null;
AwaitAssert(() =>
{
testLease = testLeaseExt.GetTestLease(LeaseNameFor(settings));
testLease = _testLeaseExt.GetTestLease(LeaseNameFor(settings));
}); // allow singleton manager to create the lease


probe.ExpectNoMsg(shortDuration);
probe.ExpectNoMsg(_shortDuration);
testLease.InitialPromise.SetException(new TestException("no lease for you"));
probe.ExpectNoMsg(shortDuration);
probe.ExpectNoMsg(_shortDuration);
}

[Fact]
Expand All @@ -225,16 +226,16 @@ public void ClusterSingleton_with_lease_should_retry_trying_to_get_lease_if_acqu
TestLease testLease = null;
AwaitAssert(() =>
{
testLease = testLeaseExt.GetTestLease(LeaseNameFor(settings));
testLease = _testLeaseExt.GetTestLease(LeaseNameFor(settings));
}); // allow singleton manager to create the lease

testLease.Probe.ExpectMsg(new TestLease.AcquireReq(leaseOwner));
singletonProbe.ExpectNoMsg(shortDuration);
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(_leaseOwner));
singletonProbe.ExpectNoMsg(_shortDuration);
TaskCompletionSource<bool> nextResponse = new TaskCompletionSource<bool>();
testLease.SetNextAcquireResult(nextResponse.Task);
testLease.InitialPromise.SetException(new TestException("no lease for you"));
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(leaseOwner));
singletonProbe.ExpectNoMsg(shortDuration);
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(_leaseOwner));
singletonProbe.ExpectNoMsg(_shortDuration);
nextResponse.SetResult(true);
singletonProbe.ExpectMsg("preStart");
}
Expand All @@ -252,19 +253,19 @@ public void ClusterSingleton_with_lease_should_stop_singleton_if_the_lease_fails
TestLease testLease = null;
AwaitAssert(() =>
{
testLease = testLeaseExt.GetTestLease(LeaseNameFor(settings));
testLease = _testLeaseExt.GetTestLease(LeaseNameFor(settings));
}); // allow singleton manager to create the lease

testLease.Probe.ExpectMsg(new TestLease.AcquireReq(leaseOwner));
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(_leaseOwner));
testLease.InitialPromise.SetResult(true);
lifecycleProbe.ExpectMsg("preStart");
var callback = testLease.GetCurrentCallback();
callback(null);
lifecycleProbe.ExpectMsg("postStop");
testLease.Probe.ExpectMsg(new TestLease.ReleaseReq(leaseOwner));
testLease.Probe.ExpectMsg(new TestLease.ReleaseReq(_leaseOwner));

// should try and reacquire lease
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(leaseOwner));
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(_leaseOwner));
lifecycleProbe.ExpectMsg("preStart");
}

Expand All @@ -281,15 +282,15 @@ public void ClusterSingleton_with_lease_should_release_lease_when_leaving_oldest
TestLease testLease = null;
AwaitAssert(() =>
{
testLease = testLeaseExt.GetTestLease(LeaseNameFor(settings));
testLease = _testLeaseExt.GetTestLease(LeaseNameFor(settings));
}); // allow singleton manager to create the lease

singletonProbe.ExpectNoMsg(shortDuration);
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(leaseOwner));
singletonProbe.ExpectNoMsg(_shortDuration);
testLease.Probe.ExpectMsg(new TestLease.AcquireReq(_leaseOwner));
testLease.InitialPromise.SetResult(true);
singletonProbe.ExpectMsg("preStart");
cluster.Leave(cluster.SelfAddress);
testLease.Probe.ExpectMsg(new TestLease.ReleaseReq(leaseOwner));
_cluster.Leave(_cluster.SelfAddress);
testLease.Probe.ExpectMsg(new TestLease.ReleaseReq(_leaseOwner));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Akka.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tools.Tests.Singleton
{
Expand All @@ -25,26 +26,33 @@ public class ClusterSingletonRestart2Spec : AkkaSpec
private readonly ActorSystem _sys3;
private ActorSystem _sys4 = null;

public ClusterSingletonRestart2Spec() : base(@"
akka.loglevel = INFO
akka.actor.provider = ""cluster""
akka.cluster.roles = [singleton]
akka.cluster.auto-down-unreachable-after = 2s
akka.cluster.singleton.min-number-of-hand-over-retries = 5
akka.remote {
dot-netty.tcp {
hostname = ""127.0.0.1""
port = 0
}
}")
public ClusterSingletonRestart2Spec(ITestOutputHelper output) : base("""

akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.cluster.roles = [singleton]
#akka.cluster.auto-down-unreachable-after = 2s
akka.cluster.singleton.min-number-of-hand-over-retries = 5
akka.remote {
dot-netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
""", output)
{
_sys1 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
_sys2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
_sys3 = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString("akka.cluster.roles = [other]")
.WithFallback(Sys.Settings.Config));

// ensure xunit output is captured
InitializeLogger(_sys1);
InitializeLogger(_sys2);
InitializeLogger(_sys3);
}

public void Join(ActorSystem from, ActorSystem to)
public Task Join(ActorSystem from, ActorSystem to)
{
if (Cluster.Get(from).SelfRoles.Contains("singleton"))
{
Expand All @@ -54,9 +62,9 @@ public void Join(ActorSystem from, ActorSystem to)
}


Within(TimeSpan.FromSeconds(45), () =>
return WithinAsync(TimeSpan.FromSeconds(45), () =>
{
AwaitAssert(() =>
return AwaitAssertAsync(() =>
{
Cluster.Get(from).Join(Cluster.Get(to).SelfAddress);
Cluster.Get(from).State.Members.Select(x => x.UniqueAddress).Should().Contain(Cluster.Get(from).SelfUniqueAddress);
Expand All @@ -70,56 +78,61 @@ public void Join(ActorSystem from, ActorSystem to)
}

[Fact]
public void Restarting_cluster_node_during_hand_over_must_restart_singletons_in_restarted_node()
public async Task Restarting_cluster_node_during_hand_over_must_restart_singletons_in_restarted_node()
{
Join(_sys1, _sys1);
Join(_sys2, _sys1);
Join(_sys3, _sys1);
var joinTasks = new[]
{
Join(_sys1, _sys1),
Join(_sys2, _sys1),
Join(_sys3, _sys1)
};
await Task.WhenAll(joinTasks);

var proxy3 = _sys3.ActorOf(
ClusterSingletonProxy.Props("user/echo", ClusterSingletonProxySettings.Create(_sys3).WithRole("singleton")), "proxy3");

Within(TimeSpan.FromSeconds(5), () =>
await WithinAsync(TimeSpan.FromSeconds(5), () =>
{
AwaitAssert(() =>
return AwaitAssertAsync(async () =>
{
var probe = CreateTestProbe(_sys3);
proxy3.Tell("hello", probe.Ref);
probe.ExpectMsg<UniqueAddress>(TimeSpan.FromSeconds(1))
.Should()
var msg = await probe.ExpectMsgAsync<UniqueAddress>(TimeSpan.FromSeconds(1));
msg.Should()
.Be(Cluster.Get(_sys1).SelfUniqueAddress);
});
});

Cluster.Get(_sys1).Leave(Cluster.Get(_sys1).SelfAddress);


// ReSharper disable once PossibleInvalidOperationException
var sys2Port = Cluster.Get(_sys2).SelfAddress.Port.Value; // grab value before shutdown
// at the same time, shutdown sys2, which would be the expected next singleton node
Shutdown(_sys2);
// it will be downed by the join attempts of the new incarnation

// then restart it
// ReSharper disable once PossibleInvalidOperationException
var sys2Port = Cluster.Get(_sys2).SelfAddress.Port.Value;
var sys4Config = ConfigurationFactory.ParseString(@"akka.remote.dot-netty.tcp.port=" + sys2Port)
.WithFallback(_sys1.Settings.Config);
_sys4 = ActorSystem.Create(_sys1.Name, sys4Config);
InitializeLogger(_sys4); // ensure xunit output is captured

Join(_sys4, _sys3);
await Join(_sys4, _sys3);

// let it stabilize
Task.Delay(TimeSpan.FromSeconds(5)).Wait();
//Task.Delay(TimeSpan.FromSeconds(5)).Wait();

Within(TimeSpan.FromSeconds(10), () =>
await WithinAsync(TimeSpan.FromSeconds(10), () =>
{
AwaitAssert(() =>
return AwaitAssertAsync(async () =>
{
var probe = CreateTestProbe(_sys3);
proxy3.Tell("hello2", probe.Ref);

// note that sys3 doesn't have the required singleton role, so singleton instance should be
// on the restarted node
probe.ExpectMsg<UniqueAddress>(TimeSpan.FromSeconds(1))
.Should()
var msg = await probe.ExpectMsgAsync<UniqueAddress>(TimeSpan.FromSeconds(1));
msg.Should()
.Be(Cluster.Get(_sys4).SelfUniqueAddress);
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public ClusterSingletonRestart3Spec(ITestOutputHelper output) : base(@"
akka.loglevel = DEBUG
akka.actor.provider = ""cluster""
akka.cluster.app-version = ""1.0.0""
akka.cluster.auto-down-unreachable-after = 2s
#akka.cluster.auto-down-unreachable-after = 2s
akka.cluster.singleton.min-number-of-hand-over-retries = 5
akka.cluster.singleton.consider-app-version = true
akka.remote {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Akka.TestKit.TestActors;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tools.Tests.Singleton
{
Expand All @@ -25,16 +26,16 @@ public class ClusterSingletonRestartSpec : AkkaSpec
private readonly ActorSystem _sys2;
private ActorSystem _sys3 = null;

public ClusterSingletonRestartSpec() : base(@"
public ClusterSingletonRestartSpec(ITestOutputHelper output) : base(@"
akka.loglevel = INFO
akka.actor.provider = ""cluster""
akka.cluster.auto-down-unreachable-after = 2s
#akka.cluster.auto-down-unreachable-after = 2s
akka.remote {
dot-netty.tcp {
hostname = ""127.0.0.1""
port = 0
}
}")
}", output)
{
_sys1 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
_sys2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
Expand Down
Loading