diff --git a/src/core/Akka.Cluster.Tests.MultiNode/TransitionSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/TransitionSpec.cs index e4e20ffa445..257ad4a9109 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/TransitionSpec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/TransitionSpec.cs @@ -122,12 +122,12 @@ private void AwaitMemberStatus(Address address, MemberStatus status) private void LeaderActions() { - Cluster.ClusterCore.Tell(InternalClusterAction.LeaderActionsTick.Instance); + Cluster.TellCoreSafe(InternalClusterAction.LeaderActionsTick.Instance); } private void ReapUnreachable() { - Cluster.ClusterCore.Tell(InternalClusterAction.ReapUnreachableTick.Instance); + Cluster.TellCoreSafe(InternalClusterAction.ReapUnreachableTick.Instance); } private int _gossipBarrierCounter = 0; @@ -148,7 +148,7 @@ private void GossipTo(RoleName fromRole, RoleName toRole) { EnterBarrier("before-gossip-" + _gossipBarrierCounter); // send gossip - Cluster.ClusterCore.Tell(new InternalClusterAction.SendGossipTo(GetAddress(toRole))); + Cluster.TellCoreSafe(new InternalClusterAction.SendGossipTo(GetAddress(toRole))); // gossip chat will synchronize the views AwaitCondition(() => ImmutableHashSet.Create(fromRole, toRole).Except(SeenLatestGossip()).IsEmpty); EnterBarrier("after-gossip-" + _gossipBarrierCounter); @@ -286,7 +286,7 @@ private void A_Cluster_must_perform_correct_transitions_when_third_joins_second( RunOn(() => { // send gossip - Cluster.ClusterCore.Tell(new InternalClusterAction.SendGossipTo(GetAddress(other2))); + Cluster.TellCoreSafe(new InternalClusterAction.SendGossipTo(GetAddress(other2))); }, other1); RunOn(() => diff --git a/src/core/Akka.Cluster.Tests/ClusterSpec.cs b/src/core/Akka.Cluster.Tests/ClusterSpec.cs index 58dfce90905..2a9ca9b55ad 100644 --- a/src/core/Akka.Cluster.Tests/ClusterSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterSpec.cs @@ -61,7 +61,7 @@ public ClusterSpec(ITestOutputHelper output) internal void LeaderActions() { - _cluster.ClusterCore.Tell(InternalClusterAction.LeaderActionsTick.Instance); + _cluster.TellCoreSafe(InternalClusterAction.LeaderActionsTick.Instance); } [Fact] diff --git a/src/core/Akka.Cluster/Cluster.cs b/src/core/Akka.Cluster/Cluster.cs index 4beb07754c7..8213d32be58 100644 --- a/src/core/Akka.Cluster/Cluster.cs +++ b/src/core/Akka.Cluster/Cluster.cs @@ -126,6 +126,9 @@ public Cluster(ActorSystemImpl system) LogInfo("Starting up..."); + var clusterCoreTaskSource = new TaskCompletionSource(); + _clusterCoreTask = clusterCoreTaskSource.Task; + FailureDetector = new DefaultFailureDetectorRegistry
(() => FailureDetectorLoader.Load(Settings.FailureDetectorImplementationClass, Settings.FailureDetectorConfig, system)); @@ -140,27 +143,26 @@ public Cluster(ActorSystemImpl system) _readView = new ClusterReadView(this); // force the underlying system to start - _clusterCore = GetClusterCoreRef().Result; - - system.RegisterOnTermination(Shutdown); + _ = Task.Run(async () => + { + try + { + _clusterCore = await _clusterDaemons.Ask(new InternalClusterAction.GetClusterCoreRef(this), System.Settings.CreationTimeout).ConfigureAwait(false); + clusterCoreTaskSource.SetResult(_clusterCore); - LogInfo("Started up successfully"); - } + system.RegisterOnTermination(Shutdown); + LogInfo("Started up successfully"); + } + catch (Exception ex) + { + _log.Error(ex, "Failed to startup Cluster. You can try to increase 'akka.actor.creation-timeout'."); + Shutdown(); + System.DeadLetters.Tell(ex); //don't re-throw the error. Just log it. - private async Task GetClusterCoreRef() - { - var timeout = System.Settings.CreationTimeout; - try - { - return await _clusterDaemons.Ask(new InternalClusterAction.GetClusterCoreRef(this), timeout).ConfigureAwait(false); - } - catch (Exception ex) - { - _log.Error(ex, "Failed to startup Cluster. You can try to increase 'akka.actor.creation-timeout'."); - Shutdown(); - System.DeadLetters.Tell(ex); //don't re-throw the error. Just log it. - return System.DeadLetters; - } + _clusterCore = System.DeadLetters; + clusterCoreTaskSource.SetResult(_clusterCore); + } + }); } /// @@ -197,7 +199,7 @@ public void Subscribe(IActorRef subscriber, ClusterEvent.SubscriptionInitialStat if (!to.All(t => typeof(ClusterEvent.IClusterDomainEvent).IsAssignableFrom(t))) throw new ArgumentException($"Subscribe to `IClusterDomainEvent` or subclasses, was [{string.Join(", ", to.Select(c => c.Name))}]", nameof(to)); - ClusterCore.Tell(new InternalClusterAction.Subscribe(subscriber, initialStateMode, ImmutableHashSet.Create(to))); + TellCoreSafe(new InternalClusterAction.Subscribe(subscriber, initialStateMode, ImmutableHashSet.Create(to))); } /// @@ -216,7 +218,7 @@ public void Unsubscribe(IActorRef subscriber) /// The event type that the actor no longer receives. public void Unsubscribe(IActorRef subscriber, Type to) { - ClusterCore.Tell(new InternalClusterAction.Unsubscribe(subscriber, to)); + TellCoreSafe(new InternalClusterAction.Unsubscribe(subscriber, to)); } /// @@ -227,7 +229,7 @@ public void Unsubscribe(IActorRef subscriber, Type to) /// The actor that receives the current cluster state. public void SendCurrentClusterState(IActorRef receiver) { - ClusterCore.Tell(new InternalClusterAction.SendCurrentClusterState(receiver)); + TellCoreSafe(new InternalClusterAction.SendCurrentClusterState(receiver)); } /// @@ -241,7 +243,7 @@ public void SendCurrentClusterState(IActorRef receiver) /// The address of the node we want to join. public void Join(Address address) { - ClusterCore.Tell(new ClusterUserAction.JoinTo(FillLocal(address))); + TellCoreSafe(new ClusterUserAction.JoinTo(FillLocal(address))); } /// @@ -299,8 +301,7 @@ private Address FillLocal(Address address) /// TBD public void JoinSeedNodes(IEnumerable
seedNodes) { - ClusterCore.Tell( - new InternalClusterAction.JoinSeedNodes(seedNodes.Select(FillLocal).ToImmutableList())); + TellCoreSafe(new InternalClusterAction.JoinSeedNodes(seedNodes.Select(FillLocal).ToImmutableList())); } /// @@ -353,7 +354,7 @@ public void Leave(Address address) LeaveSelf(); } else - ClusterCore.Tell(new ClusterUserAction.Leave(FillLocal(address))); + TellCoreSafe(new ClusterUserAction.Leave(FillLocal(address))); } /// @@ -400,7 +401,7 @@ private Task LeaveSelf() _clusterDaemons.Tell(new InternalClusterAction.AddOnMemberRemovedListener(() => tcs.TrySetResult(null))); // Send leave message - ClusterCore.Tell(new ClusterUserAction.Leave(SelfAddress)); + TellCoreSafe(new ClusterUserAction.Leave(SelfAddress)); return tcs.Task; } @@ -416,7 +417,7 @@ private Task LeaveSelf() /// The address of the node we're going to mark as public void Down(Address address) { - ClusterCore.Tell(new ClusterUserAction.Down(FillLocal(address))); + TellCoreSafe(new ClusterUserAction.Down(FillLocal(address))); } /// @@ -563,22 +564,40 @@ internal void Shutdown() private readonly IActorRef _clusterDaemons; private IActorRef _clusterCore; + private Task _clusterCoreTask; /// /// TBD /// + [Obsolete("use TellCoreSafe()")] internal IActorRef ClusterCore { get { - if (_clusterCore == null) + if (_clusterCore is null) { - _clusterCore = GetClusterCoreRef().Result; + if (_clusterCoreTask is null) + throw new InvalidOperationException(); + + _clusterCore = _clusterCoreTask.Result; } return _clusterCore; } } + /// + /// INTERNAL API. + /// + /// We have to wait for cluster core to startup before we can use it + /// + internal void TellCoreSafe(object message) + { + if (_clusterCore is null) + _ = _clusterCoreTask.ContinueWith((t, m) => t.Result.Tell(m), message); + else + _clusterCore.Tell(message); + } + /// /// INTERNAL API. ///