Skip to content

Commit

Permalink
Read stash capacity from actor's mailbox or dispatcher configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed committed Dec 30, 2022
1 parent 124a378 commit 137c942
Show file tree
Hide file tree
Showing 9 changed files with 452 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1935,13 +1935,13 @@ namespace Akka.Actor.Internal
{
public abstract class AbstractStash : Akka.Actor.IStash
{
protected AbstractStash(Akka.Actor.IActorContext context, int capacity = 100) { }
protected AbstractStash(Akka.Actor.IActorContext context) { }
public System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> ClearStash() { }
public void Prepend(System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> envelopes) { }
public void Stash() { }
public void Unstash() { }
public void UnstashAll() { }
public void UnstashAll(System.Func<Akka.Actor.Envelope, bool> predicate) { }
public void UnstashAll(System.Func<Akka.Actor.Envelope, bool> filterPredicate) { }
}
public class ActorSystemImpl : Akka.Actor.ExtendedActorSystem
{
Expand Down Expand Up @@ -1990,7 +1990,7 @@ namespace Akka.Actor.Internal
}
public class BoundedStashImpl : Akka.Actor.Internal.AbstractStash
{
public BoundedStashImpl(Akka.Actor.IActorContext context, int capacity = 100) { }
public BoundedStashImpl(Akka.Actor.IActorContext context) { }
}
public class ChildNameReserved : Akka.Actor.Internal.IChildStats
{
Expand Down Expand Up @@ -2490,14 +2490,14 @@ namespace Akka.Dispatch
public static void RunTask(System.Func<System.Threading.Tasks.Task> asyncAction) { }
protected override bool TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued) { }
}
public sealed class BoundedDequeBasedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue<Akka.Dispatch.MessageQueues.BoundedDequeMessageQueue>
public class BoundedDequeBasedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue<Akka.Dispatch.MessageQueues.BoundedDequeMessageQueue>, Akka.Dispatch.IProducesPushTimeoutSemanticsMailbox
{
public BoundedDequeBasedMailbox(Akka.Actor.Settings settings, Akka.Configuration.Config config) { }
public int Capacity { get; }
public System.TimeSpan PushTimeout { get; }
public override Akka.Dispatch.MessageQueues.IMessageQueue Create(Akka.Actor.IActorRef owner, Akka.Actor.ActorSystem system) { }
}
public sealed class BoundedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue<Akka.Dispatch.MessageQueues.BoundedMessageQueue>
public sealed class BoundedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue<Akka.Dispatch.MessageQueues.BoundedMessageQueue>, Akka.Dispatch.IProducesPushTimeoutSemanticsMailbox
{
public BoundedMailbox(Akka.Actor.Settings settings, Akka.Configuration.Config config) { }
public int Capacity { get; }
Expand Down Expand Up @@ -2553,6 +2553,8 @@ namespace Akka.Dispatch
public Akka.Configuration.Config DefaultDispatcherConfig { get; }
public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; }
public Akka.Dispatch.IDispatcherPrerequisites Prerequisites { get; }
[Akka.Annotations.InternalApiAttribute()]
public static Akka.Configuration.Config GetConfig(Akka.Configuration.Config config, string id, int depth = 0) { }
public bool HasDispatcher(string id) { }
public Akka.Dispatch.MessageDispatcher Lookup(string dispatcherName) { }
public bool RegisterConfigurator(string id, Akka.Dispatch.MessageDispatcherConfigurator configurator) { }
Expand Down Expand Up @@ -2606,6 +2608,10 @@ namespace Akka.Dispatch
public interface IMultipleConsumerSemantics : Akka.Dispatch.ISemantics { }
public interface IProducesMessageQueue<TQueue>
where TQueue : Akka.Dispatch.MessageQueues.IMessageQueue { }
public interface IProducesPushTimeoutSemanticsMailbox
{
System.TimeSpan PushTimeout { get; }
}
public interface IRequiresMessageQueue<T>
where T : Akka.Dispatch.ISemantics { }
public interface IRunnable : System.Threading.IThreadPoolWorkItem
Expand Down Expand Up @@ -2640,13 +2646,14 @@ namespace Akka.Dispatch
public static readonly string NoMailboxRequirement;
public Mailboxes(Akka.Actor.ActorSystem system) { }
public Akka.Actor.DeadLetterMailbox DeadLetterMailbox { get; }
public System.Type FromConfig(string path) { }
public Akka.Dispatch.MailboxType GetMailboxType(Akka.Actor.Props props, Akka.Configuration.Config dispatcherConfig) { }
public System.Type GetRequiredType(System.Type actorType) { }
public bool HasRequiredType(System.Type actorType) { }
public Akka.Dispatch.MailboxType Lookup(string id) { }
public Akka.Dispatch.MailboxType LookupByQueueType(System.Type queueType) { }
public bool ProducesMessageQueue(System.Type mailboxType) { }
[Akka.Annotations.InternalApiAttribute()]
public int StashCapacity(string dispatcher, string mailbox) { }
}
public abstract class MessageDispatcher
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1933,13 +1933,13 @@ namespace Akka.Actor.Internal
{
public abstract class AbstractStash : Akka.Actor.IStash
{
protected AbstractStash(Akka.Actor.IActorContext context, int capacity = 100) { }
protected AbstractStash(Akka.Actor.IActorContext context) { }
public System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> ClearStash() { }
public void Prepend(System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> envelopes) { }
public void Stash() { }
public void Unstash() { }
public void UnstashAll() { }
public void UnstashAll(System.Func<Akka.Actor.Envelope, bool> predicate) { }
public void UnstashAll(System.Func<Akka.Actor.Envelope, bool> filterPredicate) { }
}
public class ActorSystemImpl : Akka.Actor.ExtendedActorSystem
{
Expand Down Expand Up @@ -1988,7 +1988,7 @@ namespace Akka.Actor.Internal
}
public class BoundedStashImpl : Akka.Actor.Internal.AbstractStash
{
public BoundedStashImpl(Akka.Actor.IActorContext context, int capacity = 100) { }
public BoundedStashImpl(Akka.Actor.IActorContext context) { }
}
public class ChildNameReserved : Akka.Actor.Internal.IChildStats
{
Expand Down Expand Up @@ -2486,14 +2486,14 @@ namespace Akka.Dispatch
public static void RunTask(System.Func<System.Threading.Tasks.Task> asyncAction) { }
protected override bool TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued) { }
}
public sealed class BoundedDequeBasedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue<Akka.Dispatch.MessageQueues.BoundedDequeMessageQueue>
public class BoundedDequeBasedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue<Akka.Dispatch.MessageQueues.BoundedDequeMessageQueue>, Akka.Dispatch.IProducesPushTimeoutSemanticsMailbox
{
public BoundedDequeBasedMailbox(Akka.Actor.Settings settings, Akka.Configuration.Config config) { }
public int Capacity { get; }
public System.TimeSpan PushTimeout { get; }
public override Akka.Dispatch.MessageQueues.IMessageQueue Create(Akka.Actor.IActorRef owner, Akka.Actor.ActorSystem system) { }
}
public sealed class BoundedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue<Akka.Dispatch.MessageQueues.BoundedMessageQueue>
public sealed class BoundedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue<Akka.Dispatch.MessageQueues.BoundedMessageQueue>, Akka.Dispatch.IProducesPushTimeoutSemanticsMailbox
{
public BoundedMailbox(Akka.Actor.Settings settings, Akka.Configuration.Config config) { }
public int Capacity { get; }
Expand Down Expand Up @@ -2549,6 +2549,8 @@ namespace Akka.Dispatch
public Akka.Configuration.Config DefaultDispatcherConfig { get; }
public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; }
public Akka.Dispatch.IDispatcherPrerequisites Prerequisites { get; }
[Akka.Annotations.InternalApiAttribute()]
public static Akka.Configuration.Config GetConfig(Akka.Configuration.Config config, string id, int depth = 0) { }
public bool HasDispatcher(string id) { }
public Akka.Dispatch.MessageDispatcher Lookup(string dispatcherName) { }
public bool RegisterConfigurator(string id, Akka.Dispatch.MessageDispatcherConfigurator configurator) { }
Expand Down Expand Up @@ -2602,6 +2604,10 @@ namespace Akka.Dispatch
public interface IMultipleConsumerSemantics : Akka.Dispatch.ISemantics { }
public interface IProducesMessageQueue<TQueue>
where TQueue : Akka.Dispatch.MessageQueues.IMessageQueue { }
public interface IProducesPushTimeoutSemanticsMailbox
{
System.TimeSpan PushTimeout { get; }
}
public interface IRequiresMessageQueue<T>
where T : Akka.Dispatch.ISemantics { }
public interface IRunnable
Expand Down Expand Up @@ -2635,13 +2641,14 @@ namespace Akka.Dispatch
public static readonly string NoMailboxRequirement;
public Mailboxes(Akka.Actor.ActorSystem system) { }
public Akka.Actor.DeadLetterMailbox DeadLetterMailbox { get; }
public System.Type FromConfig(string path) { }
public Akka.Dispatch.MailboxType GetMailboxType(Akka.Actor.Props props, Akka.Configuration.Config dispatcherConfig) { }
public System.Type GetRequiredType(System.Type actorType) { }
public bool HasRequiredType(System.Type actorType) { }
public Akka.Dispatch.MailboxType Lookup(string id) { }
public Akka.Dispatch.MailboxType LookupByQueueType(System.Type queueType) { }
public bool ProducesMessageQueue(System.Type mailboxType) { }
[Akka.Annotations.InternalApiAttribute()]
public int StashCapacity(string dispatcher, string mailbox) { }
}
public abstract class MessageDispatcher
{
Expand Down
210 changes: 210 additions & 0 deletions src/core/Akka.Tests/Actor/Stash/ActorWithBoundedStashSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
//-----------------------------------------------------------------------
// <copyright file="ActorWithBoundedStashSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Dispatch;
using Akka.Event;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Tests.Actor.Stash
{
public class StashingActor : UntypedActor, IWithBoundedStash
{
public IStash Stash { get; set; }

protected override void OnReceive(object message)
{
if (message is string s && s.StartsWith("hello"))
{
Stash.Stash();
Sender.Tell("ok");
}
else if (message.Equals("world"))
{
Context.Become(AfterWorldBehavior);
Stash.UnstashAll();
}
}

private void AfterWorldBehavior(object message) => Stash.Stash();
}

public class StashingActorWithOverflow : UntypedActor, IWithBoundedStash
{
private int numStashed = 0;

public IStash Stash { get; set; }

protected override void OnReceive(object message)
{
if (!(message is string s) || !s.StartsWith("hello"))
return;

numStashed++;
try
{
Stash.Stash();
Sender.Tell("ok");
}
catch (Exception ex) when (ex is StashOverflowException)
{
if (numStashed == 21)
{
Sender.Tell("STASHOVERFLOW");
Context.Stop(Self);
}
else
{
Sender.Tell("Unexpected StashOverflowException: " + numStashed);
}
}
}
}

// bounded deque-based mailbox with capacity 10
public class Bounded10 : BoundedDequeBasedMailbox
{
public Bounded10(Settings settings, Config config)
: base(settings, config)
{ }
}

public class Bounded100 : BoundedDequeBasedMailbox
{
public Bounded100(Settings settings, Config config)
: base(settings, config)
{ }
}

public class ActorWithBoundedStashSpec : AkkaSpec
{
private static Config SpecConfig => ConfigurationFactory.ParseString(@$"
akka.loggers = [""Akka.TestKit.TestEventListener, Akka.TestKit""]
my-dispatcher-1 {{
mailbox-type = ""{typeof(Bounded10).AssemblyQualifiedName}""
mailbox-capacity = 10
mailbox-push-timeout-time = 500ms
stash-capacity = 20
}}
my-dispatcher-2 {{
mailbox-type = ""{typeof(Bounded100).AssemblyQualifiedName}""
mailbox-capacity = 100
mailbox-push-timeout-time = 500ms
stash-capacity = 20
}}
my-aliased-dispatcher-1 = my-dispatcher-1
my-aliased-dispatcher-2 = my-aliased-dispatcher-1
my-mailbox-1 {{
mailbox-type = ""{typeof(Bounded10).AssemblyQualifiedName}""
mailbox-capacity = 10
mailbox-push-timeout-time = 500ms
stash-capacity = 20
}}
my-mailbox-2 {{
mailbox-type = ""{typeof(Bounded100).AssemblyQualifiedName}""
mailbox-capacity = 100
mailbox-push-timeout-time = 500ms
stash-capacity = 20
}}");

public ActorWithBoundedStashSpec(ITestOutputHelper outputHelper)
: base(outputHelper, SpecConfig)
{
Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter));
}

protected override void AtStartup()
{
base.AtStartup();
Sys.EventStream.Publish(EventFilter.Warning(pattern: new Regex(".*Received dead letter from.*hello.*")).Mute());
}

protected override async Task AfterAllAsync()
{
await base.AfterAllAsync();
Sys.EventStream.Unsubscribe(TestActor, typeof(DeadLetter));
}

private void TestDeadLetters(IActorRef stasher)
{
for (var n = 1; n <= 11; n++)
{
stasher.Tell("hello" + n);
ExpectMsg("ok");
}

// cause unstashAll with capacity violation
stasher.Tell("world");
ExpectMsg<DeadLetter>().Equals(new DeadLetter("hello1", TestActor, stasher));

AwaitCondition(() => ExpectMsg<DeadLetter>() != null);

stasher.Tell(PoisonPill.Instance);
// stashed messages are sent to deadletters when stasher is stopped
for (var n = 2; n <= 11; n++)
ExpectMsg<DeadLetter>().Equals(new DeadLetter("hello" + n, TestActor, stasher));
}

private void TestStashOverflowException(IActorRef stasher)
{
// fill up stash
for (var n = 1; n <= 20; n++)
{
stasher.Tell("hello" + n);
ExpectMsg("ok");
}

stasher.Tell("hello21");
ExpectMsg("STASHOVERFLOW");

// stashed messages are sent to deadletters when stasher is stopped
for (var n = 1; n <= 20; n++)
ExpectMsg<DeadLetter>().Equals(new DeadLetter("hello" + n, TestActor, stasher));
}

[Fact(Skip = "DequeWrapperMessageQueue implementations are not actually double-ended queues, so this cannot currently work.")]
public void An_actor_with_stash_must_end_up_in_DeadLetters_in_case_of_a_capacity_violation_when_configure_via_dispatcher()
{
var stasher = Sys.ActorOf(Props.Create<StashingActor>().WithDispatcher("my-dispatcher-1"));
TestDeadLetters(stasher);
}

[Fact(Skip = "DequeWrapperMessageQueue implementations are not actually double-ended queues, so this cannot currently work.")]
public void An_actor_with_stash_must_end_up_in_DeadLetters_in_case_of_a_capacity_violation_when_configure_via_mailbox()
{
var stasher = Sys.ActorOf(Props.Create<StashingActor>().WithDispatcher("my-mailbox-1"));
TestDeadLetters(stasher);
}

[Fact]
public void An_actor_with_stash_must_throw_a_StashOverflowException_in_case_of_a_stash_capacity_violation_when_configured_via_dispatcher()
{
var stasher = Sys.ActorOf(Props.Create<StashingActorWithOverflow>().WithDispatcher("my-dispatcher-2"));
TestStashOverflowException(stasher);
}

[Fact]
public void An_actor_with_stash_must_throw_a_StashOverflowException_in_case_of_a_stash_capacity_violation_when_configured_via_mailbox()
{
var stasher = Sys.ActorOf(Props.Create<StashingActorWithOverflow>().WithDispatcher("my-mailbox-2"));
TestStashOverflowException(stasher);
}

[Fact]
public void An_actor_with_stash_must_get_stash_capacity_from_aliased_dispatchers()
{
var stasher = Sys.ActorOf(Props.Create<StashingActorWithOverflow>().WithDispatcher("my-aliased-dispatcher-2"));
TestStashOverflowException(stasher);
}
}
}
Loading

0 comments on commit 137c942

Please sign in to comment.