From 52bdd260576b89067255f3c6732291698eef9bd1 Mon Sep 17 00:00:00 2001 From: Chris Constantin Date: Fri, 4 Mar 2016 15:59:47 -0800 Subject: [PATCH] Ensure internal stash is unstashed on writes after recovery See https://github.com/akka/akka/issues/19899 --- .../PersistentActorStashingSpec.cs | 273 ++++++++++++++++-- .../Akka.Persistence/Eventsourced.Recovery.cs | 13 +- src/core/Akka.Persistence/Eventsourced.cs | 8 + 3 files changed, 264 insertions(+), 30 deletions(-) diff --git a/src/core/Akka.Persistence.Tests/PersistentActorStashingSpec.cs b/src/core/Akka.Persistence.Tests/PersistentActorStashingSpec.cs index 296a607b5ad..e029f4b0476 100644 --- a/src/core/Akka.Persistence.Tests/PersistentActorStashingSpec.cs +++ b/src/core/Akka.Persistence.Tests/PersistentActorStashingSpec.cs @@ -15,30 +15,61 @@ namespace Akka.Persistence.Tests { public class PersistentActorStashingSpec : PersistenceSpec { - internal class UserStashActor : PersistentActorSpec.ExamplePersistentActor + internal abstract class StashExamplePersistentActor : PersistentActorSpec.ExamplePersistentActor + { + public StashExamplePersistentActor(string name) : base(name) + { + } + + protected virtual bool UnstashBehavior(object message) + { + return false; + } + } + + internal class UserStashActor : StashExamplePersistentActor { private bool _stashed = false; public UserStashActor(string name) : base(name) { } protected override bool ReceiveCommand(object message) { - if (message is Cmd) + if (!UnstashBehavior(message)) { - var cmd = message as Cmd; - if (cmd.Data.ToString() == "a") + if (message is Cmd) { - if (!_stashed) + var cmd = message as Cmd; + if (cmd.Data.ToString() == "a") { - Stash.Stash(); - _stashed = true; - } - else Sender.Tell("a"); + if (!_stashed) + { + Stash.Stash(); + _stashed = true; + } + else Sender.Tell("a"); + } + else if (cmd.Data.ToString() == "b") Persist(new Evt("b"), evt => Sender.Tell(evt.Data)); + else if (cmd.Data.ToString() == "c") + { + UnstashAll(); + Sender.Tell("c"); + } + else return false; } - else if (cmd.Data.ToString() == "b") Persist(new Evt("b"), evt => Sender.Tell(evt.Data)); - else if (cmd.Data.ToString() == "c") + else return false; + } + return true; + } + + protected override bool UnstashBehavior(object message) + { + if (message is Cmd) + { + var cmd = message as Cmd; + if (cmd.Data.ToString() == "c") { - Stash.UnstashAll(); + UnstashAll(); Sender.Tell("c"); } else return false; @@ -48,7 +79,33 @@ protected override bool ReceiveCommand(object message) } } - internal class UserStashManyActor : PersistentActorSpec.ExamplePersistentActor + internal class UserStashWithinHandlerActor : UserStashActor + { + public UserStashWithinHandlerActor(string name) : base(name) + { + } + + protected override bool UnstashBehavior(object message) + { + if (message is Cmd) + { + var cmd = message as Cmd; + if (cmd.Data.ToString() == "c") + { + Persist(new Evt("c"), e => + { + Sender.Tell(e.Data); + UnstashAll(); + }); + } + else return false; + } + else return false; + return true; + } + } + + internal class UserStashManyActor : StashExamplePersistentActor { public UserStashManyActor(string name) : base(name) @@ -84,6 +141,15 @@ protected override bool ReceiveCommand(object message) } protected bool ProcessC(object message) + { + if (!UnstashBehavior(message)) + { + Stash.Stash(); + } + return true; + } + + protected override bool UnstashBehavior(object message) { var cmd = message as Cmd; if (cmd != null && cmd.Data.ToString() == "c") @@ -95,12 +161,35 @@ protected bool ProcessC(object message) }); UnstashAll(); } - else Stash.Stash(); + else return false; + return true; + } + } + + internal class UserStashWithinHandlerManyActor : UserStashManyActor + { + public UserStashWithinHandlerManyActor(string name) : base(name) + { + } + + protected override bool UnstashBehavior(object message) + { + var cmd = message as Cmd; + if (cmd != null && cmd.Data.ToString() == "c") + { + Persist(new Evt("c"), evt => + { + UpdateState(evt); + Context.UnbecomeStacked(); + UnstashAll(); + }); + } + else return false; return true; } } - internal class UserStashFailureActor : PersistentActorSpec.ExamplePersistentActor + internal class UserStashFailureActor : StashExamplePersistentActor { public UserStashFailureActor(string name) : base(name) @@ -130,6 +219,15 @@ protected override bool ReceiveCommand(object message) } protected bool OtherCommandHandler(object message) + { + if (!UnstashBehavior(message)) + { + Stash.Stash(); + } + return true; + } + + protected override bool UnstashBehavior(object message) { var cmd = message as Cmd; if (cmd != null && cmd.Data.ToString() == "c") @@ -141,7 +239,30 @@ protected bool OtherCommandHandler(object message) }); UnstashAll(); } - else Stash.Stash(); + else return false; + return true; + } + } + + internal class UserStashWithinHandlerFailureCallbackActor : UserStashFailureActor + { + public UserStashWithinHandlerFailureCallbackActor(string name) : base(name) + { + } + + protected override bool UnstashBehavior(object message) + { + var cmd = message as Cmd; + if (cmd != null && cmd.Data.ToString() == "c") + { + Persist(new Evt("c"), evt => + { + UpdateState(evt); + Context.UnbecomeStacked(); + UnstashAll(); + }); + } + else return false; return true; } } @@ -163,6 +284,18 @@ public void PersistentActor_should_support_user_stash_operations() ExpectMsg("a"); } + [Fact] + public void PersistentActor_should_support_user_stash_operations_within_handler() + { + var pref = ActorOf(Props.Create(() => new UserStashWithinHandlerActor(Name))); + pref.Tell(new Cmd("a")); + pref.Tell(new Cmd("b")); + pref.Tell(new Cmd("c")); + ExpectMsg("b"); + ExpectMsg("c"); + ExpectMsg("a"); + } + [Fact] public void PersistentActor_should_support_user_stash_operations_with_several_stashed_messages() { @@ -181,6 +314,24 @@ public void PersistentActor_should_support_user_stash_operations_with_several_st ExpectMsgInOrder(evts); } + [Fact] + public void PersistentActor_should_support_user_stash_operations_within_handler_with_several_stashed_messages() + { + var pref = ActorOf(Props.Create(() => new UserStashWithinHandlerManyActor(Name))); + var n = 10; + var commands = Enumerable.Range(1, n).SelectMany(_ => new[] { new Cmd("a"), new Cmd("b-1"), new Cmd("b-2"), new Cmd("c"), }); + var evts = Enumerable.Range(1, n).SelectMany(_ => new[] { "a", "c", "b-1", "b-2" }).ToArray(); + + foreach (var command in commands) + { + pref.Tell(command); + } + + pref.Tell(GetState.Instance); + + ExpectMsgInOrder(evts); + } + [Fact] public void PersistentActor_should_support_user_stash_operations_under_failures() { @@ -195,11 +346,26 @@ public void PersistentActor_should_support_user_stash_operations_under_failures( pref.Tell(GetState.Instance); ExpectMsgInOrder("a", "c", "b-1", "b-3", "b-4", "b-5", "b-6", "b-7", "b-8", "b-9", "b-10"); } + + [Fact] + public void PersistentActor_should_support_user_stash_operations_within_handler_under_failures() + { + var pref = ActorOf(Props.Create(() => new UserStashWithinHandlerFailureCallbackActor(Name))); + pref.Tell(new Cmd("a")); + for (int i = 1; i <= 10; i++) + { + var cmd = new Cmd("b-" + i); + pref.Tell(cmd); + } + pref.Tell(new Cmd("c")); + pref.Tell(GetState.Instance); + ExpectMsgInOrder("a", "c", "b-1", "b-3", "b-4", "b-5", "b-6", "b-7", "b-8", "b-9", "b-10"); + } } public class SteppingMemoryPersistentActorStashingSpec : PersistenceSpec { - internal class AsyncStashingActor : PersistentActorSpec.ExamplePersistentActor + internal class AsyncStashingActor : PersistentActorStashingSpec.StashExamplePersistentActor { private bool _stashed = false; @@ -209,7 +375,7 @@ public AsyncStashingActor(string name) : base(name) protected override bool ReceiveCommand(object message) { - if (!CommonBehavior(message)) + if (!CommonBehavior(message) && !UnstashBehavior(message)) { if (message is Cmd) { @@ -230,17 +396,45 @@ protected override bool ReceiveCommand(object message) PersistAsync(new Evt("b"), UpdateStateHandler); return true; } - if (data.Equals("c")) - { - PersistAsync(new Evt("c"), UpdateStateHandler); - Stash.UnstashAll(); - return true; - } } } else return true; return false; } + + protected override bool UnstashBehavior(object message) + { + var cmd = message as Cmd; + if (cmd != null && cmd.Data.ToString() == "c") + { + PersistAsync(new Evt("c"), UpdateStateHandler); + Stash.UnstashAll(); + } + else return false; + return true; + } + } + + internal class AsyncStashingWithinHandlerActor : AsyncStashingActor + { + public AsyncStashingWithinHandlerActor(string name) : base(name) + { + } + + protected override bool UnstashBehavior(object message) + { + var cmd = message as Cmd; + if (cmd != null && cmd.Data.ToString() == "c") + { + PersistAsync(new Evt("c"), evt => + { + UpdateState(evt); + Stash.UnstashAll(); + }); + } + else return false; + return true; + } } public SteppingMemoryPersistentActorStashingSpec() @@ -278,5 +472,36 @@ public void Stashing_in_a_PersistentActor_mixed_with_PersistAsync_should_handle_ }); }); } + + [Fact] + public void Stashing_in_a_PersistentActor_mixed_with_PersistAsync_should_handle_async_callback_not_happening_until_next_message_has_been_stashed_within_handler() + { + var pref = Sys.ActorOf(Props.Create(() => new AsyncStashingWithinHandlerActor(Name))); + AwaitAssert(() => SteppingMemoryJournal.GetRef("persistence-stash"), TimeSpan.FromSeconds(3)); + var journal = SteppingMemoryJournal.GetRef("persistence-stash"); + + // initial read highest + SteppingMemoryJournal.Step(journal); + + pref.Tell(new Cmd("a")); + pref.Tell(new Cmd("b")); + + // allow the write to complete, after the stash + SteppingMemoryJournal.Step(journal); + + pref.Tell(new Cmd("c")); + // writing of c and b + SteppingMemoryJournal.Step(journal); + SteppingMemoryJournal.Step(journal); + + Within(TimeSpan.FromSeconds(3), () => + { + AwaitAssert(() => + { + pref.Tell(GetState.Instance); + ExpectMsgInOrder("a", "c", "b"); + }); + }); + } } } \ No newline at end of file diff --git a/src/core/Akka.Persistence/Eventsourced.Recovery.cs b/src/core/Akka.Persistence/Eventsourced.Recovery.cs index e42fbb9eb7e..5b5c5730af8 100644 --- a/src/core/Akka.Persistence/Eventsourced.Recovery.cs +++ b/src/core/Akka.Persistence/Eventsourced.Recovery.cs @@ -146,7 +146,11 @@ private EventsourcedState ProcessingCommands() { return new EventsourcedState("processing commands", false, (receive, message) => { - var handled = CommonProcessingStateBehavior(message, err => _pendingInvocations.Pop()); + var handled = CommonProcessingStateBehavior(message, err => + { + _pendingInvocations.Pop(); + UnstashInternally(err); + }); if (!handled) { try @@ -169,10 +173,8 @@ private void OnProcessingCommandsAroundReceiveComplete(bool err) if (_pendingStashingPersistInvocations > 0) ChangeState(PersistingEvents()); - else if (err) - _internalStash.UnstashAll(); else - _internalStash.Unstash(); + UnstashInternally(err); } private void FlushBatch() @@ -211,8 +213,7 @@ private EventsourcedState PersistingEvents() if (_pendingStashingPersistInvocations == 0) { ChangeState(ProcessingCommands()); - if (err) _internalStash.UnstashAll(); - else _internalStash.Unstash(); + UnstashInternally(err); } }); diff --git a/src/core/Akka.Persistence/Eventsourced.cs b/src/core/Akka.Persistence/Eventsourced.cs index e27cccefe01..dfa4c0a8dd7 100644 --- a/src/core/Akka.Persistence/Eventsourced.cs +++ b/src/core/Akka.Persistence/Eventsourced.cs @@ -501,6 +501,14 @@ private void StashInternally(object currentMessage) } } } + + private void UnstashInternally(bool all) + { + if (all) + _internalStash.UnstashAll(); + else + _internalStash.Unstash(); + } } }