Skip to content

Commit

Permalink
pass Akka.Cluster.Cluster into IDowningProvider directly (#5965)
Browse files Browse the repository at this point in the history
* pass `Akka.Cluster.Cluster` into `IDowningProvider` directly

close #5962

* enhanced `StartupWithOneThreadSpec` to include SBR

* C#-ified HOCON

* added #5962 repro

* fixed `AutoDown`

* fixed other `IDowningProvider` instances

* updated Akka.Cluster API approvals
  • Loading branch information
Aaronontheweb authored May 26, 2022
1 parent 67f6737 commit 9260924
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace Akka.Cluster
{
public sealed class AutoDowning : Akka.Cluster.IDowningProvider
{
public AutoDowning(Akka.Actor.ActorSystem system) { }
public AutoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down Expand Up @@ -266,13 +266,13 @@ namespace Akka.Cluster
}
public sealed class NoDowning : Akka.Cluster.IDowningProvider
{
public NoDowning(Akka.Actor.ActorSystem system) { }
public NoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
public sealed class SplitBrainResolver : Akka.Cluster.IDowningProvider
{
public SplitBrainResolver(Akka.Actor.ActorSystem system) { }
public SplitBrainResolver(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
public System.TimeSpan StableAfter { get; }
Expand Down Expand Up @@ -417,7 +417,7 @@ namespace Akka.Cluster.SBR
}
public class SplitBrainResolverProvider : Akka.Cluster.IDowningProvider
{
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system) { }
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down
88 changes: 88 additions & 0 deletions src/core/Akka.Cluster.Tests/Bugfix5962Spec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// //-----------------------------------------------------------------------
// // <copyright file="Bugfix5962Spec.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.TestKit;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using static FluentAssertions.FluentActions;


namespace Akka.Cluster.Tests
{
public class Bugfix5962Spec : TestKit.Xunit2.TestKit
{
private static readonly Config Config = ConfigurationFactory.ParseString(@"
akka {
loglevel = INFO
actor {
provider = cluster
default-dispatcher = {
executor = channel-executor
channel-executor.priority = normal
}
# Adding this part in combination with the SplitBrainResolverProvider causes the error
internal-dispatcher = {
executor = channel-executor
channel-executor.priority = high
}
}
remote {
dot-netty.tcp {
port = 15508
hostname = ""127.0.0.1""
}
default-remote-dispatcher {
executor = channel-executor
channel-executor.priority = high
}
backoff-remote-dispatcher {
executor = channel-executor
channel-executor.priority = low
}
}
cluster {
seed-nodes = [""akka.tcp://Bugfix5962Spec@127.0.0.1:15508""]
downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster""
}
}");

private readonly Type _timerMsgType;

public Bugfix5962Spec(ITestOutputHelper output): base(Config, nameof(Bugfix5962Spec), output)
{
_timerMsgType = Type.GetType("Akka.Actor.Scheduler.TimerScheduler+TimerMsg, Akka");
}

[Fact]
public async Task SBR_Should_work_with_channel_executor()
{
var latch = new TestLatch(1);
var cluster = Cluster.Get(Sys);
cluster.RegisterOnMemberUp(() =>
{
latch.CountDown();
});

var selection = Sys.ActorSelection("akka://Bugfix5962Spec/system/cluster/core/daemon/downingProvider");

await Awaiting(() => selection.ResolveOne(1.Seconds()))
.Should().NotThrowAsync("Downing provider should be alive. ActorSelection will throw an ActorNotFoundException if this fails");

// There should be no TimerMsg being sent to dead letter, this signals that the downing provider is dead
await EventFilter.DeadLetter(_timerMsgType).ExpectAsync(0, async () =>
{
latch.Ready(1.Seconds());
await Task.Delay(2.Seconds());
});
}
}
}
6 changes: 3 additions & 3 deletions src/core/Akka.Cluster.Tests/DowningProviderSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

namespace Akka.Cluster.Tests
{
class FailingDowningProvider : IDowningProvider
internal class FailingDowningProvider : IDowningProvider
{
public FailingDowningProvider(ActorSystem system)
public FailingDowningProvider(ActorSystem system, Cluster cluster)
{
}

Expand All @@ -36,7 +36,7 @@ public Props DowningActorProps
class DummyDowningProvider : IDowningProvider
{
public readonly AtomicBoolean ActorPropsAccessed = new AtomicBoolean(false);
public DummyDowningProvider(ActorSystem system)
public DummyDowningProvider(ActorSystem system, Cluster cluster)
{
}

Expand Down
14 changes: 11 additions & 3 deletions src/core/Akka.Cluster.Tests/StartupWithOneThreadSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@

using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Dsl;
using Akka.Configuration;
using Akka.Event;
using Akka.TestKit;
using Akka.Util;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
Expand All @@ -25,12 +27,13 @@ public class StartupWithOneThreadSpec : AkkaSpec
akka.actor.default-dispatcher.dedicated-thread-pool.thread-count = 1
akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
akka.remote.dot-netty.tcp.port = 0
akka.cluster.downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster""
akka.cluster.split-brain-resolver.active-strategy = keep-majority
");

private long _startTime;

public StartupWithOneThreadSpec() : base(Configuration)
{
public StartupWithOneThreadSpec(ITestOutputHelper output) : base(Configuration, output) {
_startTime = MonotonicClock.GetTicks();
}

Expand All @@ -53,7 +56,7 @@ private Props TestProps
}

[Fact]
public void A_cluster_must_startup_with_one_dispatcher_thread()
public async Task A_cluster_must_startup_with_one_dispatcher_thread()
{
// This test failed before fixing https://github.com/akkadotnet/akka.net/issues/1959 when adding a sleep before the
// Await of GetClusterCoreRef in the Cluster extension constructor.
Expand All @@ -75,6 +78,11 @@ public void A_cluster_must_startup_with_one_dispatcher_thread()
ExpectMsg("hello");
ExpectMsg("hello");
ExpectMsg("hello");

// perform a self-join
var cts = new CancellationTokenSource(TimeSpan.FromSeconds((3)));
var selfAddress = cluster.SelfAddress;
await cluster.JoinSeedNodesAsync(new[] { selfAddress }, cts.Token);
}
}
}
31 changes: 13 additions & 18 deletions src/core/Akka.Cluster/AutoDown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ internal sealed class AutoDown : AutoDownBase
/// TBD
/// </summary>
/// <param name="autoDownUnreachableAfter">TBD</param>
/// <param name="cluster"></param>
/// <returns>TBD</returns>
public static Props Props(TimeSpan autoDownUnreachableAfter)
public static Props Props(TimeSpan autoDownUnreachableAfter, Cluster cluster)
{
return Actor.Props.Create<AutoDown>(autoDownUnreachableAfter);
return Actor.Props.Create(() => new AutoDown(autoDownUnreachableAfter, cluster));
}

/// <summary>
Expand Down Expand Up @@ -76,14 +77,10 @@ public override int GetHashCode()
}

private readonly Cluster _cluster;

/// <summary>
/// TBD
/// </summary>
/// <param name="autoDownUnreachableAfter">TBD</param>
public AutoDown(TimeSpan autoDownUnreachableAfter) : base(autoDownUnreachableAfter)

public AutoDown(TimeSpan autoDownUnreachableAfter, Cluster cluster) : base(autoDownUnreachableAfter)
{
_cluster = Cluster.Get(Context.System);
_cluster = cluster;
}

/// <summary>
Expand Down Expand Up @@ -276,20 +273,18 @@ private void Remove(UniqueAddress node)
public sealed class AutoDowning : IDowningProvider
{
private readonly ActorSystem _system;

/// <summary>
/// TBD
/// </summary>
/// <param name="system">TBD</param>
public AutoDowning(ActorSystem system)
private readonly Cluster _cluster;

public AutoDowning(ActorSystem system, Cluster cluster)
{
_system = system;
_cluster = cluster;
}

/// <summary>
/// TBD
/// </summary>
public TimeSpan DownRemovalMargin => Cluster.Get(_system).Settings.DownRemovalMargin;
public TimeSpan DownRemovalMargin => _cluster.Settings.DownRemovalMargin;

/// <summary>
/// TBD
Expand All @@ -301,11 +296,11 @@ public Props DowningActorProps
{
get
{
var autoDownUnreachableAfter = Cluster.Get(_system).Settings.AutoDownUnreachableAfter;
var autoDownUnreachableAfter = _cluster.Settings.AutoDownUnreachableAfter;
if (!autoDownUnreachableAfter.HasValue)
throw new ConfigurationException("AutoDowning downing provider selected but 'akka.cluster.auto-down-unreachable-after' not set");

return AutoDown.Props(autoDownUnreachableAfter.Value);
return AutoDown.Props(autoDownUnreachableAfter.Value, _cluster);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public Cluster(ActorSystemImpl system)
Scheduler = CreateScheduler(system);

// it has to be lazy - otherwise if downing provider will init a cluster itself, it will deadlock
_downingProvider = new Lazy<IDowningProvider>(() => Akka.Cluster.DowningProvider.Load(Settings.DowningProviderType, system), LazyThreadSafetyMode.ExecutionAndPublication);
_downingProvider = new Lazy<IDowningProvider>(() => Akka.Cluster.DowningProvider.Load(Settings.DowningProviderType, system, this), LazyThreadSafetyMode.ExecutionAndPublication);

//create supervisor for daemons under path "/system/cluster"
_clusterDaemons = system.SystemActorOf(Props.Create(() => new ClusterDaemon(Settings)).WithDeploy(Deploy.Local), "cluster");
Expand Down
6 changes: 4 additions & 2 deletions src/core/Akka.Cluster/Configuration/Cluster.conf
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ akka {
# * if it is 'off' the `NoDowning` provider is used and no automatic downing will be performed
# * if it is set to a duration the `AutoDowning` provider is with the configured downing duration
#
# If specified the value must be the fully qualified class name of a subclass of
# `akka.cluster.DowningProvider` having a public one argument constructor accepting an `ActorSystem`
# If specified the value must be the fully qualified class name of an implementation of
# `Akka.Cluster.IDowningProvider` having two argument constructor:
# - argument 1: accepting an `ActorSystem`
# - argument 2: accepting an `Akka.Cluster.Cluster`
downing-provider-class = ""

# If this is set to "off", the leader will not move 'Joining' members to 'Up' during a network
Expand Down
25 changes: 14 additions & 11 deletions src/core/Akka.Cluster/DowningProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,18 @@ public interface IDowningProvider
public sealed class NoDowning : IDowningProvider
{
private readonly ActorSystem _system;

/// <summary>
/// TBD
/// </summary>
/// <param name="system">TBD</param>
public NoDowning(ActorSystem system)
private readonly Cluster _cluster;

public NoDowning(ActorSystem system, Cluster cluster)
{
_system = system;
_cluster = cluster;
}

/// <summary>
/// TBD
/// </summary>
public TimeSpan DownRemovalMargin => Cluster.Get(_system).Settings.DownRemovalMargin;
public TimeSpan DownRemovalMargin => _cluster.Settings.DownRemovalMargin;

/// <summary>
/// TBD
Expand All @@ -72,20 +70,25 @@ public NoDowning(ActorSystem system)
internal static class DowningProvider
{
/// <summary>
/// TBD
/// Loads the <see cref="IDowningProvider"/> from configuration and instantiates it via reflection.
/// </summary>
/// <param name="downingProviderType">TBD</param>
/// <param name="system">TBD</param>
/// <param name="cluster">The current cluster object.</param>
/// <exception cref="ConfigurationException">
/// This exception is thrown when the specified <paramref name="downingProviderType"/> does not implement <see cref="IDowningProvider"/>.
/// </exception>
/// <returns>TBD</returns>
public static IDowningProvider Load(Type downingProviderType, ActorSystem system)
/// <returns>The activated <see cref="IDowningProvider"/></returns>
/// <remarks>
/// Required to pass in <see cref="Akka.Cluster.Cluster"/> manually here since https://github.com/akkadotnet/akka.net/issues/5962
/// can cause the SBR startup to fail when running with the `channel-executor`.
/// </remarks>
public static IDowningProvider Load(Type downingProviderType, ActorSystem system, Cluster cluster)
{
var extendedSystem = system as ExtendedActorSystem;
try
{
return (IDowningProvider)Activator.CreateInstance(downingProviderType, extendedSystem);
return (IDowningProvider)Activator.CreateInstance(downingProviderType, extendedSystem, cluster);
}
catch (Exception e)
{
Expand Down
8 changes: 4 additions & 4 deletions src/core/Akka.Cluster/SBR/SplitBrainResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,22 @@ internal class SplitBrainResolver : SplitBrainResolverBase
{
private Cluster _cluster;

public SplitBrainResolver(TimeSpan stableAfter, DowningStrategy strategy)
public SplitBrainResolver(TimeSpan stableAfter, DowningStrategy strategy, Cluster cluster)
: base(stableAfter, strategy)
{
_cluster = cluster;
}

public override UniqueAddress SelfUniqueAddress => _cluster.SelfUniqueAddress;

public static Props Props2(TimeSpan stableAfter, DowningStrategy strategy)
public static Props Props2(TimeSpan stableAfter, DowningStrategy strategy, Cluster cluster)
{
return Props.Create(() => new SplitBrainResolver(stableAfter, strategy));
return Props.Create(() => new SplitBrainResolver(stableAfter, strategy, cluster));
}

// re-subscribe when restart
protected override void PreStart()
{
_cluster = Cluster.Get(Context.System);
_cluster.Subscribe(Self, InitialStateAsEvents, typeof(IClusterDomainEvent));

base.PreStart();
Expand Down
6 changes: 4 additions & 2 deletions src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ public class SplitBrainResolverProvider : IDowningProvider
{
private readonly SplitBrainResolverSettings _settings;
private readonly ActorSystem _system;
private readonly Cluster _cluster;

public SplitBrainResolverProvider(ActorSystem system)
public SplitBrainResolverProvider(ActorSystem system, Cluster cluster)
{
_system = system;
_settings = new SplitBrainResolverSettings(system.Settings.Config);
_cluster = cluster;
}

public TimeSpan DownRemovalMargin
Expand Down Expand Up @@ -77,7 +79,7 @@ public Props DowningActorProps
throw new InvalidOperationException();
}

return SplitBrainResolver.Props2(_settings.DowningStableAfter, strategy);
return SplitBrainResolver.Props2(_settings.DowningStableAfter, strategy, _cluster);
}
}
}
Expand Down
Loading

0 comments on commit 9260924

Please sign in to comment.