From ba9ff3ec33c43ea8940d9b2d5cd792474432b237 Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Tue, 24 Jan 2023 16:39:43 +0100 Subject: [PATCH] [BACKPORT] Several stashing improvements (#6323, #6325 and #6327) (#6358) * Read stash capacity from actor's mailbox or dispatcher configuration (#6323) * Added support for `UnrestrictedStash` (#6325) * Add API for UntypedActorWithStash types (#6327) Co-authored-by: Aaron Stannard Co-authored-by: Aaron Stannard --- docs/articles/actors/receive-actor-api.md | 20 +- docs/articles/actors/untyped-actor-api.md | 18 +- docs/cSpell.json | 1 + .../CoreAPISpec.ApproveCore.verified.txt | 43 +++- ...oreAPISpec.ApprovePersistence.verified.txt | 8 +- ...c.ApprovePersistenceSqlCommon.verified.txt | 4 +- .../Actor/Stash/ActorWithBoundedStashSpec.cs | 210 ++++++++++++++++++ .../Akka/Actor/Stash/IWithBoundedStash.cs | 4 +- src/core/Akka/Actor/Stash/IWithStash.cs | 37 +++ .../Akka/Actor/Stash/IWithUnboundedStash.cs | 7 +- .../Actor/Stash/IWithUnrestrictedStash.cs | 19 ++ .../Actor/Stash/Internal/AbstractStash.cs | 171 ++++++++------ .../Actor/Stash/Internal/BoundedStashImpl.cs | 5 +- .../Stash/Internal/UnboundedStashImpl.cs | 2 +- .../Stash/Internal/UnrestrictedStashImpl.cs | 16 ++ src/core/Akka/Actor/Stash/StashFactory.cs | 19 +- src/core/Akka/Actor/UntypedActorWithStash.cs | 59 +++++ src/core/Akka/Dispatch/Dispatchers.cs | 30 +++ .../Akka/Dispatch/IRequiresMessageQueue.cs | 17 +- src/core/Akka/Dispatch/Mailbox.cs | 110 ++++++--- src/core/Akka/Dispatch/Mailboxes.cs | 90 +++++--- 21 files changed, 705 insertions(+), 185 deletions(-) create mode 100644 src/core/Akka.Tests/Actor/Stash/ActorWithBoundedStashSpec.cs create mode 100644 src/core/Akka/Actor/Stash/IWithStash.cs create mode 100644 src/core/Akka/Actor/Stash/IWithUnrestrictedStash.cs create mode 100644 src/core/Akka/Actor/Stash/Internal/UnrestrictedStashImpl.cs create mode 100644 src/core/Akka/Actor/UntypedActorWithStash.cs diff --git a/docs/articles/actors/receive-actor-api.md b/docs/articles/actors/receive-actor-api.md index 9f8b4f79f96..45aa1c8eebb 100644 --- a/docs/articles/actors/receive-actor-api.md +++ b/docs/articles/actors/receive-actor-api.md @@ -790,12 +790,15 @@ static void Main(string[] args) ## Stash -The `IWithUnboundedStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.BecomeStacked()` or `Context.UnbecomeStacked()`;, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. An actor that implements `IWithUnboundedStash` will automatically get a dequeue-based mailbox. +The `IWithStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.Become()` or `Context.Unbecome()`, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. -Here is an example of the `IWithUnboundedStash` interface in action: +> [!NOTE] +> The interface `IWithStash` implements the marker interface `IRequiresMessageQueue` which requests the system to automatically choose a deque-based mailbox implementation for the actor (defaults to an unbounded deque mailbox). If you want more control over the mailbox, see the documentation on mailboxes: [Mailboxes](xref:mailboxes). + +Here is an example of the `IWithStash` interface in action: ```csharp -public class ActorWithProtocol : ReceiveActor, IWithUnboundedStash +public class ActorWithProtocol : ReceiveActor, IWithStash { public IStash Stash { get; set; } @@ -827,11 +830,18 @@ public class ActorWithProtocol : ReceiveActor, IWithUnboundedStash } ``` -Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `Int`) of the mailbox's configuration. +Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `int`) of the mailbox's configuration. Invoking `UnstashAll()` enqueues messages from the stash to the actor's mailbox until the capacity of the mailbox (if any) has been reached (note that messages from the stash are prepended to the mailbox). In case a bounded mailbox overflows, a `MessageQueueAppendFailedException` is thrown. The stash is guaranteed to be empty after calling `UnstashAll()`. -Note that the `stash` is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. The `IWithUnboundedStash` interface implementation of `PreRestart` will call `UnstashAll()`, which is usually the desired behavior. +Note that the stash is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. + +However, the `IWithStash` interface implementation of `PreRestart` will call `UnstashAll()`. This means that before the actor restarts, it will transfer all stashed messages back to the actor’s mailbox. + +The result of this is that when an actor is restarted, any stashed messages will be delivered to the new incarnation of the actor. This is usually the desired behavior. + +> [!NOTE] +> If you want to enforce that your actor can only work with an unbounded stash, then you should use the `IWithUnboundedStash` interface instead. ## Killing an Actor diff --git a/docs/articles/actors/untyped-actor-api.md b/docs/articles/actors/untyped-actor-api.md index d80054d2756..db701b8f378 100644 --- a/docs/articles/actors/untyped-actor-api.md +++ b/docs/articles/actors/untyped-actor-api.md @@ -708,15 +708,16 @@ static void Main(string[] args) ## Stash -The `IWithUnboundedStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.BecomeStacked()` or `Context.UnbecomeStacked()`;, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. An actor that implements `IWithUnboundedStash` will automatically get a dequeue-based mailbox. +The `UntypedActorWithStash` class enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.Become()` or `Context.Unbecome()`, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. An actor that extends `UntypedActorWithStash` will automatically get a deque-based mailbox. -Here is an example of the `IWithUnboundedStash` interface in action: +> [!NOTE] +> The abstract class `UntypedActorWithStash` implements the marker interface `IRequiresMessageQueue` which requests the system to automatically choose a deque-based mailbox implementation for the actor. If you want more control over the mailbox, see the documentation on mailboxes: [Mailboxes](xref:mailboxes). + +Here is an example of the `UntypedActorWithStash` interface in action: ```csharp -public class ActorWithProtocol : UntypedActor, IWithUnboundedStash +public class ActorWithProtocol : UntypedActorWithStash { - public IStash Stash { get; set; } - protected override void OnReceive(object message) { switch (message) @@ -748,11 +749,14 @@ public class ActorWithProtocol : UntypedActor, IWithUnboundedStash } ``` -Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `Int`) of the mailbox's configuration. +Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `int`) of the mailbox's configuration. Invoking `UnstashAll()` enqueues messages from the stash to the actor's mailbox until the capacity of the mailbox (if any) has been reached (note that messages from the stash are prepended to the mailbox). In case a bounded mailbox overflows, a `MessageQueueAppendFailedException` is thrown. The stash is guaranteed to be empty after calling `UnstashAll()`. -Note that the `stash` is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. The `IWithUnboundedStash` interface implementation of `PreRestart` will call `UnstashAll()`, which is usually the desired behavior. +Note that the stash is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. The `UntypedActorWithStash` implementation of `PreRestart` will call `UnstashAll()`, which is usually the desired behavior. + +> [!NOTE] +> If you want to enforce that your actor can only work with an unbounded stash, then you should use the `UntypedActorWithUnboundedStash` class instead. ## Killing an Actor diff --git a/docs/cSpell.json b/docs/cSpell.json index 562380cd646..011d54c0889 100644 --- a/docs/cSpell.json +++ b/docs/cSpell.json @@ -25,6 +25,7 @@ "CRDT", "datacenter", "denormalization", + "deque", "deserialization", "downstream", "downstreams", diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.verified.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.verified.txt index 33d56906676..1b04678d3fc 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.verified.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.verified.txt @@ -1175,14 +1175,15 @@ namespace Akka.Actor void Become(Akka.Actor.UntypedReceive receive); void BecomeStacked(Akka.Actor.UntypedReceive receive); } - [System.ObsoleteAttribute("Bounded stashing is not yet implemented. Unbounded stashing will be used instead " + - "[0.7.0]")] - public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue { } + [System.ObsoleteAttribute("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")] + public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } + public interface IWithStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } public interface IWithTimers { Akka.Actor.ITimerScheduler Timers { get; set; } } - public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue { } + public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } + public interface IWithUnrestrictedStash : Akka.Actor.IActorStash { } public interface IWrappedMessage { object Message { get; } @@ -1857,6 +1858,21 @@ namespace Akka.Actor protected void RunTask(System.Action action) { } protected void RunTask(System.Func action) { } } + public abstract class UntypedActorWithStash : Akka.Actor.UntypedActor, Akka.Actor.IActorStash, Akka.Actor.IWithStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue + { + protected UntypedActorWithStash() { } + public Akka.Actor.IStash Stash { get; set; } + } + public abstract class UntypedActorWithUnboundedStash : Akka.Actor.UntypedActor, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue + { + protected UntypedActorWithUnboundedStash() { } + public Akka.Actor.IStash Stash { get; set; } + } + public abstract class UntypedActorWithUnrestrictedStash : Akka.Actor.UntypedActor, Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash + { + protected UntypedActorWithUnrestrictedStash() { } + public Akka.Actor.IStash Stash { get; set; } + } public delegate void UntypedReceive(object message); public class static WrappedMessage { @@ -1927,13 +1943,13 @@ namespace Akka.Actor.Internal { public abstract class AbstractStash : Akka.Actor.IStash { - protected AbstractStash(Akka.Actor.IActorContext context, int capacity = 100) { } + protected AbstractStash(Akka.Actor.IActorContext context) { } public System.Collections.Generic.IEnumerable ClearStash() { } public void Prepend(System.Collections.Generic.IEnumerable envelopes) { } public void Stash() { } public void Unstash() { } public void UnstashAll() { } - public void UnstashAll(System.Func predicate) { } + public void UnstashAll(System.Func filterPredicate) { } } public class ActorSystemImpl : Akka.Actor.ExtendedActorSystem { @@ -1981,7 +1997,7 @@ namespace Akka.Actor.Internal } public class BoundedStashImpl : Akka.Actor.Internal.AbstractStash { - public BoundedStashImpl(Akka.Actor.IActorContext context, int capacity = 100) { } + public BoundedStashImpl(Akka.Actor.IActorContext context) { } } public class ChildNameReserved : Akka.Actor.Internal.IChildStats { @@ -2520,14 +2536,14 @@ namespace Akka.Dispatch public static void RunTask(System.Func asyncAction) { } protected override bool TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued) { } } - public sealed class BoundedDequeBasedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue + public class BoundedDequeBasedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue, Akka.Dispatch.IProducesPushTimeoutSemanticsMailbox { public BoundedDequeBasedMailbox(Akka.Actor.Settings settings, Akka.Configuration.Config config) { } public int Capacity { get; } public System.TimeSpan PushTimeout { get; } public override Akka.Dispatch.MessageQueues.IMessageQueue Create(Akka.Actor.IActorRef owner, Akka.Actor.ActorSystem system) { } } - public sealed class BoundedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue + public sealed class BoundedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue, Akka.Dispatch.IProducesPushTimeoutSemanticsMailbox { public BoundedMailbox(Akka.Actor.Settings settings, Akka.Configuration.Config config) { } public int Capacity { get; } @@ -2583,6 +2599,8 @@ namespace Akka.Dispatch public Akka.Configuration.Config DefaultDispatcherConfig { get; } public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; } public Akka.Dispatch.IDispatcherPrerequisites Prerequisites { get; } + [Akka.Annotations.InternalApiAttribute()] + public static Akka.Configuration.Config GetConfig(Akka.Configuration.Config config, string id, int depth = 0) { } public bool HasDispatcher(string id) { } public Akka.Dispatch.MessageDispatcher Lookup(string dispatcherName) { } public bool RegisterConfigurator(string id, Akka.Dispatch.MessageDispatcherConfigurator configurator) { } @@ -2636,6 +2654,10 @@ namespace Akka.Dispatch public interface IMultipleConsumerSemantics : Akka.Dispatch.ISemantics { } public interface IProducesMessageQueue where TQueue : Akka.Dispatch.MessageQueues.IMessageQueue { } + public interface IProducesPushTimeoutSemanticsMailbox + { + System.TimeSpan PushTimeout { get; } + } public interface IRequiresMessageQueue where T : Akka.Dispatch.ISemantics { } public interface IRunnable @@ -2669,13 +2691,14 @@ namespace Akka.Dispatch public static readonly string NoMailboxRequirement; public Mailboxes(Akka.Actor.ActorSystem system) { } public Akka.Actor.DeadLetterMailbox DeadLetterMailbox { get; } - public System.Type FromConfig(string path) { } public Akka.Dispatch.MailboxType GetMailboxType(Akka.Actor.Props props, Akka.Configuration.Config dispatcherConfig) { } public System.Type GetRequiredType(System.Type actorType) { } public bool HasRequiredType(System.Type actorType) { } public Akka.Dispatch.MailboxType Lookup(string id) { } public Akka.Dispatch.MailboxType LookupByQueueType(System.Type queueType) { } public bool ProducesMessageQueue(System.Type mailboxType) { } + [Akka.Annotations.InternalApiAttribute()] + public int StashCapacity(string dispatcher, string mailbox) { } } public abstract class MessageDispatcher { diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistence.verified.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistence.verified.txt index 05759e954d8..c967f712df1 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistence.verified.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistence.verified.txt @@ -209,7 +209,7 @@ namespace Akka.Persistence { public static Akka.Persistence.DiscardToDeadLetterStrategy Instance { get; } } - public abstract class Eventsourced : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue, Akka.Persistence.IPersistenceRecovery, Akka.Persistence.IPersistenceStash, Akka.Persistence.IPersistentIdentity + public abstract class Eventsourced : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue, Akka.Persistence.IPersistenceRecovery, Akka.Persistence.IPersistenceStash, Akka.Persistence.IPersistentIdentity { public static readonly System.Func UnstashFilterPredicate; protected Eventsourced() { } @@ -270,7 +270,7 @@ namespace Akka.Persistence { Akka.Persistence.Recovery Recovery { get; } } - public interface IPersistenceStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public interface IPersistenceStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { Akka.Persistence.IStashOverflowStrategy InternalStashOverflowStrategy { get; } } @@ -844,7 +844,7 @@ namespace Akka.Persistence.Journal protected System.Exception TryUnwrapException(System.Exception e) { } protected abstract System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages); } - public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected AsyncWriteProxy() { } public Akka.Actor.IStash Stash { get; set; } @@ -980,7 +980,7 @@ namespace Akka.Persistence.Journal public System.Collections.Generic.IDictionary> Update(string pid, long seqNr, System.Func updater) { } protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages) { } } - public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { public PersistencePluginProxy(Akka.Configuration.Config config) { } public Akka.Actor.IStash Stash { get; set; } diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistenceSqlCommon.verified.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistenceSqlCommon.verified.txt index f3dfaddb419..9cb9bae2b7f 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistenceSqlCommon.verified.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistenceSqlCommon.verified.txt @@ -272,7 +272,7 @@ namespace Akka.Persistence.Sql.Common.Journal public long Offset { get; } public Akka.Actor.IActorRef ReplyTo { get; } } - public abstract class SqlJournal : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public abstract class SqlJournal : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected SqlJournal(Akka.Configuration.Config journalConfig) { } protected bool HasNewEventSubscribers { get; } @@ -424,7 +424,7 @@ namespace Akka.Persistence.Sql.Common.Snapshot public readonly System.DateTime Timestamp; public SnapshotEntry(string persistenceId, long sequenceNr, System.DateTime timestamp, string manifest, object payload) { } } - public abstract class SqlSnapshotStore : Akka.Persistence.Snapshot.SnapshotStore, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public abstract class SqlSnapshotStore : Akka.Persistence.Snapshot.SnapshotStore, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected SqlSnapshotStore(Akka.Configuration.Config config) { } protected Akka.Event.ILoggingAdapter Log { get; } diff --git a/src/core/Akka.Tests/Actor/Stash/ActorWithBoundedStashSpec.cs b/src/core/Akka.Tests/Actor/Stash/ActorWithBoundedStashSpec.cs new file mode 100644 index 00000000000..60f77d1c602 --- /dev/null +++ b/src/core/Akka.Tests/Actor/Stash/ActorWithBoundedStashSpec.cs @@ -0,0 +1,210 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Text.RegularExpressions; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using Akka.Dispatch; +using Akka.Event; +using Akka.TestKit; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Tests.Actor.Stash +{ + public class StashingActor : UntypedActor, IWithBoundedStash + { + public IStash Stash { get; set; } + + protected override void OnReceive(object message) + { + if (message is string s && s.StartsWith("hello")) + { + Stash.Stash(); + Sender.Tell("ok"); + } + else if (message.Equals("world")) + { + Context.Become(AfterWorldBehavior); + Stash.UnstashAll(); + } + } + + private void AfterWorldBehavior(object message) => Stash.Stash(); + } + + public class StashingActorWithOverflow : UntypedActor, IWithBoundedStash + { + private int numStashed = 0; + + public IStash Stash { get; set; } + + protected override void OnReceive(object message) + { + if (!(message is string s) || !s.StartsWith("hello")) + return; + + numStashed++; + try + { + Stash.Stash(); + Sender.Tell("ok"); + } + catch (Exception ex) when (ex is StashOverflowException) + { + if (numStashed == 21) + { + Sender.Tell("STASHOVERFLOW"); + Context.Stop(Self); + } + else + { + Sender.Tell("Unexpected StashOverflowException: " + numStashed); + } + } + } + } + + // bounded deque-based mailbox with capacity 10 + public class Bounded10 : BoundedDequeBasedMailbox + { + public Bounded10(Settings settings, Config config) + : base(settings, config) + { } + } + + public class Bounded100 : BoundedDequeBasedMailbox + { + public Bounded100(Settings settings, Config config) + : base(settings, config) + { } + } + + public class ActorWithBoundedStashSpec : AkkaSpec + { + private static Config SpecConfig => ConfigurationFactory.ParseString($@" + akka.loggers = [""Akka.TestKit.TestEventListener, Akka.TestKit""] + my-dispatcher-1 {{ + mailbox-type = ""{typeof(Bounded10).AssemblyQualifiedName}"" + mailbox-capacity = 10 + mailbox-push-timeout-time = 500ms + stash-capacity = 20 + }} + my-dispatcher-2 {{ + mailbox-type = ""{typeof(Bounded100).AssemblyQualifiedName}"" + mailbox-capacity = 100 + mailbox-push-timeout-time = 500ms + stash-capacity = 20 + }} + my-aliased-dispatcher-1 = my-dispatcher-1 + my-aliased-dispatcher-2 = my-aliased-dispatcher-1 + my-mailbox-1 {{ + mailbox-type = ""{typeof(Bounded10).AssemblyQualifiedName}"" + mailbox-capacity = 10 + mailbox-push-timeout-time = 500ms + stash-capacity = 20 + }} + my-mailbox-2 {{ + mailbox-type = ""{typeof(Bounded100).AssemblyQualifiedName}"" + mailbox-capacity = 100 + mailbox-push-timeout-time = 500ms + stash-capacity = 20 + }}"); + + public ActorWithBoundedStashSpec(ITestOutputHelper outputHelper) + : base(outputHelper, SpecConfig) + { + Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter)); + } + + protected override void AtStartup() + { + base.AtStartup(); + Sys.EventStream.Publish(EventFilter.Warning(pattern: new Regex(".*Received dead letter from.*hello.*")).Mute()); + } + + protected override void AfterAll() + { + base.AfterAll(); + Sys.EventStream.Unsubscribe(TestActor, typeof(DeadLetter)); + } + + private void TestDeadLetters(IActorRef stasher) + { + for (var n = 1; n <= 11; n++) + { + stasher.Tell("hello" + n); + ExpectMsg("ok"); + } + + // cause unstashAll with capacity violation + stasher.Tell("world"); + ExpectMsg().Equals(new DeadLetter("hello1", TestActor, stasher)); + + AwaitCondition(() => ExpectMsg() != null); + + stasher.Tell(PoisonPill.Instance); + // stashed messages are sent to deadletters when stasher is stopped + for (var n = 2; n <= 11; n++) + ExpectMsg().Equals(new DeadLetter("hello" + n, TestActor, stasher)); + } + + private void TestStashOverflowException(IActorRef stasher) + { + // fill up stash + for (var n = 1; n <= 20; n++) + { + stasher.Tell("hello" + n); + ExpectMsg("ok"); + } + + stasher.Tell("hello21"); + ExpectMsg("STASHOVERFLOW"); + + // stashed messages are sent to deadletters when stasher is stopped + for (var n = 1; n <= 20; n++) + ExpectMsg().Equals(new DeadLetter("hello" + n, TestActor, stasher)); + } + + [Fact(Skip = "DequeWrapperMessageQueue implementations are not actually double-ended queues, so this cannot currently work.")] + public void An_actor_with_stash_must_end_up_in_DeadLetters_in_case_of_a_capacity_violation_when_configure_via_dispatcher() + { + var stasher = Sys.ActorOf(Props.Create().WithDispatcher("my-dispatcher-1")); + TestDeadLetters(stasher); + } + + [Fact(Skip = "DequeWrapperMessageQueue implementations are not actually double-ended queues, so this cannot currently work.")] + public void An_actor_with_stash_must_end_up_in_DeadLetters_in_case_of_a_capacity_violation_when_configure_via_mailbox() + { + var stasher = Sys.ActorOf(Props.Create().WithDispatcher("my-mailbox-1")); + TestDeadLetters(stasher); + } + + [Fact] + public void An_actor_with_stash_must_throw_a_StashOverflowException_in_case_of_a_stash_capacity_violation_when_configured_via_dispatcher() + { + var stasher = Sys.ActorOf(Props.Create().WithDispatcher("my-dispatcher-2")); + TestStashOverflowException(stasher); + } + + [Fact] + public void An_actor_with_stash_must_throw_a_StashOverflowException_in_case_of_a_stash_capacity_violation_when_configured_via_mailbox() + { + var stasher = Sys.ActorOf(Props.Create().WithDispatcher("my-mailbox-2")); + TestStashOverflowException(stasher); + } + + [Fact] + public void An_actor_with_stash_must_get_stash_capacity_from_aliased_dispatchers() + { + var stasher = Sys.ActorOf(Props.Create().WithDispatcher("my-aliased-dispatcher-2")); + TestStashOverflowException(stasher); + } + } +} diff --git a/src/core/Akka/Actor/Stash/IWithBoundedStash.cs b/src/core/Akka/Actor/Stash/IWithBoundedStash.cs index 4777155f92c..26b1f8a955b 100644 --- a/src/core/Akka/Actor/Stash/IWithBoundedStash.cs +++ b/src/core/Akka/Actor/Stash/IWithBoundedStash.cs @@ -17,8 +17,8 @@ namespace Akka.Actor /// public IStash Stash { get; set; } /// // ReSharper disable once InconsistentNaming - [Obsolete("Bounded stashing is not yet implemented. Unbounded stashing will be used instead [0.7.0]")] - public interface IWithBoundedStash : IActorStash, IRequiresMessageQueue + [Obsolete("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")] + public interface IWithBoundedStash : IWithUnrestrictedStash, IRequiresMessageQueue { } } diff --git a/src/core/Akka/Actor/Stash/IWithStash.cs b/src/core/Akka/Actor/Stash/IWithStash.cs new file mode 100644 index 00000000000..bcc7b2cafe5 --- /dev/null +++ b/src/core/Akka/Actor/Stash/IWithStash.cs @@ -0,0 +1,37 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using Akka.Dispatch; + +namespace Akka.Actor +{ + /// + /// The `IWithStash` interface enables an actor to temporarily stash away messages that can not or + /// should not be handled using the actor's current behavior. + /// + /// Note that the `IWithStash` interface can only be used together with actors that have a deque-based + /// mailbox. By default Stash based actors request a Deque based mailbox since the stash + /// interface extends . + /// + /// You can override the default mailbox provided when `IDequeBasedMessageQueueSemantics` are requested via config: + /// + /// akka.actor.mailbox.requirements { + /// "Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics" = your-custom-mailbox + /// } + /// + /// Alternatively, you can add your own requirement marker to the actor and configure a mailbox type to be used + /// for your marker. + /// + /// For a `Stash` that also enforces unboundedness of the deque see . For a `Stash` + /// that does not enforce any mailbox type see . + /// + /// + public interface IWithStash : IWithUnrestrictedStash, IRequiresMessageQueue + { + } +} + diff --git a/src/core/Akka/Actor/Stash/IWithUnboundedStash.cs b/src/core/Akka/Actor/Stash/IWithUnboundedStash.cs index d8f9d8632a7..79603378f7f 100644 --- a/src/core/Akka/Actor/Stash/IWithUnboundedStash.cs +++ b/src/core/Akka/Actor/Stash/IWithUnboundedStash.cs @@ -10,13 +10,10 @@ namespace Akka.Actor { /// - /// Lets the know that this Actor needs stash support - /// with unrestricted storage capacity. - /// You need to add the property: - /// public IStash Stash { get; set; } + /// The `IWithUnboundedStash` interface is a version of that enforces an unbounded stash for you actor. /// // ReSharper disable once InconsistentNaming - public interface IWithUnboundedStash : IActorStash, IRequiresMessageQueue + public interface IWithUnboundedStash : IWithUnrestrictedStash, IRequiresMessageQueue { } } diff --git a/src/core/Akka/Actor/Stash/IWithUnrestrictedStash.cs b/src/core/Akka/Actor/Stash/IWithUnrestrictedStash.cs new file mode 100644 index 00000000000..65d16dc7935 --- /dev/null +++ b/src/core/Akka/Actor/Stash/IWithUnrestrictedStash.cs @@ -0,0 +1,19 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using Akka.Dispatch; + +namespace Akka.Actor +{ + /// + /// A version of that does not enforce any mailbox type. The proper mailbox has to be configured + /// manually, and the mailbox should extend the marker interface. + /// + public interface IWithUnrestrictedStash : IActorStash + { + } +} diff --git a/src/core/Akka/Actor/Stash/Internal/AbstractStash.cs b/src/core/Akka/Actor/Stash/Internal/AbstractStash.cs index 4de6bf898d4..fc1098bdbf8 100644 --- a/src/core/Akka/Actor/Stash/Internal/AbstractStash.cs +++ b/src/core/Akka/Actor/Stash/Internal/AbstractStash.cs @@ -1,7 +1,7 @@ //----------------------------------------------------------------------- // -// Copyright (C) 2009-2021 Lightbend Inc. -// Copyright (C) 2013-2021 .NET Foundation +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation // //----------------------------------------------------------------------- @@ -15,54 +15,63 @@ namespace Akka.Actor.Internal { - /// INTERNAL - /// Abstract base class for stash support - /// Note! Part of internal API. Breaking changes may occur without notice. Use at own risk. + /// + /// INTERNAL API + /// + /// Support class for implementing a stash for an actor instance. A default stash per actor (= user stash) + /// is maintained by this class. Actors that explicitly need other stashes + /// (optionally in addition to and isolated from the user stash) can create new stashes via . + /// /// public abstract class AbstractStash : IStash { - private LinkedList _theStash; + /// + /// The private stash of the actor. It is only accessible using and + /// . + /// + private LinkedList _theStash = new(); + private readonly ActorCell _actorCell; + + /// + /// The capacity of the stash. Configured in the actor's mailbox or dispatcher config. + /// private readonly int _capacity; - /// INTERNAL - /// Abstract base class for stash support - /// Note! Part of internal API. Breaking changes may occur without notice. Use at own risk. + /// + /// The actor's deque-based message queue. + /// `mailbox.queue` is the underlying `Deque`. /// - /// TBD - /// TBD - /// This exception is thrown if the actor's mailbox isn't deque-based (e.g. ). - protected AbstractStash(IActorContext context, int capacity = 100) + private readonly IDequeBasedMessageQueueSemantics _mailbox; + + protected AbstractStash(IActorContext context) { var actorCell = (ActorCell)context; - Mailbox = actorCell.Mailbox.MessageQueue as IDequeBasedMessageQueueSemantics; - if(Mailbox == null) + _mailbox = actorCell.Mailbox.MessageQueue as IDequeBasedMessageQueueSemantics; + if (_mailbox == null) { - string message = $@"DequeBasedMailbox required, got: {actorCell.Mailbox.GetType().Name} -An (unbounded) deque-based mailbox can be configured as follows: - my-custom-mailbox {{ - mailbox-type = ""Akka.Dispatch.UnboundedDequeBasedMailbox"" - }}"; - throw new NotSupportedException(message); + var message = $"DequeBasedMailbox required, got: {actorCell.Mailbox.GetType().Name}\n" + + "An (unbounded) deque-based mailbox can be configured as follows:\n" + + "my-custom-mailbox {\n" + + " mailbox-type = \"Akka.Dispatch.UnboundedDequeBasedMailbox\"\n" + + "}\n"; + + throw new ActorInitializationException(actorCell.Self, message); } - _theStash = new LinkedList(); _actorCell = actorCell; - // TODO: capacity needs to come from dispatcher or mailbox config - // https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/Stash.scala#L126 - _capacity = capacity; + // The capacity of the stash. Configured in the actor's mailbox or dispatcher config. + _capacity = context.System.Mailboxes.StashCapacity(context.Props.Dispatcher, context.Props.Mailbox); } - private IDequeBasedMessageQueueSemantics Mailbox { get; } - private int _currentEnvelopeId; /// - /// Stashes the current message in the actor's state. + /// Adds the current message (the message that the actor received last) to the actor's stash. /// /// This exception is thrown if we attempt to stash the same message more than once. /// - /// This exception is thrown in the event that we're using a for the and we've exceeded capacity. + /// This exception is thrown in the event that we're using a for the and we've exceeded capacity. /// public void Stash() { @@ -74,68 +83,90 @@ public void Stash() throw new IllegalActorStateException($"Can't stash the same message {currMsg} more than once"); } _currentEnvelopeId = _actorCell.CurrentEnvelopeId; - - if(_capacity <= 0 || _theStash.Count < _capacity) + + if (_capacity <= 0 || _theStash.Count < _capacity) _theStash.AddLast(new Envelope(currMsg, sender)); - else throw new StashOverflowException($"Couldn't enqueue message {currMsg} to stash of {_actorCell.Self}"); + else + throw new StashOverflowException($"Couldn't enqueue message {currMsg} from ${sender} to stash of {_actorCell.Self}"); } /// - /// Unstash the most recently stashed message (top of the message stack.) + /// Prepends the oldest message in the stash to the mailbox, and then removes that + /// message from the stash. + /// + /// Messages from the stash are enqueued to the mailbox until the capacity of the + /// mailbox (if any) has been reached. In case a bounded mailbox overflows, a + /// `MessageQueueAppendFailedException` is thrown. + /// + /// The unstashed message is guaranteed to be removed from the stash regardless + /// if the call successfully returns or throws an exception. /// public void Unstash() { - if(_theStash.Count > 0) + if (_theStash.Count <= 0) + return; + + try { - try - { - EnqueueFirst(_theStash.Head()); - } - finally - { - _theStash.RemoveFirst(); - } + EnqueueFirst(_theStash.Head()); + } + finally + { + _theStash.RemoveFirst(); } } /// - /// Unstash all of the s in the Stash. + /// Prepends all messages in the stash to the mailbox, and then clears the stash. + /// + /// Messages from the stash are enqueued to the mailbox until the capacity of the + /// mailbox(if any) has been reached. In case a bounded mailbox overflows, a + /// `MessageQueueAppendFailedException` is thrown. + /// + /// The stash is guaranteed to be empty after calling . /// - public void UnstashAll() - { - UnstashAll(envelope => true); - } + public void UnstashAll() => UnstashAll(_ => true); /// - /// Unstash all of the s in the Stash. + /// INTERNA API + /// + /// Prepends selected messages in the stash, applying `filterPredicate`, to the + /// mailbox, and then clears the stash. + /// + /// + /// Messages from the stash are enqueued to the mailbox until the capacity of the + /// mailbox(if any) has been reached. In case a bounded mailbox overflows, a + /// `MessageQueueAppendFailedException` is thrown. + /// + /// The stash is guaranteed to be empty after calling . /// - /// A predicate function to determine which messages to select. - public void UnstashAll(Func predicate) + /// Only stashed messages selected by this predicate are prepended to the mailbox. + public void UnstashAll(Func filterPredicate) { - if(_theStash.Count > 0) + if (_theStash.Count <= 0) + return; + + try { - try - { - foreach(var item in _theStash.Reverse().Where(predicate)) - { - EnqueueFirst(item); - } - } - finally - { - _theStash = new LinkedList(); - } + foreach (var item in _theStash.Reverse().Where(filterPredicate)) + EnqueueFirst(item); + } + finally + { + _theStash = new LinkedList(); } } - + /// - /// Eliminates the contents of the , and returns - /// the previous contents of the messages. + /// INTERNAL API. + /// + /// Clears the stash and and returns all envelopes that have not been unstashed. + /// /// /// Previously stashed messages. public IEnumerable ClearStash() { - if(_theStash.Count == 0) + if (_theStash.Count == 0) return Enumerable.Empty(); var stashed = _theStash; @@ -144,7 +175,8 @@ public IEnumerable ClearStash() } /// - /// TBD + /// Prepends `others` to this stash. This method is optimized for a large stash and + /// small `others`. /// /// TBD public void Prepend(IEnumerable envelopes) @@ -164,9 +196,8 @@ public void Prepend(IEnumerable envelopes) /// private void EnqueueFirst(Envelope msg) { - Mailbox.EnqueueFirst(msg); - var terminatedMessage = msg.Message as Terminated; - if(terminatedMessage != null) + _mailbox.EnqueueFirst(msg); + if (msg.Message is Terminated terminatedMessage) { _actorCell.TerminatedQueuedFor(terminatedMessage.ActorRef, Option.None); } diff --git a/src/core/Akka/Actor/Stash/Internal/BoundedStashImpl.cs b/src/core/Akka/Actor/Stash/Internal/BoundedStashImpl.cs index 0e24b21ee1f..4260e3137d5 100644 --- a/src/core/Akka/Actor/Stash/Internal/BoundedStashImpl.cs +++ b/src/core/Akka/Actor/Stash/Internal/BoundedStashImpl.cs @@ -18,9 +18,8 @@ public class BoundedStashImpl : AbstractStash /// Note! Part of internal API. Breaking changes may occur without notice. Use at own risk. /// /// TBD - /// TBD - public BoundedStashImpl(IActorContext context, int capacity = 100) - : base(context, capacity) + public BoundedStashImpl(IActorContext context) + : base(context) { } } diff --git a/src/core/Akka/Actor/Stash/Internal/UnboundedStashImpl.cs b/src/core/Akka/Actor/Stash/Internal/UnboundedStashImpl.cs index cd8394ea79f..39d59afe56d 100644 --- a/src/core/Akka/Actor/Stash/Internal/UnboundedStashImpl.cs +++ b/src/core/Akka/Actor/Stash/Internal/UnboundedStashImpl.cs @@ -19,7 +19,7 @@ public class UnboundedStashImpl : AbstractStash /// /// TBD public UnboundedStashImpl(IActorContext context) - : base(context, int.MaxValue) + : base(context) { } } diff --git a/src/core/Akka/Actor/Stash/Internal/UnrestrictedStashImpl.cs b/src/core/Akka/Actor/Stash/Internal/UnrestrictedStashImpl.cs new file mode 100644 index 00000000000..87e88603371 --- /dev/null +++ b/src/core/Akka/Actor/Stash/Internal/UnrestrictedStashImpl.cs @@ -0,0 +1,16 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +namespace Akka.Actor.Internal +{ + internal class UnrestrictedStashImpl : AbstractStash + { + public UnrestrictedStashImpl(IActorContext context) + : base(context) + { } + } +} diff --git a/src/core/Akka/Actor/Stash/StashFactory.cs b/src/core/Akka/Actor/Stash/StashFactory.cs index df7ee46e4ee..9cca3d59c6a 100644 --- a/src/core/Akka/Actor/Stash/StashFactory.cs +++ b/src/core/Akka/Actor/Stash/StashFactory.cs @@ -22,7 +22,7 @@ public static class StashFactory /// TBD /// TBD /// TBD - public static IStash CreateStash(this IActorContext context) where T:ActorBase + public static IStash CreateStash(this IActorContext context) where T : ActorBase { var actorType = typeof(T); return CreateStash(context, actorType); @@ -34,10 +34,8 @@ public static IStash CreateStash(this IActorContext context) where T:ActorBas /// TBD /// TBD /// TBD - public static IStash CreateStash(this IActorContext context, IActorStash actorInstance) - { - return CreateStash(context, actorInstance.GetType()); - } + public static IStash CreateStash(this IActorContext context, IActorStash actorInstance) => + CreateStash(context, actorInstance.GetType()); /// /// TBD @@ -50,15 +48,14 @@ public static IStash CreateStash(this IActorContext context, IActorStash actorIn /// TBD public static IStash CreateStash(this IActorContext context, Type actorType) { - if(actorType.Implements()) - { + if (actorType.Implements()) return new BoundedStashImpl(context); - } - if(actorType.Implements()) - { + if (actorType.Implements()) return new UnboundedStashImpl(context); - } + + if (actorType.Implements()) + return new UnrestrictedStashImpl(context); throw new ArgumentException($"Actor {actorType} implements an unrecognized subclass of {typeof(IActorStash)} - cannot instantiate", nameof(actorType)); } diff --git a/src/core/Akka/Actor/UntypedActorWithStash.cs b/src/core/Akka/Actor/UntypedActorWithStash.cs new file mode 100644 index 00000000000..850ab66092f --- /dev/null +++ b/src/core/Akka/Actor/UntypedActorWithStash.cs @@ -0,0 +1,59 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using Akka.Dispatch; + +namespace Akka.Actor +{ + /// + /// Actor base class that should be extended to create an actor with a stash. + /// + /// The stash enables an actor to temporarily stash away messages that can not or + /// should not be handled using the actor's current behavior. + /// + /// + /// Note that the subclasses of `UntypedActorWithStash` by default request a Deque based mailbox since this class + /// implements the marker interface. + /// + /// You can override the default mailbox provided when `IDequeBasedMessageQueueSemantics` are requested via config: + /// + /// akka.actor.mailbox.requirements { + /// "Akka.Dispatch.IDequeBasedMessageQueueSemantics" = your-custom-mailbox + /// } + /// + /// Alternatively, you can add your own requirement marker to the actor and configure a mailbox type to be used + /// for your marker. + /// + /// For a `Stash` based actor that enforces unbounded deques see . + /// There is also an unrestricted version that does not + /// enforce the mailbox type. + /// + /// + public abstract class UntypedActorWithStash : UntypedActor, IWithStash + { + public IStash Stash { get; set; } + } + + /// + /// Actor base class with `Stash` that enforces an unbounded deque for the actor. + /// See for details on how `Stash` works. + /// + public abstract class UntypedActorWithUnboundedStash : UntypedActor, IWithUnboundedStash + { + public IStash Stash { get; set; } + } + + /// + /// Actor base class with `Stash` that does not enforce any mailbox type. The proper mailbox has to be configured + /// manually, and the mailbox should extend the marker interface. + /// See for details on how `Stash` works. + /// + public abstract class UntypedActorWithUnrestrictedStash : UntypedActor, IWithUnrestrictedStash + { + public IStash Stash { get; set; } + } +} diff --git a/src/core/Akka/Dispatch/Dispatchers.cs b/src/core/Akka/Dispatch/Dispatchers.cs index 208b7fdce81..85761b9b742 100644 --- a/src/core/Akka/Dispatch/Dispatchers.cs +++ b/src/core/Akka/Dispatch/Dispatchers.cs @@ -11,6 +11,7 @@ using System.Threading; using System.Threading.Tasks; using Akka.Actor; +using Akka.Annotations; using Akka.Configuration; using Akka.Event; using Helios.Concurrency; @@ -331,6 +332,35 @@ public sealed class Dispatchers /// private readonly ConcurrentDictionary _dispatcherConfigurators = new ConcurrentDictionary(); + /// + /// Get (possibly aliased) dispatcher config. Returns empty config if not found. + /// + [InternalApi] + public static Config GetConfig(Config config, string id, int depth = 0) + { + if (depth > MaxDispatcherAliasDepth) + { + // Didn't find dispatcher config after `MaxDispatcherAliasDepth` aliases + return ConfigurationFactory.Empty; + } + else if (config.HasPath(id)) + { + var hocon = config.GetValue(id); + if (hocon.IsObject()) + return config.GetConfig(id); + if (hocon.IsString()) + return GetConfig(config, config.GetString(id), depth + 1); + + // Expected either config or alias at `id` but found `unexpected` + return ConfigurationFactory.Empty; + } + else + { + // Dispatcher `id` not configured + return ConfigurationFactory.Empty; + } + } + /// Initializes a new instance of the class. /// The system. /// The prerequisites required for some instances. diff --git a/src/core/Akka/Dispatch/IRequiresMessageQueue.cs b/src/core/Akka/Dispatch/IRequiresMessageQueue.cs index aff0babd348..0bdaa9b9872 100644 --- a/src/core/Akka/Dispatch/IRequiresMessageQueue.cs +++ b/src/core/Akka/Dispatch/IRequiresMessageQueue.cs @@ -5,17 +5,22 @@ // //----------------------------------------------------------------------- -using Akka.Actor; -using Akka.Dispatch.MessageQueues; - namespace Akka.Dispatch { /// - /// Used to help give hints to the as to what types of this - /// actor requires. Used mostly for system actors. + /// Interface to signal that an Actor requires a certain type of message queue semantics. + /// + /// The mailbox type will be looked up by mapping the type T via akka.actor.mailbox.requirements in the config, + /// to a mailbox configuration. If no mailbox is assigned on Props or in deployment config then this one will be used. + /// + /// + /// The queue type of the created mailbox will be checked against the type T and actor creation will fail if it doesn't + /// fulfill the requirements. + /// /// /// The type of required - public interface IRequiresMessageQueue where T:ISemantics + public interface IRequiresMessageQueue + where T : ISemantics { } } diff --git a/src/core/Akka/Dispatch/Mailbox.cs b/src/core/Akka/Dispatch/Mailbox.cs index f94ef3bb403..ba49666282f 100644 --- a/src/core/Akka/Dispatch/Mailbox.cs +++ b/src/core/Akka/Dispatch/Mailbox.cs @@ -1,7 +1,7 @@ //----------------------------------------------------------------------- // -// Copyright (C) 2009-2021 Lightbend Inc. -// Copyright (C) 2013-2021 .NET Foundation +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation // //----------------------------------------------------------------------- @@ -36,6 +36,7 @@ internal static class MailboxStatus /// TBD /// public const int Open = 0; // _status is not initialized in AbstractMailbox, so default must be zero! + /// /// TBD /// @@ -52,18 +53,22 @@ internal static class MailboxStatus /// TBD /// public const int ShouldScheduleMask = 3; + /// /// TBD /// public const int ShouldNotProcessMask = ~2; + /// /// TBD /// public const int SuspendMask = ~3; + /// /// TBD /// public const int SuspendUnit = 4; + /// /// TBD /// @@ -202,31 +207,46 @@ public virtual void SetActor(ActorCell actorCell) /// TBD /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal int CurrentStatus() { return _statusDotNotCallMeDirectly; } + internal int CurrentStatus() + { + return Volatile.Read(ref _statusDotNotCallMeDirectly); + } /// /// TBD /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal bool ShouldProcessMessage() { return (CurrentStatus() & MailboxStatus.ShouldNotProcessMask) == 0; } + internal bool ShouldProcessMessage() + { + return (CurrentStatus() & MailboxStatus.ShouldNotProcessMask) == 0; + } /// /// Returns the number of times this mailbox is currently suspended. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal int SuspendCount() { return CurrentStatus() / MailboxStatus.SuspendUnit; } + internal int SuspendCount() + { + return CurrentStatus() / MailboxStatus.SuspendUnit; + } /// /// Returns true if the mailbox is currently suspended from processing. false otherwise. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal bool IsSuspended() { return (CurrentStatus() & MailboxStatus.SuspendMask) != 0; } + internal bool IsSuspended() + { + return (CurrentStatus() & MailboxStatus.SuspendMask) != 0; + } /// /// Returns true if the mailbox is closed. false otherwise. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal bool IsClosed() { return (CurrentStatus() == MailboxStatus.Closed); } + internal bool IsClosed() + { + return (CurrentStatus() == MailboxStatus.Closed); + } /// /// Returns true if the mailbox is scheduled for execution on a . false otherwise. @@ -308,6 +328,7 @@ internal bool BecomeClosed() SetStatus(MailboxStatus.Closed); return false; } + return UpdateStatus(status, MailboxStatus.Closed) || BecomeClosed(); } @@ -369,7 +390,8 @@ public void Run() finally { SetAsIdle(); // Volatile write, needed here - Dispatcher.RegisterForExecution(this, false, false); // schedule to run again if there are more messages, possibly + Dispatcher.RegisterForExecution(this, false, + false); // schedule to run again if there are more messages, possibly } } @@ -389,11 +411,13 @@ private void ProcessMailbox(int left, long deadlineTicks) // not going to bother catching ThreadAbortExceptions here, since they'll get rethrown anyway Actor.Invoke(next); ProcessAllSystemMessages(); - if (left > 0 && (Dispatcher.ThroughputDeadlineTime.HasValue == false || (MonotonicClock.GetTicks() - deadlineTicks) < 0)) + if (left > 0 && (Dispatcher.ThroughputDeadlineTime.HasValue == false || + (MonotonicClock.GetTicks() - deadlineTicks) < 0)) { left = left - 1; continue; } + break; } } @@ -414,7 +438,8 @@ private void ProcessAllSystemMessages() var msg = messageList.Head; messageList = messageList.Tail; msg.Unlink(); - DebugPrint("{0} processing system message {1} with {2}", Actor.Self, msg, string.Join(",", Actor.GetChildren())); + DebugPrint("{0} processing system message {1} with {2}", Actor.Self, msg, + string.Join(",", Actor.GetChildren())); // we know here that SystemInvoke ensures that only "fatal" exceptions get rethrown Actor.SystemInvoke(msg); @@ -439,7 +464,8 @@ private void ProcessAllSystemMessages() } catch (Exception ex) { - Actor.System.EventStream.Publish(new Error(ex, GetType().FullName, GetType(), $"error while enqueuing {msg} to deadletters: {ex.Message}")); + Actor.System.EventStream.Publish(new Error(ex, GetType().FullName, GetType(), + $"error while enqueuing {msg} to deadletters: {ex.Message}")); } } @@ -539,9 +565,16 @@ internal virtual bool HasSystemMessages public static void DebugPrint(string message, params object[] args) { var formattedMessage = args.Length == 0 ? message : string.Format(message, args); - Console.WriteLine("[MAILBOX][{0}][Thread {1:0000}] {2}", DateTime.Now.ToString("o"), Thread.CurrentThread.ManagedThreadId, formattedMessage); + Console.WriteLine("[MAILBOX][{0}][Thread {1:0000}] {2}", DateTime.Now.ToString("o"), + Thread.CurrentThread.ManagedThreadId, formattedMessage); } +#if !NETSTANDARD + public void Execute() + { + Run(); + } +#endif } /// @@ -594,7 +627,21 @@ protected MailboxType(Settings settings, Config config) /// Compliment to /// /// The type of produced by this class. - public interface IProducesMessageQueue where TQueue : IMessageQueue { } + public interface IProducesMessageQueue where TQueue : IMessageQueue + { + } + + /// + /// INTERNAL API + /// + /// Used to determine mailbox factories which create + /// mailboxes, and thus should be validated that the `pushTimeOut` is greater than 0. + /// + /// + public interface IProducesPushTimeoutSemanticsMailbox + { + TimeSpan PushTimeout { get; } + } /// /// UnboundedMailbox is the default used by Akka.NET Actors @@ -624,7 +671,9 @@ public UnboundedMailbox(Settings settings, Config config) : base(settings, confi /// /// The default bounded mailbox implementation /// - public sealed class BoundedMailbox : MailboxType, IProducesMessageQueue + public sealed class BoundedMailbox : MailboxType, + IProducesMessageQueue, + IProducesPushTimeoutSemanticsMailbox { /// /// The capacity of this mailbox. @@ -641,7 +690,8 @@ public sealed class BoundedMailbox : MailboxType, IProducesMessageQueue /// or the 'mailbox-push-timeout-time' in is negative. /// - public BoundedMailbox(Settings settings, Config config) : base(settings, config) + public BoundedMailbox(Settings settings, Config config) + : base(settings, config) { if (config.IsNullOrEmpty()) throw ConfigurationException.NullOrEmptyConfig(); @@ -649,15 +699,15 @@ public BoundedMailbox(Settings settings, Config config) : base(settings, config) Capacity = config.GetInt("mailbox-capacity", 0); PushTimeout = config.GetTimeSpan("mailbox-push-timeout-time", TimeSpan.FromSeconds(-1)); - if (Capacity < 0) throw new ArgumentException("The capacity for BoundedMailbox cannot be negative", nameof(config)); - if (PushTimeout.TotalSeconds < 0) throw new ArgumentException("The push time-out for BoundedMailbox cannot be be negative", nameof(config)); + if (Capacity < 0) + throw new ArgumentException("The capacity for BoundedMailbox cannot be negative", nameof(config)); + if (PushTimeout.TotalSeconds < 0) + throw new ArgumentException("The push time-out for BoundedMailbox cannot be be negative", nameof(config)); } /// - public override IMessageQueue Create(IActorRef owner, ActorSystem system) - { - return new BoundedMessageQueue(Capacity, PushTimeout); - } + public override IMessageQueue Create(IActorRef owner, ActorSystem system) => + new BoundedMessageQueue(Capacity, PushTimeout); } /// @@ -706,7 +756,8 @@ protected UnboundedPriorityMailbox(Settings settings, Config config) : base(sett /// The value returned by the method will be used to order the message in the mailbox. /// Lower values will be delivered first. Messages ordered by the same number will remain in delivery order. /// - public abstract class UnboundedStablePriorityMailbox : MailboxType, IProducesMessageQueue + public abstract class UnboundedStablePriorityMailbox : MailboxType, + IProducesMessageQueue { /// /// Function responsible for generating the priority value of a message based on its type and content. @@ -758,7 +809,9 @@ public override IMessageQueue Create(IActorRef owner, ActorSystem system) /// /// BoundedDequeBasedMailbox is an bounded backed by a double-ended queue. Used for stashing. /// - public sealed class BoundedDequeBasedMailbox : MailboxType, IProducesMessageQueue + public class BoundedDequeBasedMailbox : MailboxType, + IProducesMessageQueue, + IProducesPushTimeoutSemanticsMailbox { /// /// The capacity of this mailbox. @@ -776,7 +829,8 @@ public sealed class BoundedDequeBasedMailbox : MailboxType, IProducesMessageQueu /// This exception is thrown if the 'mailbox-capacity' in /// or the 'mailbox-push-timeout-time' in is negative. /// - public BoundedDequeBasedMailbox(Settings settings, Config config) : base(settings, config) + public BoundedDequeBasedMailbox(Settings settings, Config config) + : base(settings, config) { if (config.IsNullOrEmpty()) throw ConfigurationException.NullOrEmptyConfig(); @@ -784,8 +838,10 @@ public BoundedDequeBasedMailbox(Settings settings, Config config) : base(setting Capacity = config.GetInt("mailbox-capacity", 0); PushTimeout = config.GetTimeSpan("mailbox-push-timeout-time", TimeSpan.FromSeconds(-1)); - if (Capacity < 0) throw new ArgumentException("The capacity for BoundedMailbox cannot be negative", nameof(config)); - if (PushTimeout.TotalSeconds < 0) throw new ArgumentException("The push time-out for BoundedMailbox cannot be null", nameof(config)); + if (Capacity < 0) + throw new ArgumentException("The capacity for BoundedMailbox cannot be negative", nameof(config)); + if (PushTimeout.TotalSeconds < 0) + throw new ArgumentException("The push time-out for BoundedMailbox cannot be null", nameof(config)); } /// @@ -794,6 +850,4 @@ public override IMessageQueue Create(IActorRef owner, ActorSystem system) return new BoundedDequeMessageQueue(Capacity, PushTimeout); } } - } - diff --git a/src/core/Akka/Dispatch/Mailboxes.cs b/src/core/Akka/Dispatch/Mailboxes.cs index 7e06fb5ae5a..572e0f223d6 100644 --- a/src/core/Akka/Dispatch/Mailboxes.cs +++ b/src/core/Akka/Dispatch/Mailboxes.cs @@ -8,13 +8,16 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Collections.Immutable; using System.Linq; using System.Reflection; using Akka.Actor; +using Akka.Annotations; using Akka.Configuration; -using Akka.Util.Reflection; using Akka.Dispatch.MessageQueues; using Akka.Event; +using Akka.Util; +using Akka.Util.Internal; namespace Akka.Dispatch { @@ -70,6 +73,7 @@ public Mailboxes(ActorSystem system) } _defaultMailboxConfig = Settings.Config.GetConfig(DefaultMailboxId); + _defaultStashCapacity = StashCapacityFromConfig(Dispatchers.DefaultDispatcherId, DefaultMailboxId); } /// @@ -169,21 +173,32 @@ private MailboxType LookupConfigurator(string id) var mailboxTypeName = conf.GetString("mailbox-type", null); if (string.IsNullOrEmpty(mailboxTypeName)) throw new ConfigurationException($"The setting mailbox-type defined in [{id}] is empty"); - var type = Type.GetType(mailboxTypeName); - if (type == null) - throw new ConfigurationException($"Found mailbox-type [{mailboxTypeName}] in configuration for [{id}], but could not find that type in any loaded assemblies."); - var args = new object[] {Settings, conf}; + var mailboxType = Type.GetType(mailboxTypeName) + ?? throw new ConfigurationException($"Found mailbox-type [{mailboxTypeName}] in configuration for [{id}], but could not find that type in any loaded assemblies."); + var args = new object[] { Settings, conf }; try { - configurator = (MailboxType) Activator.CreateInstance(type, args); + configurator = (MailboxType)Activator.CreateInstance(mailboxType, args); + + if (!_mailboxNonZeroPushTimeoutWarningIssued) + { + if (configurator is IProducesPushTimeoutSemanticsMailbox m && m.PushTimeout.Ticks > 0L) + { + Warn($"Configured potentially-blocking mailbox [{id}] configured with non-zero PushTimeOut ({m.PushTimeout}), " + + "which can lead to blocking behavior when sending messages to this mailbox. " + + $"Avoid this by setting `{id}.mailbox-push-timeout-time` to `0`."); + + _mailboxNonZeroPushTimeoutWarningIssued = true; + } + + // good; nothing to see here, move along, sir. + } } catch (Exception ex) { - throw new ArgumentException($"Cannot instantiate MailboxType {type}, defined in [{id}]. Make sure it has a public " + + throw new ArgumentException($"Cannot instantiate MailboxType {mailboxType}, defined in [{id}]. Make sure it has a public " + "constructor with [Akka.Actor.Settings, Akka.Configuration.Config] parameters", ex); } - - // TODO: check for blocking mailbox with a non-zero pushtimeout and issue a warning } // add the new configurator to the mapping, or keep the existing if it was already added @@ -311,39 +326,52 @@ MailboxType VerifyRequirements(MailboxType mailboxType) return VerifyRequirements(Lookup(DefaultMailboxId)); } + private void Warn(string msg) => + _system.EventStream.Publish(new Warning("mailboxes", GetType(), msg)); + + private readonly AtomicReference> _stashCapacityCache = + new(ImmutableDictionary.Empty); + + private readonly int _defaultStashCapacity; + /// - /// Creates a mailbox from a configuration path. + /// INTERNAL API + /// + /// The capacity of the stash. Configured in the actor's mailbox or dispatcher config. + /// /// - /// The path. - /// Mailbox. - public Type FromConfig(string path) + [InternalApi] + public int StashCapacity(string dispatcher, string mailbox) { - //TODO: this should not exist, its a temp hack because we are not serializing mailbox info when doing remote deploy.. - if (string.IsNullOrEmpty(path)) + bool UpdateCache(ImmutableDictionary cache, string key, int value) { - return typeof (UnboundedMailbox); + return _stashCapacityCache.CompareAndSet(cache, cache.SetItem(key, value)) || + UpdateCache(_stashCapacityCache.Value, key, value); // recursive, try again } - var config = _system.Settings.Config.GetConfig(path); - if (config.IsNullOrEmpty()) - throw new ConfigurationException($"Cannot retrieve mailbox type from config: {path} configuration node not found"); + if (dispatcher == Dispatchers.DefaultDispatcherId && mailbox == DefaultMailboxId) + return _defaultStashCapacity; - var type = config.GetString("mailbox-type", null); + var cache = _stashCapacityCache.Value; + var key = $"{dispatcher}-{mailbox}"; - var mailboxType = TypeCache.GetType(type); - return mailboxType; - /* -mailbox-capacity = 1000 -mailbox-push-timeout-time = 10s -stash-capacity = -1 - */ - } + if (!cache.TryGetValue(key, out var value)) + { + value = StashCapacityFromConfig(dispatcher, mailbox); + UpdateCache(cache, key, value); + } - //TODO: stash capacity + return value; + } - private void Warn(string msg) + private int StashCapacityFromConfig(string dispatcher, string mailbox) { - _system.EventStream.Publish(new Warning("mailboxes", GetType(), msg)); + var disp = Dispatchers.GetConfig(Settings.Config, dispatcher); + var fallback = disp.WithFallback(Settings.Config.GetConfig(DefaultMailboxId)); + var config = mailbox == DefaultMailboxId + ? fallback + : Settings.Config.GetConfig(mailbox).WithFallback(fallback); + return config.GetInt("stash-capacity"); } } }