Skip to content

Commit

Permalink
remove of Cluster.Get(sys) (#5399)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zetanova authored Nov 26, 2021
1 parent bcd4bc3 commit 3798977
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 28 deletions.
15 changes: 8 additions & 7 deletions src/core/Akka.Cluster/AutoDown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Akka.Cluster
/// The implementation is split into two classes AutoDown and AutoDownBase to be
/// able to unit test the logic without running cluster.
/// </summary>
internal class AutoDown : AutoDownBase
internal sealed class AutoDown : AutoDownBase
{
/// <summary>
/// TBD
Expand Down Expand Up @@ -275,21 +275,21 @@ private void Remove(UniqueAddress node)
/// </summary>
public sealed class AutoDowning : IDowningProvider
{
private readonly ClusterSettings _clusterSettings;
private readonly ActorSystem _system;

/// <summary>
/// TBD
/// </summary>
/// <param name="system">TBD</param>
public AutoDowning(ActorSystem system)
{
_clusterSettings = Cluster.Get(system).Settings;
_system = system;
}

/// <summary>
/// TBD
/// </summary>
public TimeSpan DownRemovalMargin => _clusterSettings.DownRemovalMargin;
public TimeSpan DownRemovalMargin => Cluster.Get(_system).Settings.DownRemovalMargin;

/// <summary>
/// TBD
Expand All @@ -301,10 +301,11 @@ public Props DowningActorProps
{
get
{
if (_clusterSettings.AutoDownUnreachableAfter.HasValue)
return AutoDown.Props(_clusterSettings.AutoDownUnreachableAfter.Value);
else
var autoDownUnreachableAfter = Cluster.Get(_system).Settings.AutoDownUnreachableAfter;
if (!autoDownUnreachableAfter.HasValue)
throw new ConfigurationException("AutoDowning downing provider selected but 'akka.cluster.auto-down-unreachable-after' not set");

return AutoDown.Props(autoDownUnreachableAfter.Value);
}
}
}
Expand Down
38 changes: 23 additions & 15 deletions src/core/Akka.Cluster/ClusterDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,8 @@ public ClusterEvent.IClusterDomainEvent Event
/// </summary>
internal sealed class ClusterDaemon : ReceiveActor, IRequiresMessageQueue<IUnboundedMessageQueueSemantics>
{
private Cluster _cluster;

private IActorRef _coreSupervisor;
private readonly ClusterSettings _settings;
private readonly ILoggingAdapter _log = Context.GetLogger();
Expand All @@ -841,21 +843,21 @@ public ClusterDaemon(ClusterSettings settings)
Receive<InternalClusterAction.GetClusterCoreRef>(msg =>
{
if (_coreSupervisor == null)
CreateChildren();
CreateChildren(msg.Cluster);
_coreSupervisor.Forward(msg);
});

Receive<InternalClusterAction.AddOnMemberUpListener>(msg =>
{
Context.ActorOf(
Props.Create(() => new OnMemberStatusChangedListener(msg.Callback, MemberStatus.Up))
Props.Create(() => new OnMemberStatusChangedListener(_cluster, msg.Callback, MemberStatus.Up))
.WithDeploy(Deploy.Local));
});

Receive<InternalClusterAction.AddOnMemberRemovedListener>(msg =>
{
Context.ActorOf(
Props.Create(() => new OnMemberStatusChangedListener(msg.Callback, MemberStatus.Removed))
Props.Create(() => new OnMemberStatusChangedListener(_cluster, msg.Callback, MemberStatus.Removed))
.WithDeploy(Deploy.Local));
});

Expand All @@ -874,7 +876,7 @@ private void AddCoordinatedLeave()
var self = Self;
_coordShutdown.AddTask(CoordinatedShutdown.PhaseClusterLeave, "leave", () =>
{
if (Cluster.Get(sys).IsTerminated || Cluster.Get(sys).SelfMember.Status == MemberStatus.Down)
if (_cluster is null || _cluster.IsTerminated || _cluster.SelfMember.Status == MemberStatus.Down)
{
return Task.FromResult(Done.Instance);
}
Expand All @@ -888,11 +890,12 @@ private void AddCoordinatedLeave()
_coordShutdown.AddTask(CoordinatedShutdown.PhaseClusterShutdown, "wait-shutdown", () => _clusterPromise.Task);
}

private void CreateChildren()
private void CreateChildren(Cluster cluster)
{
_cluster = cluster;
_coreSupervisor = Context.ActorOf(Props.Create<ClusterCoreSupervisor>(), "core");

Context.ActorOf(ClusterHeartbeatReceiver.Props(() => Cluster.Get(Context.System)), "heartbeatReceiver");
Context.ActorOf(ClusterHeartbeatReceiver.Props(() => _cluster), "heartbeatReceiver");
}

protected override void PostStop()
Expand Down Expand Up @@ -1483,11 +1486,13 @@ public void JoinSeedNodes(ImmutableList<Address> newSeedNodes)
_seedNodeProcessCounter += 1;
if (newSeedNodes.Head().Equals(_cluster.SelfAddress))
{
_seedNodeProcess = Context.ActorOf(Props.Create(() => new FirstSeedNodeProcess(newSeedNodes)).WithDispatcher(_cluster.Settings.UseDispatcher), "firstSeedNodeProcess-" + _seedNodeProcessCounter);
_seedNodeProcess = Context.ActorOf(Props.Create(() => new FirstSeedNodeProcess(_cluster, newSeedNodes))
.WithDispatcher(_cluster.Settings.UseDispatcher), "firstSeedNodeProcess-" + _seedNodeProcessCounter);
}
else
{
_seedNodeProcess = Context.ActorOf(Props.Create(() => new JoinSeedNodeProcess(newSeedNodes)).WithDispatcher(_cluster.Settings.UseDispatcher), "joinSeedNodeProcess-" + _seedNodeProcessCounter);
_seedNodeProcess = Context.ActorOf(Props.Create(() => new JoinSeedNodeProcess(_cluster, newSeedNodes))
.WithDispatcher(_cluster.Settings.UseDispatcher), "joinSeedNodeProcess-" + _seedNodeProcessCounter);
}
}
}
Expand Down Expand Up @@ -2695,19 +2700,20 @@ internal sealed class JoinSeedNodeProcess : UntypedActor
/// <summary>
/// TBD
/// </summary>
/// <param name="cluster">TBD</param>
/// <param name="seeds">TBD</param>
/// <exception cref="ArgumentException">
/// This exception is thrown when either the list of specified <paramref name="seeds"/> is empty
/// or the first listed seed is a reference to the <see cref="IActorContext.System">IUntypedActorContext.System</see>'s address.
/// </exception>
public JoinSeedNodeProcess(ImmutableList<Address> seeds)
public JoinSeedNodeProcess(Cluster cluster, ImmutableList<Address> seeds)
{
_selfAddress = Cluster.Get(Context.System).SelfAddress;
_selfAddress = cluster.SelfAddress;
_seeds = seeds;
_otherSeeds = _seeds.Remove(_selfAddress);
if (seeds.IsEmpty || seeds.Head() == _selfAddress)
throw new ArgumentException("Join seed node should not be done");
Context.SetReceiveTimeout(Cluster.Get(Context.System).Settings.SeedNodeTimeout);
Context.SetReceiveTimeout(cluster.Settings.SeedNodeTimeout);
}

/// <summary>
Expand Down Expand Up @@ -2800,14 +2806,15 @@ internal sealed class FirstSeedNodeProcess : UntypedActor
/// <summary>
/// Launches a new instance of the "first seed node" joining process.
/// </summary>
/// <param name="cluster">TBD</param>
/// <param name="seeds">The set of seed nodes to join.</param>
/// <exception cref="ArgumentException">
/// This exception is thrown when either the number of specified <paramref name="seeds"/> is less than or equal to 1
/// or the first listed seed is a reference to the <see cref="IActorContext.System">IUntypedActorContext.System</see>'s address.
/// </exception>
public FirstSeedNodeProcess(ImmutableList<Address> seeds)
public FirstSeedNodeProcess(Cluster cluster, ImmutableList<Address> seeds)
{
_cluster = Cluster.Get(Context.System);
_cluster = cluster;
_selfAddress = _cluster.SelfAddress;

if (seeds.Count <= 1 || seeds.Head() != _selfAddress)
Expand Down Expand Up @@ -3043,17 +3050,18 @@ private Type To
/// <summary>
/// TBD
/// </summary>
/// <param name="cluster">TBD</param>
/// <param name="callback">TBD</param>
/// <param name="targetStatus">TBD</param>
/// <exception cref="ArgumentException">
/// This exception is thrown when the specified <paramref name="targetStatus"/> is invalid.
/// Acceptable values are: <see cref="MemberStatus.Up"/> | <see cref="MemberStatus.Down"/>.
/// </exception>
public OnMemberStatusChangedListener(Action callback, MemberStatus targetStatus)
public OnMemberStatusChangedListener(Cluster cluster, Action callback, MemberStatus targetStatus)
{
_cluster = cluster;
_callback = callback;
_status = targetStatus;
_cluster = Cluster.Get(Context.System);

Receive<ClusterEvent.CurrentClusterState>(state =>
{
Expand Down
4 changes: 1 addition & 3 deletions src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ public Props DowningActorProps
{
get
{
var cluster = Cluster.Get(system);

DowningStrategy strategy;
switch (settings.DowningStrategy)
{
Expand All @@ -68,7 +66,7 @@ public Props DowningActorProps
break;
case SplitBrainResolverSettings.LeaseMajorityName:
var lms = settings.LeaseMajoritySettings;
var leaseOwnerName = cluster.SelfUniqueAddress.Address.HostPort();
var leaseOwnerName = Cluster.Get(system).SelfUniqueAddress.Address.HostPort();

var leaseName = lms.SafeLeaseName(system.Name);
var lease = LeaseProvider.Get(system).GetLease(leaseName, lms.LeaseImplementation, leaseOwnerName);
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka.Cluster/SplitBrainResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ namespace Akka.Cluster
{
public sealed class SplitBrainResolver : IDowningProvider
{
private readonly ClusterSettings _clusterSettings;
private readonly ActorSystem _system;

public SplitBrainResolver(ActorSystem system)
{
_clusterSettings = Cluster.Get(system).Settings;
_system = system;
var config = system.Settings.Config.GetConfig("akka.cluster.split-brain-resolver");
if (config.IsNullOrEmpty())
throw ConfigurationException.NullOrEmptyConfig<SplitBrainResolver>("akka.cluster.split-brain-resolver");
Expand All @@ -30,7 +30,7 @@ public SplitBrainResolver(ActorSystem system)
Strategy = ResolveSplitBrainStrategy(config);
}

public TimeSpan DownRemovalMargin => _clusterSettings.DownRemovalMargin;
public TimeSpan DownRemovalMargin => Cluster.Get(_system).Settings.DownRemovalMargin;
public TimeSpan StableAfter { get; }
public Props DowningActorProps => SplitBrainDecider.Props(StableAfter, Strategy);

Expand Down

0 comments on commit 3798977

Please sign in to comment.