Skip to content

Commit

Permalink
Fix Akka.Cluster Startup (#5398)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zetanova authored Nov 25, 2021
1 parent 086a110 commit bcd4bc3
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 35 deletions.
8 changes: 4 additions & 4 deletions src/core/Akka.Cluster.Tests.MultiNode/TransitionSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ private void AwaitMemberStatus(Address address, MemberStatus status)

private void LeaderActions()
{
Cluster.ClusterCore.Tell(InternalClusterAction.LeaderActionsTick.Instance);
Cluster.TellCoreSafe(InternalClusterAction.LeaderActionsTick.Instance);
}

private void ReapUnreachable()
{
Cluster.ClusterCore.Tell(InternalClusterAction.ReapUnreachableTick.Instance);
Cluster.TellCoreSafe(InternalClusterAction.ReapUnreachableTick.Instance);
}

private int _gossipBarrierCounter = 0;
Expand All @@ -148,7 +148,7 @@ private void GossipTo(RoleName fromRole, RoleName toRole)
{
EnterBarrier("before-gossip-" + _gossipBarrierCounter);
// send gossip
Cluster.ClusterCore.Tell(new InternalClusterAction.SendGossipTo(GetAddress(toRole)));
Cluster.TellCoreSafe(new InternalClusterAction.SendGossipTo(GetAddress(toRole)));
// gossip chat will synchronize the views
AwaitCondition(() => ImmutableHashSet.Create(fromRole, toRole).Except(SeenLatestGossip()).IsEmpty);
EnterBarrier("after-gossip-" + _gossipBarrierCounter);
Expand Down Expand Up @@ -286,7 +286,7 @@ private void A_Cluster_must_perform_correct_transitions_when_third_joins_second(
RunOn(() =>
{
// send gossip
Cluster.ClusterCore.Tell(new InternalClusterAction.SendGossipTo(GetAddress(other2)));
Cluster.TellCoreSafe(new InternalClusterAction.SendGossipTo(GetAddress(other2)));
}, other1);

RunOn(() =>
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster.Tests/ClusterSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public ClusterSpec(ITestOutputHelper output)

internal void LeaderActions()
{
_cluster.ClusterCore.Tell(InternalClusterAction.LeaderActionsTick.Instance);
_cluster.TellCoreSafe(InternalClusterAction.LeaderActionsTick.Instance);
}

[Fact]
Expand Down
79 changes: 49 additions & 30 deletions src/core/Akka.Cluster/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ public Cluster(ActorSystemImpl system)

LogInfo("Starting up...");

var clusterCoreTaskSource = new TaskCompletionSource<IActorRef>();
_clusterCoreTask = clusterCoreTaskSource.Task;

FailureDetector = new DefaultFailureDetectorRegistry<Address>(() => FailureDetectorLoader.Load(Settings.FailureDetectorImplementationClass, Settings.FailureDetectorConfig,
system));

Expand All @@ -140,27 +143,26 @@ public Cluster(ActorSystemImpl system)
_readView = new ClusterReadView(this);

// force the underlying system to start
_clusterCore = GetClusterCoreRef().Result;

system.RegisterOnTermination(Shutdown);
_ = Task.Run(async () =>
{
try
{
_clusterCore = await _clusterDaemons.Ask<IActorRef>(new InternalClusterAction.GetClusterCoreRef(this), System.Settings.CreationTimeout).ConfigureAwait(false);
clusterCoreTaskSource.SetResult(_clusterCore);

LogInfo("Started up successfully");
}
system.RegisterOnTermination(Shutdown);
LogInfo("Started up successfully");
}
catch (Exception ex)
{
_log.Error(ex, "Failed to startup Cluster. You can try to increase 'akka.actor.creation-timeout'.");
Shutdown();
System.DeadLetters.Tell(ex); //don't re-throw the error. Just log it.

private async Task<IActorRef> GetClusterCoreRef()
{
var timeout = System.Settings.CreationTimeout;
try
{
return await _clusterDaemons.Ask<IActorRef>(new InternalClusterAction.GetClusterCoreRef(this), timeout).ConfigureAwait(false);
}
catch (Exception ex)
{
_log.Error(ex, "Failed to startup Cluster. You can try to increase 'akka.actor.creation-timeout'.");
Shutdown();
System.DeadLetters.Tell(ex); //don't re-throw the error. Just log it.
return System.DeadLetters;
}
_clusterCore = System.DeadLetters;
clusterCoreTaskSource.SetResult(_clusterCore);
}
});
}

/// <summary>
Expand Down Expand Up @@ -197,7 +199,7 @@ public void Subscribe(IActorRef subscriber, ClusterEvent.SubscriptionInitialStat
if (!to.All(t => typeof(ClusterEvent.IClusterDomainEvent).IsAssignableFrom(t)))
throw new ArgumentException($"Subscribe to `IClusterDomainEvent` or subclasses, was [{string.Join(", ", to.Select(c => c.Name))}]", nameof(to));

ClusterCore.Tell(new InternalClusterAction.Subscribe(subscriber, initialStateMode, ImmutableHashSet.Create(to)));
TellCoreSafe(new InternalClusterAction.Subscribe(subscriber, initialStateMode, ImmutableHashSet.Create(to)));
}

/// <summary>
Expand All @@ -216,7 +218,7 @@ public void Unsubscribe(IActorRef subscriber)
/// <param name="to">The event type that the actor no longer receives.</param>
public void Unsubscribe(IActorRef subscriber, Type to)
{
ClusterCore.Tell(new InternalClusterAction.Unsubscribe(subscriber, to));
TellCoreSafe(new InternalClusterAction.Unsubscribe(subscriber, to));
}

/// <summary>
Expand All @@ -227,7 +229,7 @@ public void Unsubscribe(IActorRef subscriber, Type to)
/// <param name="receiver">The actor that receives the current cluster state.</param>
public void SendCurrentClusterState(IActorRef receiver)
{
ClusterCore.Tell(new InternalClusterAction.SendCurrentClusterState(receiver));
TellCoreSafe(new InternalClusterAction.SendCurrentClusterState(receiver));
}

/// <summary>
Expand All @@ -241,7 +243,7 @@ public void SendCurrentClusterState(IActorRef receiver)
/// <param name="address">The address of the node we want to join.</param>
public void Join(Address address)
{
ClusterCore.Tell(new ClusterUserAction.JoinTo(FillLocal(address)));
TellCoreSafe(new ClusterUserAction.JoinTo(FillLocal(address)));
}

/// <summary>
Expand Down Expand Up @@ -299,8 +301,7 @@ private Address FillLocal(Address address)
/// <param name="seedNodes">TBD</param>
public void JoinSeedNodes(IEnumerable<Address> seedNodes)
{
ClusterCore.Tell(
new InternalClusterAction.JoinSeedNodes(seedNodes.Select(FillLocal).ToImmutableList()));
TellCoreSafe(new InternalClusterAction.JoinSeedNodes(seedNodes.Select(FillLocal).ToImmutableList()));
}

/// <summary>
Expand Down Expand Up @@ -353,7 +354,7 @@ public void Leave(Address address)
LeaveSelf();
}
else
ClusterCore.Tell(new ClusterUserAction.Leave(FillLocal(address)));
TellCoreSafe(new ClusterUserAction.Leave(FillLocal(address)));
}

/// <summary>
Expand Down Expand Up @@ -400,7 +401,7 @@ private Task LeaveSelf()
_clusterDaemons.Tell(new InternalClusterAction.AddOnMemberRemovedListener(() => tcs.TrySetResult(null)));

// Send leave message
ClusterCore.Tell(new ClusterUserAction.Leave(SelfAddress));
TellCoreSafe(new ClusterUserAction.Leave(SelfAddress));

return tcs.Task;
}
Expand All @@ -416,7 +417,7 @@ private Task LeaveSelf()
/// <param name="address">The address of the node we're going to mark as <see cref="MemberStatus.Down"/></param>
public void Down(Address address)
{
ClusterCore.Tell(new ClusterUserAction.Down(FillLocal(address)));
TellCoreSafe(new ClusterUserAction.Down(FillLocal(address)));
}

/// <summary>
Expand Down Expand Up @@ -563,22 +564,40 @@ internal void Shutdown()

private readonly IActorRef _clusterDaemons;
private IActorRef _clusterCore;
private Task<IActorRef> _clusterCoreTask;

/// <summary>
/// TBD
/// </summary>
[Obsolete("use TellCoreSafe()")]
internal IActorRef ClusterCore
{
get
{
if (_clusterCore == null)
if (_clusterCore is null)
{
_clusterCore = GetClusterCoreRef().Result;
if (_clusterCoreTask is null)
throw new InvalidOperationException();

_clusterCore = _clusterCoreTask.Result;
}
return _clusterCore;
}
}

/// <summary>
/// INTERNAL API.
///
/// We have to wait for cluster core to startup before we can use it
/// </summary>
internal void TellCoreSafe(object message)
{
if (_clusterCore is null)
_ = _clusterCoreTask.ContinueWith((t, m) => t.Result.Tell(m), message);
else
_clusterCore.Tell(message);
}

/// <summary>
/// INTERNAL API.
///
Expand Down

0 comments on commit bcd4bc3

Please sign in to comment.