Skip to content

Commit

Permalink
Port scala akka PR #26816 to Akka.NET (#4511)
Browse files Browse the repository at this point in the history
* Port scala akka PR #26816 to Akka.NET

* Remove SynchronizationContext usage

* Use internal dispatcher in other part of Akka.NET, update documentation

* Port internal blocking dispatcher and default internal dispatcher for Akka.Stream

* Add circular dispatcher alias reference checking, unroll recursion

* Make ClusterSingletonManager and Proxy to use internal dispatchers

* Update API approver list

* Remove SynchronizationContext

* Clean up Pigeon.conf config file

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb authored Oct 6, 2020
1 parent 8b1d54b commit a80ddd7
Show file tree
Hide file tree
Showing 41 changed files with 578 additions and 115 deletions.
8 changes: 8 additions & 0 deletions docs/articles/actors/dispatchers.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ system.ActorOf(Props.Create<MyActor>().WithDispatcher("my-dispatcher"), "my-acto
Some dispatcher configurations are available out-of-the-box for convenience. You can use them during actor deployment, [as described above](#configuring-dispatchers).

* **default-dispatcher** - A configuration that uses the [ThreadPoolDispatcher](#threadpooldispatcher). As the name says, this is the default dispatcher configuration used by the global dispatcher, and you don't need to define anything during deployment to use it.
* **internal-dispatcher** - To protect the internal Actors that is spawned by the various Akka modules, a separate internal dispatcher is used by default.
* **task-dispatcher** - A configuration that uses the [TaskDispatcher](#taskdispatcher).
* **default-fork-join-dispatcher** - A configuration that uses the [ForkJoinDispatcher](#forkjoindispatcher).
* **synchronized-dispatcher** - A configuration that uses the [SynchronizedDispatcher](#synchronizeddispatcher).
Expand Down Expand Up @@ -174,3 +175,10 @@ The following configuration keys are available for any dispatcher configuration:

> [!NOTE]
> The throughput-deadline-time is used as a *best effort*, not as a *hard limit*. This means that if a message takes more time than the deadline allows, Akka.NET won't interrupt the process. Instead it will wait for it to finish before giving turn to the next actor.
## Dispatcher aliases

When a dispatcher is looked up, and the given setting contains a string rather than a dispatcher config block,
the lookup will treat it as an alias, and follow that string to an alternate location for a dispatcher config.
If the dispatcher config is referenced both through an alias and through the absolute path only one dispatcher will
be used and shared among the two ids.
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public ClusterSharding(ExtendedActorSystem system)
{
var guardianName = system.Settings.Config.GetString("akka.cluster.sharding.guardian-name");
var dispatcher = system.Settings.Config.GetString("akka.cluster.sharding.use-dispatcher");
if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.DefaultDispatcherId;
if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.InternalDispatcherId;
return system.SystemActorOf(Props.Create(() => new ClusterShardingGuardian()).WithDispatcher(dispatcher), guardianName);
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ akka.cluster.sharding {
coordinator-singleton = "akka.cluster.singleton"

# The id of the dispatcher to use for ClusterSharding actors.
# If not specified default dispatcher is used.
# If not specified, the internal dispatcher is used.
# If specified you need to define the settings of the actual dispatcher.
# This dispatcher for the entity actors is defined by the user provided
# Props, i.e. this dispatcher is not used for the entity actors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private IActorRef CreateReceptionist()
{
var name = _config.GetString("name");
var dispatcher = _config.GetString("use-dispatcher", null);
if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.DefaultDispatcherId;
if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.InternalDispatcherId;

// important to use var mediator here to activate it outside of ClusterReceptionist constructor
var mediator = PubSubMediator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ akka.cluster.client.receptionist {
response-tunnel-receive-timeout = 30s

# The id of the dispatcher to use for ClusterReceptionist actors.
# If not specified default dispatcher is used.
# If not specified, the internal dispatcher is used.
# If specified you need to define the settings of the actual dispatcher.
use-dispatcher = ""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private IActorRef CreateMediator()
var name = _system.Settings.Config.GetString("akka.cluster.pub-sub.name");
var dispatcher = _system.Settings.Config.GetString("akka.cluster.pub-sub.use-dispatcher", null);
if (string.IsNullOrEmpty(dispatcher))
dispatcher = Dispatchers.DefaultDispatcherId;
dispatcher = Dispatchers.InternalDispatcherId;

return _system.SystemActorOf(
Props.Create(() => new DistributedPubSubMediator(_settings))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ akka.cluster.pub-sub {
max-delta-elements = 3000

# The id of the dispatcher to use for DistributedPubSubMediator actors.
# If not specified default dispatcher is used.
# If not specified, the internal dispatcher is used.
# If specified you need to define the settings of the actual dispatcher.
use-dispatcher = ""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Akka.Actor;
using Akka.Configuration;
using Akka.Coordination;
using Akka.Dispatch;
using Akka.Event;
using Akka.Pattern;
using Akka.Remote;
Expand Down Expand Up @@ -604,7 +605,9 @@ public static Props Props(Props singletonProps, ClusterSingletonManagerSettings
/// <returns>TBD</returns>
public static Props Props(Props singletonProps, object terminationMessage, ClusterSingletonManagerSettings settings)
{
return Actor.Props.Create(() => new ClusterSingletonManager(singletonProps, terminationMessage, settings)).WithDeploy(Deploy.Local);
return Actor.Props.Create(() => new ClusterSingletonManager(singletonProps, terminationMessage, settings))
.WithDispatcher(Dispatchers.InternalDispatcherId)
.WithDeploy(Deploy.Local);
}

private readonly Props _singletonProps;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Linq;
using Akka.Actor;
using Akka.Configuration;
using Akka.Dispatch;
using Akka.Event;

namespace Akka.Cluster.Tools.Singleton
Expand Down Expand Up @@ -70,7 +71,9 @@ public static Config DefaultConfig()
/// <returns>TBD</returns>
public static Props Props(string singletonManagerPath, ClusterSingletonProxySettings settings)
{
return Actor.Props.Create(() => new ClusterSingletonProxy(singletonManagerPath, settings)).WithDeploy(Deploy.Local);
return Actor.Props.Create(() => new ClusterSingletonProxy(singletonManagerPath, settings))
.WithDispatcher(Dispatchers.InternalDispatcherId)
.WithDeploy(Deploy.Local);
}

private readonly ClusterSingletonProxySettings _settings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static ReplicatorSettings Create(Config config)
throw ConfigurationException.NullOrEmptyConfig<ReplicatorSettings>();

var dispatcher = config.GetString("use-dispatcher", null);
if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.DefaultDispatcherId;
if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.InternalDispatcherId;

var durableConfig = config.GetConfig("durable");
var durableKeys = durableConfig.GetStringList("keys");
Expand All @@ -63,7 +63,7 @@ public static ReplicatorSettings Create(Config config)
{
throw new ArgumentException($"`akka.cluster.distributed-data.durable.store-actor-class` is set to an invalid class {durableStoreType}.");
}
durableStoreProps = Props.Create(durableStoreType, durableConfig).WithDispatcher(durableConfig.GetString("use-dispatcher"));
durableStoreProps = Props.Create(durableStoreType, durableConfig).WithDispatcher(dispatcher);
}

// TODO: This constructor call fails when these fields are not populated inside the Config object:
Expand Down Expand Up @@ -212,7 +212,7 @@ private ReplicatorSettings Copy(string role = null,
public ReplicatorSettings WithGossipInterval(TimeSpan gossipInterval) => Copy(gossipInterval: gossipInterval);
public ReplicatorSettings WithNotifySubscribersInterval(TimeSpan notifySubscribersInterval) => Copy(notifySubscribersInterval: notifySubscribersInterval);
public ReplicatorSettings WithMaxDeltaElements(int maxDeltaElements) => Copy(maxDeltaElements: maxDeltaElements);
public ReplicatorSettings WithDispatcher(string dispatcher) => Copy(dispatcher: string.IsNullOrEmpty(dispatcher) ? Dispatchers.DefaultDispatcherId : dispatcher);
public ReplicatorSettings WithDispatcher(string dispatcher) => Copy(dispatcher: string.IsNullOrEmpty(dispatcher) ? Dispatchers.InternalDispatcherId : dispatcher);
public ReplicatorSettings WithPruning(TimeSpan pruningInterval, TimeSpan maxPruningDissemination) =>
Copy(pruningInterval: pruningInterval, maxPruningDissemination: maxPruningDissemination);
public ReplicatorSettings WithDurableKeys(IImmutableSet<string> durableKeys) => Copy(durableKeys: durableKeys);
Expand Down
4 changes: 2 additions & 2 deletions src/contrib/cluster/Akka.DistributedData/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ akka.cluster.distributed-data {
# the replicas. Next chunk will be transferred in next round of gossip.
max-delta-elements = 1000

# The id of the dispatcher to use for Replicator actors. If not specified
# default dispatcher is used.
# The id of the dispatcher to use for Replicator actors.
# If not specified, the internal dispatcher is used.
# If specified you need to define the settings of the actual dispatcher.
use-dispatcher = ""

Expand Down
8 changes: 6 additions & 2 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Benchmarks")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.TestKit")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Tests.MultiNode")]
Expand Down Expand Up @@ -1841,12 +1842,13 @@ namespace Akka.Actor.Internal
public class ActorSystemImpl : Akka.Actor.ExtendedActorSystem
{
public ActorSystemImpl(string name) { }
public ActorSystemImpl(string name, Akka.Configuration.Config config, Akka.Actor.Setup.ActorSystemSetup setup) { }
public ActorSystemImpl(string name, Akka.Configuration.Config config, Akka.Actor.Setup.ActorSystemSetup setup, System.Nullable<Akka.Util.Option<Akka.Actor.Props>> guardianProps = null) { }
public override Akka.Actor.ActorProducerPipelineResolver ActorPipelineResolver { get; }
public override Akka.Actor.IActorRef DeadLetters { get; }
public override Akka.Dispatch.Dispatchers Dispatchers { get; }
public override Akka.Event.EventStream EventStream { get; }
public override Akka.Actor.IInternalActorRef Guardian { get; }
public Akka.Util.Option<Akka.Actor.Props> GuardianProps { get; }
public override Akka.Event.ILoggingAdapter Log { get; }
public override Akka.Actor.IInternalActorRef LookupRoot { get; }
public override Akka.Dispatch.Mailboxes Mailboxes { get; }
Expand Down Expand Up @@ -2452,9 +2454,10 @@ namespace Akka.Dispatch
}
public sealed class Dispatchers
{
public static readonly string DefaultBlockingDispatcherId;
public static readonly string DefaultDispatcherId;
public static readonly string SynchronizedDispatcherId;
public Dispatchers(Akka.Actor.ActorSystem system, Akka.Dispatch.IDispatcherPrerequisites prerequisites) { }
public Dispatchers(Akka.Actor.ActorSystem system, Akka.Dispatch.IDispatcherPrerequisites prerequisites, Akka.Event.ILoggingAdapter logger) { }
public Akka.Configuration.Config DefaultDispatcherConfig { get; }
public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; }
public Akka.Dispatch.IDispatcherPrerequisites Prerequisites { get; }
Expand Down Expand Up @@ -4838,6 +4841,7 @@ namespace Akka.Util
public static readonly Akka.Util.Option<T> None;
public Option(T value) { }
public bool HasValue { get; }
public bool IsEmpty { get; }
public T Value { get; }
public bool Equals(Akka.Util.Option<T> other) { }
public override bool Equals(object obj) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Akka.Streams
}
public class static ActorAttributes
{
public static Akka.Streams.ActorAttributes.Dispatcher IODispatcher { get; }
public static Akka.Streams.Attributes CreateDebugLogging(bool enabled) { }
public static Akka.Streams.Attributes CreateDispatcher(string dispatcherName) { }
public static Akka.Streams.Attributes CreateFuzzingMode(bool enabled) { }
Expand Down Expand Up @@ -140,12 +141,14 @@ namespace Akka.Streams
public readonly int SyncProcessingLimit;
public ActorMaterializerSettings(int initialInputBufferSize, int maxInputBufferSize, string dispatcher, Akka.Streams.Supervision.Decider supervisionDecider, Akka.Streams.StreamSubscriptionTimeoutSettings subscriptionTimeoutSettings, Akka.Streams.Dsl.StreamRefSettings streamRefSettings, bool isDebugLogging, int outputBurstLimit, bool isFuzzingMode, bool isAutoFusing, int maxFixedBufferSize, int syncProcessingLimit = 1000) { }
public static Akka.Streams.ActorMaterializerSettings Create(Akka.Actor.ActorSystem system) { }
public override bool Equals(object obj) { }
public Akka.Streams.ActorMaterializerSettings WithAutoFusing(bool isAutoFusing) { }
public Akka.Streams.ActorMaterializerSettings WithDebugLogging(bool isEnabled) { }
public Akka.Streams.ActorMaterializerSettings WithDispatcher(string dispatcher) { }
public Akka.Streams.ActorMaterializerSettings WithFuzzingMode(bool isFuzzingMode) { }
public Akka.Streams.ActorMaterializerSettings WithInputBuffer(int initialSize, int maxSize) { }
public Akka.Streams.ActorMaterializerSettings WithMaxFixedBufferSize(int maxFixedBufferSize) { }
public Akka.Streams.ActorMaterializerSettings WithOutputBurstLimit(int limit) { }
public Akka.Streams.ActorMaterializerSettings WithStreamRefSettings(Akka.Streams.Dsl.StreamRefSettings settings) { }
public Akka.Streams.ActorMaterializerSettings WithSubscriptionTimeoutSettings(Akka.Streams.StreamSubscriptionTimeoutSettings settings) { }
public Akka.Streams.ActorMaterializerSettings WithSupervisionStrategy(Akka.Streams.Supervision.Decider decider) { }
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void Clustering_must_be_able_to_parse_generic_cluster_config_elements()
settings.MinNrOfMembers.Should().Be(1);
settings.MinNrOfMembersOfRole.Should().Equal(ImmutableDictionary<string, int>.Empty);
settings.Roles.Should().BeEquivalentTo(ImmutableHashSet<string>.Empty);
settings.UseDispatcher.Should().Be(Dispatchers.DefaultDispatcherId);
settings.UseDispatcher.Should().Be(Dispatchers.InternalDispatcherId);
settings.GossipDifferentViewProbability.Should().Be(0.8);
settings.ReduceGossipDifferentViewProbability.Should().Be(400);

Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/ClusterSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public ClusterSettings(Config config, string systemName)
MinNrOfMembers = clusterConfig.GetInt("min-nr-of-members", 0);

_useDispatcher = clusterConfig.GetString("use-dispatcher", null);
if (String.IsNullOrEmpty(_useDispatcher)) _useDispatcher = Dispatchers.DefaultDispatcherId;
if (string.IsNullOrEmpty(_useDispatcher)) _useDispatcher = Dispatchers.InternalDispatcherId;
GossipDifferentViewProbability = clusterConfig.GetDouble("gossip-different-view-probability", 0);
ReduceGossipDifferentViewProbability = clusterConfig.GetInt("reduce-gossip-different-view-probability", 0);
SchedulerTickDuration = clusterConfig.GetTimeSpan("scheduler.tick-duration", null);
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Cluster/Configuration/Cluster.conf
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ akka {
# Disable with "off".
publish-stats-interval = off

# The id of the dispatcher to use for cluster actors. If not specified
# default dispatcher is used.
# The id of the dispatcher to use for cluster actors.
# If not specified, the internal dispatcher is used.
# If specified you need to define the settings of the actual dispatcher.
use-dispatcher = ""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
<TargetFrameworks>$(NetStandardLibVersion)</TargetFrameworks>
<EnableDefaultCompileItems>false</EnableDefaultCompileItems>
<GenerateAssemblyInfo>false</GenerateAssemblyInfo>
<!-- FIX for "<GenerateTargetFrameworkAttribute>false</GenerateTargetFrameworkAttribute>..." build error with the latest VS2019 update-->
<GenerateTargetFrameworkAttribute>false</GenerateTargetFrameworkAttribute>
<!-- END FIX-->
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public void A_UnfoldResourceAsyncSource_must_use_dedicated_blocking_io_dispatche
var actorRef = refs.First(@ref => @ref.Path.ToString().Contains("unfoldResourceSourceAsync"));
try
{
Utils.AssertDispatcher(actorRef, "akka.stream.default-blocking-io-dispatcher");
Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name);
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void A_UnfoldResourceSource_must_use_dedicated_blocking_io_dispatcher_by_
var actorRef = refs.First(@ref => @ref.Path.ToString().Contains("unfoldResourceSource"));
try
{
Utils.AssertDispatcher(actorRef, "akka.stream.default-blocking-io-dispatcher");
Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name);
}
finally
{
Expand Down
Loading

0 comments on commit a80ddd7

Please sign in to comment.