Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing cluster singleton detection #7363

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
Expand All @@ -14,11 +15,18 @@
using Akka.Event;
using Akka.TestKit;
using Xunit;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit.Abstractions;

namespace Akka.Cluster.Tools.Tests.Singleton
{
public class ClusterSingletonProxySpec : TestKit.Xunit2.TestKit
{
/// <summary>
/// Creates the spec and forwards the xUnit output helper to the underlying TestKit.
/// </summary>
/// <param name="output">The xUnit test output helper handed to the base TestKit.</param>
public ClusterSingletonProxySpec(ITestOutputHelper output)
    : base(output: output)
{
}

[Fact]
public void ClusterSingletonProxy_must_correctly_identify_the_singleton()
{
Expand Down Expand Up @@ -67,25 +75,190 @@ await AwaitConditionAsync(
}
}

[Fact(DisplayName = "ClusterSingletonProxy should detect if its associated singleton failed to start after a period")]
public async Task ClusterSingletonProxySingletonTimeoutTest()
{
    // Declared outside the try block so the cleanup in finally can reach whichever systems were created.
    ActorSys seed = null;
    ActorSys testSystem = null;

    // Every timeout event in this scenario must describe the same proxy configuration.
    static void VerifyTimeout(ClusterSingletonProxy.IdentifySingletonTimedOut timedOut)
    {
        timedOut.SingletonName.Should().Be("singleton");
        timedOut.Role.Should().Be("non-existent");
        timedOut.Duration.Should().Be(TimeSpan.FromMilliseconds(500));
    }

    try
    {
        // The seed node forms a cluster by joining itself.
        seed = new ActorSys(output: Output);
        seed.Cluster.Join(seed.Cluster.SelfAddress);

        // The proxy under test looks for a singleton in a role that no node carries.
        testSystem = new ActorSys(
            config: """
            akka.cluster.singleton-proxy {
            role = "non-existent"
            log-singleton-identification-failure = true
            singleton-identification-failure-period = 500ms
            }
            """,
            output: Output);

        // Route the timeout events published on the event stream to the test actor.
        testSystem.Sys.EventStream.Subscribe<ClusterSingletonProxy.IdentifySingletonTimedOut>(
            testSystem.TestActor);

        // Before joining a cluster the proxy must stay silent.
        await testSystem.ExpectNoMsgAsync(1.Seconds());

        testSystem.Cluster.Join(seed.Cluster.SelfAddress);

        // Once in the cluster, the proxy publishes IdentifySingletonTimedOut locally when the
        // singleton cannot be found within the detection period.
        var afterJoin = await testSystem.ExpectMsgAsync<ClusterSingletonProxy.IdentifySingletonTimedOut>(3.Seconds());
        VerifyTimeout(afterJoin);

        // Make the seed leave the cluster.
        seed.Cluster.Leave(seed.Cluster.SelfAddress);

        // The topology change must trigger another timeout event.
        var afterLeave = await testSystem.ExpectMsgAsync<ClusterSingletonProxy.IdentifySingletonTimedOut>(3.Seconds());
        VerifyTimeout(afterLeave);
    }
    finally
    {
        // Terminate every system that was actually created, in creation order.
        var shutdowns = new[] { seed, testSystem }
            .Where(system => system is not null)
            .Select(system => system.Sys.Terminate())
            .ToList();

        if (shutdowns.Count > 0)
        {
            await Task.WhenAll(shutdowns);
        }
    }
}

[Fact(DisplayName = "ClusterSingletonProxy should not start singleton identify detection if a singleton reference already found")]
public async Task ClusterSingletonProxySingletonTimeoutTest2()
{
    const string seedConfig = """
        akka.cluster {
        roles = [seed] # only start singletons on seed role
        min-nr-of-members = 1
        singleton.role = seed # only start singletons on seed role
        singleton-proxy.role = seed # only start singletons on seed role
        }
        """;

    // Declared outside the try block so the cleanup in finally can reach whichever systems were created.
    ActorSys seed = null;
    ActorSys seed2 = null;
    ActorSys testSystem = null;

    // Polls the observer's cluster view (for up to 30 seconds) until it reports exactly
    // the expected number of members in the Up status.
    async Task AwaitUpMembersAsync(ActorSys observer, int expected)
    {
        await AwaitConditionAsync(
            () => Task.FromResult(Cluster.Get(observer.Sys).State.Members.Count(m => m.Status == MemberStatus.Up) == expected),
            TimeSpan.FromSeconds(30));
    }

    try
    {
        seed = new ActorSys(config: seedConfig, output: Output);
        seed.Cluster.Join(seed.Cluster.SelfAddress);

        // Member age matters here: the first seed must be Up before anyone else joins
        // so that it is the oldest node.
        await AwaitUpMembersAsync(seed, 1);

        seed2 = new ActorSys(config: seedConfig, output: Output);
        seed2.Cluster.Join(seed.Cluster.SelfAddress);

        // This node only runs a proxy that looks for the singleton in the seed role;
        // it does not start a singleton manager itself.
        testSystem = new ActorSys(
            config: """
            akka.cluster {
            roles = [proxy]
            singleton.role = seed # only start singletons on seed role
            singleton-proxy {
            role = seed # only start singletons on seed role
            log-singleton-identification-failure = true
            singleton-identification-failure-period = 1s
            }
            }
            """,
            startSingleton: false,
            output: Output);

        // Route the timeout events published on the event stream to the test actor.
        testSystem.Sys.EventStream.Subscribe<ClusterSingletonProxy.IdentifySingletonTimedOut>(testSystem.TestActor);
        testSystem.Cluster.Join(seed.Cluster.SelfAddress);

        // Wait until the proxy node sees all three members as Up.
        await AwaitUpMembersAsync(testSystem, 3);

        testSystem.TestProxy("hello");

        // With a singleton available, the timeout event must not fire.
        await testSystem.ExpectNoMsgAsync(1.5.Seconds());

        // Shutting down the second seed must not cause a timeout: the singleton is homed on the first seed.
        await seed2.Sys.Terminate();

        // Wait until the proxy node has seen the member removal.
        await AwaitUpMembersAsync(testSystem, 2);

        // Still no timeout event expected.
        await testSystem.ExpectNoMsgAsync(1.5.Seconds());

        // Now shut down the first seed, the node that homed the singleton.
        await seed.Sys.Terminate();

        // Wait until the proxy node has seen the member removal.
        await AwaitUpMembersAsync(testSystem, 1);

        // The proxy lost its singleton reference and no remaining node can home the singleton,
        // so it must publish IdentifySingletonTimedOut locally.
        await testSystem.ExpectMsgAsync<ClusterSingletonProxy.IdentifySingletonTimedOut>(3.Seconds());
    }
    finally
    {
        // Terminate every system that was actually created, in creation order.
        var shutdowns = new[] { seed, seed2, testSystem }
            .Where(system => system is not null)
            .Select(system => system.Sys.Terminate())
            .ToList();

        if (shutdowns.Count > 0)
        {
            await Task.WhenAll(shutdowns);
        }
    }
}

private class ActorSys : TestKit.Xunit2.TestKit
{
public Cluster Cluster { get; }

public ActorSys(string name = "ClusterSingletonProxySystem", Address joinTo = null, int bufferSize = 1000)
: base(ActorSystem.Create(name, ConfigurationFactory.ParseString(_cfg).WithFallback(TestKit.Configs.TestConfigs.DefaultConfig)))
public ActorSys(string name = "ClusterSingletonProxySystem", Address joinTo = null, int bufferSize = 1000, string config = null, bool startSingleton = true, ITestOutputHelper output = null)
: base(ActorSystem.Create(
name: name,
config: config is null
? ConfigurationFactory.ParseString(_cfg).WithFallback(DefaultConfig)
: ConfigurationFactory.ParseString(config).WithFallback(_cfg).WithFallback(DefaultConfig)),
output: output)
{
Cluster = Cluster.Get(Sys);
if (joinTo != null)
{
Cluster.Join(joinTo);
}

Cluster.RegisterOnMemberUp(() =>
if (startSingleton)
{
Sys.ActorOf(ClusterSingletonManager.Props(Props.Create(() => new Singleton()), PoisonPill.Instance,
ClusterSingletonManagerSettings.Create(Sys)
.WithRemovalMargin(TimeSpan.FromSeconds(5))), "singletonmanager");
});
Cluster.RegisterOnMemberUp(() =>
{
Sys.ActorOf(ClusterSingletonManager.Props(Props.Create(() => new Singleton()), PoisonPill.Instance,
ClusterSingletonManagerSettings.Create(Sys)
.WithRemovalMargin(TimeSpan.FromSeconds(5))), "singletonmanager");
});
}

Proxy =
Sys.ActorOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,35 @@ internal sealed class TryToIdentifySingleton : INoSerializationVerificationNeede
private TryToIdentifySingleton() { }
}

/// <summary>
/// Internal marker message without any payload. The proxy schedules it to itself and, when it
/// arrives while no singleton is known, reports that no singleton has been found after a period of time.
/// </summary>
internal sealed class IdentifySingletonTimeOutTick : INoSerializationVerificationNeeded
{
    private IdentifySingletonTimeOutTick()
    {
    }

    /// <summary>
    /// The single shared instance of this message; the private constructor prevents creating others.
    /// </summary>
    public static IdentifySingletonTimeOutTick Instance { get; } = new IdentifySingletonTimeOutTick();
}

/// <summary>
/// Event published by the proxy when no singleton has been found after a period of time.
/// </summary>
public sealed class IdentifySingletonTimedOut : INoSerializationVerificationNeeded
{
    /// <summary>
    /// Creates a new timeout event.
    /// </summary>
    /// <param name="singletonName">Value exposed through <see cref="SingletonName"/>.</param>
    /// <param name="role">Value exposed through <see cref="Role"/>.</param>
    /// <param name="duration">Value exposed through <see cref="Duration"/>.</param>
    public IdentifySingletonTimedOut(string singletonName, string role, TimeSpan duration)
        => (SingletonName, Role, Duration) = (singletonName, role, duration);

    /// <summary>
    /// Name of the singleton the proxy was looking for.
    /// </summary>
    public string SingletonName { get; }

    /// <summary>
    /// Role the proxy was configured with.
    /// </summary>
    public string Role { get; }

    /// <summary>
    /// The identification failure period that elapsed.
    /// </summary>
    public TimeSpan Duration { get; }
}

/// <summary>
/// Returns default HOCON configuration for the cluster singleton.
/// </summary>
Expand Down Expand Up @@ -85,6 +114,7 @@ public static Props Props(string singletonManagerPath, ClusterSingletonProxySett
private string _identityId;
private IActorRef _singleton = null;
private ICancelable _identityTimer = null;
private ICancelable _identityTimeoutTimer = null;
Arkatufus marked this conversation as resolved.
Show resolved Hide resolved
private ImmutableSortedSet<Member> _membersByAge;
private ILoggingAdapter _log;

Expand All @@ -110,7 +140,12 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS
if (m.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress))
Context.Stop(Self);
else
{
Remove(m.Member);

// start or reset identify timeout every time a member is removed (excluding self)
TrackIdentifyTimeout();
}
});
Receive<ClusterEvent.IMemberEvent>(_ =>
{
Expand Down Expand Up @@ -139,6 +174,22 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS
Context.ActorSelection(singletonAddress).Tell(new Identify(_identityId));
}
});
Receive<IdentifySingletonTimeOutTick>(_ =>
{
    // Only report when there is still no singleton reference. A tick can arrive after a
    // reference was found (a missed CancelTimer()); such a tick is ignored.
    if (_singleton is null)
    {
        var period = _settings.SingletonIdentificationFailurePeriod;

        Log.Warning(
            "ClusterSingletonProxy failed to find an associated singleton named [{0}] in role [{1}] after {2} seconds.",
            _settings.SingletonName,
            _settings.Role,
            period.TotalSeconds);

        // Publish the failure on the local event stream so subscribers can react to it.
        var timedOut = new IdentifySingletonTimedOut(_settings.SingletonName, _settings.Role, period);
        Context.System.EventStream.Publish(timedOut);
    }
});
Receive<Terminated>(terminated =>
{
if (Equals(_singleton, terminated.ActorRef))
Expand Down Expand Up @@ -191,6 +242,12 @@ private void CancelTimer()
_identityTimer.Cancel();
_identityTimer = null;
}

if (_identityTimeoutTimer is not null)
{
_identityTimeoutTimer.Cancel();
_identityTimeoutTimer = null;
}
}

private bool MatchingRole(Member member)
Expand Down Expand Up @@ -222,6 +279,29 @@ private void IdentifySingleton()
receiver: Self,
message: TryToIdentifySingleton.Instance,
sender: Self);

// reset identify timeout every time we try to identify a new singleton
TrackIdentifyTimeout();
}

/// <summary>
/// Cancels any pending identification timeout and, when no singleton reference is held and
/// identification failure logging is enabled in the settings, schedules a fresh
/// <see cref="IdentifySingletonTimeOutTick"/> to this actor after the configured failure period.
/// </summary>
private void TrackIdentifyTimeout()
{
    // Always discard the previous timer first, so at most one timeout is pending.
    _identityTimeoutTimer?.Cancel();
    _identityTimeoutTimer = null;

    // Nothing to track when a singleton reference is already held or the feature is switched off.
    if (_singleton is not null || !_settings.LogSingletonIdentificationFailure)
    {
        return;
    }

    _identityTimeoutTimer = Context.System.Scheduler.ScheduleTellOnceCancelable(
        delay: _settings.SingletonIdentificationFailurePeriod,
        receiver: Self,
        message: IdentifySingletonTimeOutTick.Instance,
        sender: Self);
}

private void TrackChanges(Action block)
Expand All @@ -242,6 +322,9 @@ private void Add(Member member)
_membersByAge = _membersByAge.Remove(member); //replace
_membersByAge = _membersByAge.Add(member);
});

// start or reset identify timeout every time a new member joined (including self)
TrackIdentifyTimeout();
}

private void Remove(Member member)
Expand Down
Loading
Loading