Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka.Actor: IStash API and configuration enhancements #6660

Merged
merged 4 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,11 @@ namespace Akka.Actor
}
public interface IStash
{
int Capacity { get; }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New stashing APIs. Pretty self-explanatory.

int Count { get; }
bool IsEmpty { get; }
bool IsFull { get; }
bool NonEmpty { get; }
System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> ClearStash();
void Prepend(System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> envelopes);
void Stash();
Expand Down Expand Up @@ -1933,6 +1938,11 @@ namespace Akka.Actor.Internal
public abstract class AbstractStash : Akka.Actor.IStash
{
protected AbstractStash(Akka.Actor.IActorContext context) { }
public int Capacity { get; }
public int Count { get; }
public bool IsEmpty { get; }
public bool IsFull { get; }
public bool NonEmpty { get; }
public System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> ClearStash() { }
public void Prepend(System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> envelopes) { }
public void Stash() { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,11 @@ namespace Akka.Actor
}
public interface IStash
{
int Capacity { get; }
int Count { get; }
bool IsEmpty { get; }
bool IsFull { get; }
bool NonEmpty { get; }
System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> ClearStash();
void Prepend(System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> envelopes);
void Stash();
Expand Down Expand Up @@ -1931,6 +1936,11 @@ namespace Akka.Actor.Internal
public abstract class AbstractStash : Akka.Actor.IStash
{
protected AbstractStash(Akka.Actor.IActorContext context) { }
public int Capacity { get; }
public int Count { get; }
public bool IsEmpty { get; }
public bool IsFull { get; }
public bool NonEmpty { get; }
public System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> ClearStash() { }
public void Prepend(System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> envelopes) { }
public void Stash() { }
Expand Down
6 changes: 6 additions & 0 deletions src/core/Akka.Persistence/Eventsourced.cs
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,12 @@ public void Prepend(IEnumerable<Envelope> envelopes)
{
_userStash.Prepend(envelopes);
}

public int Count => _userStash.Count;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the Eventsourced stash, we use the values provided for the user stash rather than the internal one (since the user can't really affect the internal stash anyway.)

public bool IsEmpty => _userStash.IsEmpty;
public bool NonEmpty => _userStash.NonEmpty;
public bool IsFull => _userStash.IsFull;
public int Capacity => _userStash.Capacity;
}
}
}
145 changes: 145 additions & 0 deletions src/core/Akka.Tests/Actor/Stash/StashCapacitySpecs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
//-----------------------------------------------------------------------
// <copyright file="StashCapacitySpecs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;
using FluentAssertions;

namespace Akka.Tests.Actor.Stash;

public class StashCapacitySpecs : AkkaSpec
{
public StashCapacitySpecs(ITestOutputHelper output) : base(Config.Empty, output: output)
{

}

[Fact]
public async Task ShouldGetAccurateStashReadingForUnboundedStash()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basic happy path stash spec. Going to add one for dealing with bounded stashes once I get the other half of #6658

{
var stashActor = Sys.ActorOf(Props.Create(() => new StashActor()));
stashActor.Tell(new StashActor.StashMessage("1"));
stashActor.Tell(new StashActor.StashMessage("2"));
stashActor.Tell(StashActor.GetStashReadout.Instance);
var readout1 = await ExpectMsgAsync<StashActor.StashReadout>();
readout1.Capacity.Should().Be(-1); // unbounded stash returns -1 for capacity
readout1.Size.Should().Be(2);
readout1.IsEmpty.Should().BeFalse();
readout1.IsFull.Should().BeFalse();

stashActor.Tell(StashActor.UnstashMessage.Instance);
stashActor.Tell(StashActor.GetStashReadout.Instance);
var readout2 = await ExpectMsgAsync<StashActor.StashReadout>();
readout2.Capacity.Should().Be(-1);
readout2.Size.Should().Be(1);
readout2.IsEmpty.Should().BeFalse();
readout2.IsFull.Should().BeFalse();

stashActor.Tell(StashActor.UnstashMessage.Instance);
stashActor.Tell(StashActor.GetStashReadout.Instance);
var readout3 = await ExpectMsgAsync<StashActor.StashReadout>();
readout3.Capacity.Should().Be(-1);
readout3.Size.Should().Be(0);
readout3.IsEmpty.Should().BeTrue();
readout3.IsFull.Should().BeFalse();
}

private class StashActor : UntypedActorWithStash
{
public class StashMessage
{
public StashMessage(string message)
{
Message = message;
}

public string Message { get; }
}

public class UnstashMessage
{
private UnstashMessage()
{

}
public static readonly UnstashMessage Instance = new();
}

public class GetStashReadout
{
private GetStashReadout()
{

}
public static readonly GetStashReadout Instance = new();
}

public class StashReadout
{
public StashReadout(int capacity, int size, bool isEmpty, bool isFull)
{
Capacity = capacity;
Size = size;
IsEmpty = isEmpty;
IsFull = isFull;
}

public int Capacity { get; }
public int Size { get; }

public bool IsEmpty { get; }

public bool IsFull { get; }
}

protected override void OnReceive(object message)
{
switch (message)
{
case StashMessage msg:
Stash.Stash();
break;
case UnstashMessage _:
Stash.Unstash();
Context.Become(Unstashing); // switch behaviors so we're not stuck in stash loop
break;
case GetStashReadout _:
Sender.Tell(new StashReadout(Stash.Capacity, Stash.Count, Stash.IsEmpty, Stash.IsFull));
break;
default:
Unhandled(message);
break;
}
}

private void Unstashing(object message)
{
switch (message)
{
case StashMessage msg: // do nothing
break;
case UnstashMessage when Stash.NonEmpty:
Stash.Unstash();
break;
case UnstashMessage _: // when the stash is empty, go back to stashing
Context.Become(OnReceive);
break;
case GetStashReadout _:
Sender.Tell(new StashReadout(Stash.Capacity, Stash.Count, Stash.IsEmpty, Stash.IsFull));
break;
default:
Unhandled(message);
break;
}
}
}
}
32 changes: 30 additions & 2 deletions src/core/Akka/Actor/Stash/IStash.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,42 @@ public interface IStash
/// Returns all messages and clears the stash.
/// The stash is guaranteed to be empty afterwards.
/// </summary>
/// <returns>TBD</returns>
/// <returns>The previous stashed messages.</returns>
IEnumerable<Envelope> ClearStash();

/// <summary>
/// TBD
/// Prepend a set of envelopes to the front of the stash.
/// </summary>
/// <param name="envelopes">TBD</param>
void Prepend(IEnumerable<Envelope> envelopes);

/// <summary>
/// The number of messages currently inside the stash.
/// </summary>
public int Count { get; }

/// <summary>
/// Returns <c>true</c> when <see cref="Count"/> is zero.
/// </summary>
public bool IsEmpty { get; }

/// <summary>
/// Returns <c>true</c> when <see cref="Count"/> is greater than zero.
/// </summary>
public bool NonEmpty { get; }

/// <summary>
/// When using a bounded stash, this returns <c>true</c> when the stash is full.
/// </summary>
/// <remarks>
/// Always returns <c>false</c> when using an unbounded stash.
/// </remarks>
public bool IsFull { get; }

/// <summary>
/// The total capacity of the stash.
/// </summary>
public int Capacity { get; }
}
}

22 changes: 15 additions & 7 deletions src/core/Akka/Actor/Stash/Internal/AbstractStash.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ public abstract class AbstractStash : IStash

private readonly ActorCell _actorCell;

/// <summary>
/// The capacity of the stash. Configured in the actor's mailbox or dispatcher config.
/// </summary>
private readonly int _capacity;

/// <summary>
/// The actor's deque-based message queue.
/// `mailbox.queue` is the underlying `Deque`.
Expand All @@ -61,7 +56,7 @@ protected AbstractStash(IActorContext context)
_actorCell = actorCell;

// The capacity of the stash. Configured in the actor's mailbox or dispatcher config.
_capacity = context.System.Mailboxes.StashCapacity(context.Props.Dispatcher, context.Props.Mailbox);
Capacity = context.System.Mailboxes.StashCapacity(context.Props.Dispatcher, context.Props.Mailbox);
}

private int _currentEnvelopeId;
Expand All @@ -84,7 +79,7 @@ public void Stash()
}
_currentEnvelopeId = _actorCell.CurrentEnvelopeId;

if (_capacity <= 0 || _theStash.Count < _capacity)
if (Capacity <= 0 || _theStash.Count < Capacity)
_theStash.AddLast(new Envelope(currMsg, sender));
else
throw new StashOverflowException($"Couldn't enqueue message {currMsg} from ${sender} to stash of {_actorCell.Self}");
Expand Down Expand Up @@ -189,6 +184,19 @@ public void Prepend(IEnumerable<Envelope> envelopes)
}
}

public int Count => _theStash.Count;
public bool IsEmpty => Count == 0;
public bool NonEmpty => !IsEmpty;
public bool IsFull => Capacity >= 0 && _theStash.Count >= Capacity;

/// <summary>
/// The capacity of the stash. Configured in the actor's mailbox or dispatcher config.
/// </summary>
/// <remarks>
/// If capacity is negative, then we're using an Unbounded stash.
/// </remarks>
public int Capacity { get; }

/// <summary>
/// Enqueues <paramref name="msg"/> at the first position in the mailbox. If the message contained in
/// the envelope is a <see cref="Terminated"/> message, it will be ensured that it can be re-received
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Dispatch/ISemantics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
namespace Akka.Dispatch
{
/// <summary>
/// TBD
/// Describes the message queue semantics of a mailbox.
/// </summary>
public interface ISemantics
{
Expand Down