Skip to content

Commit

Permalink
[BACKPORT] Several stashing improvements (#6323, #6325 and #6327) (#6358
Browse files Browse the repository at this point in the history
)

* Read stash capacity from actor's mailbox or dispatcher configuration (#6323)

* Added support for `UnrestrictedStash` (#6325)

* Add API for UntypedActorWithStash types (#6327)

Co-authored-by: Aaron Stannard <aaron@petabridge.com>

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
ismaelhamed and Aaronontheweb authored Jan 24, 2023
1 parent 7a33b50 commit ba9ff3e
Show file tree
Hide file tree
Showing 21 changed files with 705 additions and 185 deletions.
20 changes: 15 additions & 5 deletions docs/articles/actors/receive-actor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -790,12 +790,15 @@ static void Main(string[] args)

## Stash

The `IWithUnboundedStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.BecomeStacked()` or `Context.UnbecomeStacked()`;, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. An actor that implements `IWithUnboundedStash` will automatically get a dequeue-based mailbox.
The `IWithStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.Become()` or `Context.Unbecome()`, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally.

Here is an example of the `IWithUnboundedStash` interface in action:
> [!NOTE]
> The interface `IWithStash` implements the marker interface `IRequiresMessageQueue<IDequeBasedMessageQueueSemantics>` which requests the system to automatically choose a deque-based mailbox implementation for the actor (defaults to an unbounded deque mailbox). If you want more control over the mailbox, see the documentation on mailboxes: [Mailboxes](xref:mailboxes).

Here is an example of the `IWithStash` interface in action:

```csharp
public class ActorWithProtocol : ReceiveActor, IWithUnboundedStash
public class ActorWithProtocol : ReceiveActor, IWithStash
{
public IStash Stash { get; set; }

Expand Down Expand Up @@ -827,11 +830,18 @@ public class ActorWithProtocol : ReceiveActor, IWithUnboundedStash
}
```

Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `Int`) of the mailbox's configuration.
Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `int`) of the mailbox's configuration.

Invoking `UnstashAll()` enqueues messages from the stash to the actor's mailbox until the capacity of the mailbox (if any) has been reached (note that messages from the stash are prepended to the mailbox). In case a bounded mailbox overflows, a `MessageQueueAppendFailedException` is thrown. The stash is guaranteed to be empty after calling `UnstashAll()`.

Note that the `stash` is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. The `IWithUnboundedStash` interface implementation of `PreRestart` will call `UnstashAll()`, which is usually the desired behavior.
Note that the stash is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property.

However, the `IWithStash` interface implementation of `PreRestart` will call `UnstashAll()`. This means that before the actor restarts, it will transfer all stashed messages back to the actors mailbox.

The result of this is that when an actor is restarted, any stashed messages will be delivered to the new incarnation of the actor. This is usually the desired behavior.

> [!NOTE]
> If you want to enforce that your actor can only work with an unbounded stash, then you should use the `IWithUnboundedStash` interface instead.

## Killing an Actor

Expand Down
18 changes: 11 additions & 7 deletions docs/articles/actors/untyped-actor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -708,15 +708,16 @@ static void Main(string[] args)

## Stash

The `IWithUnboundedStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.BecomeStacked()` or `Context.UnbecomeStacked()`;, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. An actor that implements `IWithUnboundedStash` will automatically get a dequeue-based mailbox.
The `UntypedActorWithStash` class enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.Become()` or `Context.Unbecome()`, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. An actor that extends `UntypedActorWithStash` will automatically get a deque-based mailbox.

Here is an example of the `IWithUnboundedStash` interface in action:
> [!NOTE]
> The abstract class `UntypedActorWithStash` implements the marker interface `IRequiresMessageQueue<IDequeBasedMessageQueueSemantics>` which requests the system to automatically choose a deque-based mailbox implementation for the actor. If you want more control over the mailbox, see the documentation on mailboxes: [Mailboxes](xref:mailboxes).
Here is an example of the `UntypedActorWithStash` interface in action:

```csharp
public class ActorWithProtocol : UntypedActor, IWithUnboundedStash
public class ActorWithProtocol : UntypedActorWithStash
{
public IStash Stash { get; set; }

protected override void OnReceive(object message)
{
switch (message)
Expand Down Expand Up @@ -748,11 +749,14 @@ public class ActorWithProtocol : UntypedActor, IWithUnboundedStash
}
```

Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `Int`) of the mailbox's configuration.
Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `int`) of the mailbox's configuration.

Invoking `UnstashAll()` enqueues messages from the stash to the actor's mailbox until the capacity of the mailbox (if any) has been reached (note that messages from the stash are prepended to the mailbox). In case a bounded mailbox overflows, a `MessageQueueAppendFailedException` is thrown. The stash is guaranteed to be empty after calling `UnstashAll()`.

Note that the `stash` is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. The `IWithUnboundedStash` interface implementation of `PreRestart` will call `UnstashAll()`, which is usually the desired behavior.
Note that the stash is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. The `UntypedActorWithStash` implementation of `PreRestart` will call `UnstashAll()`, which is usually the desired behavior.

> [!NOTE]
> If you want to enforce that your actor can only work with an unbounded stash, then you should use the `UntypedActorWithUnboundedStash` class instead.
## Killing an Actor

Expand Down
1 change: 1 addition & 0 deletions docs/cSpell.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"CRDT",
"datacenter",
"denormalization",
"deque",
"deserialization",
"downstream",
"downstreams",
Expand Down
43 changes: 33 additions & 10 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1175,14 +1175,15 @@ namespace Akka.Actor
void Become(Akka.Actor.UntypedReceive receive);
void BecomeStacked(Akka.Actor.UntypedReceive receive);
}
[System.ObsoleteAttribute("Bounded stashing is not yet implemented. Unbounded stashing will be used instead " +
"[0.7.0]")]
public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics> { }
[System.ObsoleteAttribute("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")]
public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics> { }
public interface IWithStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IDequeBasedMessageQueueSemantics> { }
public interface IWithTimers
{
Akka.Actor.ITimerScheduler Timers { get; set; }
}
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWithUnrestrictedStash : Akka.Actor.IActorStash { }
public interface IWrappedMessage
{
object Message { get; }
Expand Down Expand Up @@ -1857,6 +1858,21 @@ namespace Akka.Actor
protected void RunTask(System.Action action) { }
protected void RunTask(System.Func<System.Threading.Tasks.Task> action) { }
}
public abstract class UntypedActorWithStash : Akka.Actor.UntypedActor, Akka.Actor.IActorStash, Akka.Actor.IWithStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IDequeBasedMessageQueueSemantics>
{
protected UntypedActorWithStash() { }
public Akka.Actor.IStash Stash { get; set; }
}
public abstract class UntypedActorWithUnboundedStash : Akka.Actor.UntypedActor, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
{
protected UntypedActorWithUnboundedStash() { }
public Akka.Actor.IStash Stash { get; set; }
}
public abstract class UntypedActorWithUnrestrictedStash : Akka.Actor.UntypedActor, Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash
{
protected UntypedActorWithUnrestrictedStash() { }
public Akka.Actor.IStash Stash { get; set; }
}
public delegate void UntypedReceive(object message);
public class static WrappedMessage
{
Expand Down Expand Up @@ -1927,13 +1943,13 @@ namespace Akka.Actor.Internal
{
public abstract class AbstractStash : Akka.Actor.IStash
{
protected AbstractStash(Akka.Actor.IActorContext context, int capacity = 100) { }
protected AbstractStash(Akka.Actor.IActorContext context) { }
public System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> ClearStash() { }
public void Prepend(System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> envelopes) { }
public void Stash() { }
public void Unstash() { }
public void UnstashAll() { }
public void UnstashAll(System.Func<Akka.Actor.Envelope, bool> predicate) { }
public void UnstashAll(System.Func<Akka.Actor.Envelope, bool> filterPredicate) { }
}
public class ActorSystemImpl : Akka.Actor.ExtendedActorSystem
{
Expand Down Expand Up @@ -1981,7 +1997,7 @@ namespace Akka.Actor.Internal
}
public class BoundedStashImpl : Akka.Actor.Internal.AbstractStash
{
public BoundedStashImpl(Akka.Actor.IActorContext context, int capacity = 100) { }
public BoundedStashImpl(Akka.Actor.IActorContext context) { }
}
public class ChildNameReserved : Akka.Actor.Internal.IChildStats
{
Expand Down Expand Up @@ -2520,14 +2536,14 @@ namespace Akka.Dispatch
public static void RunTask(System.Func<System.Threading.Tasks.Task> asyncAction) { }
protected override bool TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued) { }
}
public sealed class BoundedDequeBasedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue<Akka.Dispatch.MessageQueues.BoundedDequeMessageQueue>
public class BoundedDequeBasedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue<Akka.Dispatch.MessageQueues.BoundedDequeMessageQueue>, Akka.Dispatch.IProducesPushTimeoutSemanticsMailbox
{
public BoundedDequeBasedMailbox(Akka.Actor.Settings settings, Akka.Configuration.Config config) { }
public int Capacity { get; }
public System.TimeSpan PushTimeout { get; }
public override Akka.Dispatch.MessageQueues.IMessageQueue Create(Akka.Actor.IActorRef owner, Akka.Actor.ActorSystem system) { }
}
public sealed class BoundedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue<Akka.Dispatch.MessageQueues.BoundedMessageQueue>
public sealed class BoundedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue<Akka.Dispatch.MessageQueues.BoundedMessageQueue>, Akka.Dispatch.IProducesPushTimeoutSemanticsMailbox
{
public BoundedMailbox(Akka.Actor.Settings settings, Akka.Configuration.Config config) { }
public int Capacity { get; }
Expand Down Expand Up @@ -2583,6 +2599,8 @@ namespace Akka.Dispatch
public Akka.Configuration.Config DefaultDispatcherConfig { get; }
public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; }
public Akka.Dispatch.IDispatcherPrerequisites Prerequisites { get; }
[Akka.Annotations.InternalApiAttribute()]
public static Akka.Configuration.Config GetConfig(Akka.Configuration.Config config, string id, int depth = 0) { }
public bool HasDispatcher(string id) { }
public Akka.Dispatch.MessageDispatcher Lookup(string dispatcherName) { }
public bool RegisterConfigurator(string id, Akka.Dispatch.MessageDispatcherConfigurator configurator) { }
Expand Down Expand Up @@ -2636,6 +2654,10 @@ namespace Akka.Dispatch
public interface IMultipleConsumerSemantics : Akka.Dispatch.ISemantics { }
public interface IProducesMessageQueue<TQueue>
where TQueue : Akka.Dispatch.MessageQueues.IMessageQueue { }
public interface IProducesPushTimeoutSemanticsMailbox
{
System.TimeSpan PushTimeout { get; }
}
public interface IRequiresMessageQueue<T>
where T : Akka.Dispatch.ISemantics { }
public interface IRunnable
Expand Down Expand Up @@ -2669,13 +2691,14 @@ namespace Akka.Dispatch
public static readonly string NoMailboxRequirement;
public Mailboxes(Akka.Actor.ActorSystem system) { }
public Akka.Actor.DeadLetterMailbox DeadLetterMailbox { get; }
public System.Type FromConfig(string path) { }
public Akka.Dispatch.MailboxType GetMailboxType(Akka.Actor.Props props, Akka.Configuration.Config dispatcherConfig) { }
public System.Type GetRequiredType(System.Type actorType) { }
public bool HasRequiredType(System.Type actorType) { }
public Akka.Dispatch.MailboxType Lookup(string id) { }
public Akka.Dispatch.MailboxType LookupByQueueType(System.Type queueType) { }
public bool ProducesMessageQueue(System.Type mailboxType) { }
[Akka.Annotations.InternalApiAttribute()]
public int StashCapacity(string dispatcher, string mailbox) { }
}
public abstract class MessageDispatcher
{
Expand Down
Loading

0 comments on commit ba9ff3e

Please sign in to comment.