Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure internal stash is unstashed on writes after recovery #1756

Merged
merged 1 commit into from Mar 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 249 additions & 24 deletions src/core/Akka.Persistence.Tests/PersistentActorStashingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,61 @@ namespace Akka.Persistence.Tests
{
public class PersistentActorStashingSpec : PersistenceSpec
{
internal class UserStashActor : PersistentActorSpec.ExamplePersistentActor
internal abstract class StashExamplePersistentActor : PersistentActorSpec.ExamplePersistentActor
{
public StashExamplePersistentActor(string name) : base(name)
{
}

protected virtual bool UnstashBehavior(object message)
{
return false;
}
}

internal class UserStashActor : StashExamplePersistentActor
{
private bool _stashed = false;
public UserStashActor(string name) : base(name) { }

protected override bool ReceiveCommand(object message)
{
if (message is Cmd)
if (!UnstashBehavior(message))
{
var cmd = message as Cmd;
if (cmd.Data.ToString() == "a")
if (message is Cmd)
{
if (!_stashed)
var cmd = message as Cmd;
if (cmd.Data.ToString() == "a")
{
Stash.Stash();
_stashed = true;
}
else Sender.Tell("a");
if (!_stashed)
{
Stash.Stash();
_stashed = true;
}
else Sender.Tell("a");

}
else if (cmd.Data.ToString() == "b") Persist(new Evt("b"), evt => Sender.Tell(evt.Data));
else if (cmd.Data.ToString() == "c")
{
UnstashAll();
Sender.Tell("c");
}
else return false;
}
else if (cmd.Data.ToString() == "b") Persist(new Evt("b"), evt => Sender.Tell(evt.Data));
else if (cmd.Data.ToString() == "c")
else return false;
}
return true;
}

protected override bool UnstashBehavior(object message)
{
if (message is Cmd)
{
var cmd = message as Cmd;
if (cmd.Data.ToString() == "c")
{
Stash.UnstashAll();
UnstashAll();
Sender.Tell("c");
}
else return false;
Expand All @@ -48,7 +79,33 @@ protected override bool ReceiveCommand(object message)
}
}

internal class UserStashManyActor : PersistentActorSpec.ExamplePersistentActor
internal class UserStashWithinHandlerActor : UserStashActor
{
public UserStashWithinHandlerActor(string name) : base(name)
{
}

protected override bool UnstashBehavior(object message)
{
if (message is Cmd)
{
var cmd = message as Cmd;
if (cmd.Data.ToString() == "c")
{
Persist(new Evt("c"), e =>
{
Sender.Tell(e.Data);
UnstashAll();
});
}
else return false;
}
else return false;
return true;
}
}

internal class UserStashManyActor : StashExamplePersistentActor
{
public UserStashManyActor(string name)
: base(name)
Expand Down Expand Up @@ -84,6 +141,15 @@ protected override bool ReceiveCommand(object message)
}

protected bool ProcessC(object message)
{
if (!UnstashBehavior(message))
{
Stash.Stash();
}
return true;
}

protected override bool UnstashBehavior(object message)
{
var cmd = message as Cmd;
if (cmd != null && cmd.Data.ToString() == "c")
Expand All @@ -95,12 +161,35 @@ protected bool ProcessC(object message)
});
UnstashAll();
}
else Stash.Stash();
else return false;
return true;
}
}

internal class UserStashWithinHandlerManyActor : UserStashManyActor
{
public UserStashWithinHandlerManyActor(string name) : base(name)
{
}

protected override bool UnstashBehavior(object message)
{
var cmd = message as Cmd;
if (cmd != null && cmd.Data.ToString() == "c")
{
Persist(new Evt("c"), evt =>
{
UpdateState(evt);
Context.UnbecomeStacked();
UnstashAll();
});
}
else return false;
return true;
}
}

internal class UserStashFailureActor : PersistentActorSpec.ExamplePersistentActor
internal class UserStashFailureActor : StashExamplePersistentActor
{
public UserStashFailureActor(string name)
: base(name)
Expand Down Expand Up @@ -130,6 +219,15 @@ protected override bool ReceiveCommand(object message)
}

protected bool OtherCommandHandler(object message)
{
if (!UnstashBehavior(message))
{
Stash.Stash();
}
return true;
}

protected override bool UnstashBehavior(object message)
{
var cmd = message as Cmd;
if (cmd != null && cmd.Data.ToString() == "c")
Expand All @@ -141,7 +239,30 @@ protected bool OtherCommandHandler(object message)
});
UnstashAll();
}
else Stash.Stash();
else return false;
return true;
}
}

internal class UserStashWithinHandlerFailureCallbackActor : UserStashFailureActor
{
public UserStashWithinHandlerFailureCallbackActor(string name) : base(name)
{
}

protected override bool UnstashBehavior(object message)
{
var cmd = message as Cmd;
if (cmd != null && cmd.Data.ToString() == "c")
{
Persist(new Evt("c"), evt =>
{
UpdateState(evt);
Context.UnbecomeStacked();
UnstashAll();
});
}
else return false;
return true;
}
}
Expand All @@ -163,6 +284,18 @@ public void PersistentActor_should_support_user_stash_operations()
ExpectMsg("a");
}

[Fact]
public void PersistentActor_should_support_user_stash_operations_within_handler()
{
var pref = ActorOf(Props.Create(() => new UserStashWithinHandlerActor(Name)));
pref.Tell(new Cmd("a"));
pref.Tell(new Cmd("b"));
pref.Tell(new Cmd("c"));
ExpectMsg("b");
ExpectMsg("c");
ExpectMsg("a");
}

[Fact]
public void PersistentActor_should_support_user_stash_operations_with_several_stashed_messages()
{
Expand All @@ -181,6 +314,24 @@ public void PersistentActor_should_support_user_stash_operations_with_several_st
ExpectMsgInOrder(evts);
}

[Fact]
public void PersistentActor_should_support_user_stash_operations_within_handler_with_several_stashed_messages()
{
var pref = ActorOf(Props.Create(() => new UserStashWithinHandlerManyActor(Name)));
var n = 10;
var commands = Enumerable.Range(1, n).SelectMany(_ => new[] { new Cmd("a"), new Cmd("b-1"), new Cmd("b-2"), new Cmd("c"), });
var evts = Enumerable.Range(1, n).SelectMany(_ => new[] { "a", "c", "b-1", "b-2" }).ToArray();

foreach (var command in commands)
{
pref.Tell(command);
}

pref.Tell(GetState.Instance);

ExpectMsgInOrder(evts);
}

[Fact]
public void PersistentActor_should_support_user_stash_operations_under_failures()
{
Expand All @@ -195,11 +346,26 @@ public void PersistentActor_should_support_user_stash_operations_under_failures(
pref.Tell(GetState.Instance);
ExpectMsgInOrder("a", "c", "b-1", "b-3", "b-4", "b-5", "b-6", "b-7", "b-8", "b-9", "b-10");
}

[Fact]
public void PersistentActor_should_support_user_stash_operations_within_handler_under_failures()
{
var pref = ActorOf(Props.Create(() => new UserStashWithinHandlerFailureCallbackActor(Name)));
pref.Tell(new Cmd("a"));
for (int i = 1; i <= 10; i++)
{
var cmd = new Cmd("b-" + i);
pref.Tell(cmd);
}
pref.Tell(new Cmd("c"));
pref.Tell(GetState.Instance);
ExpectMsgInOrder("a", "c", "b-1", "b-3", "b-4", "b-5", "b-6", "b-7", "b-8", "b-9", "b-10");
}
}

public class SteppingMemoryPersistentActorStashingSpec : PersistenceSpec
{
internal class AsyncStashingActor : PersistentActorSpec.ExamplePersistentActor
internal class AsyncStashingActor : PersistentActorStashingSpec.StashExamplePersistentActor
{
private bool _stashed = false;

Expand All @@ -209,7 +375,7 @@ public AsyncStashingActor(string name) : base(name)

protected override bool ReceiveCommand(object message)
{
if (!CommonBehavior(message))
if (!CommonBehavior(message) && !UnstashBehavior(message))
{
if (message is Cmd)
{
Expand All @@ -230,17 +396,45 @@ protected override bool ReceiveCommand(object message)
PersistAsync(new Evt("b"), UpdateStateHandler);
return true;
}
if (data.Equals("c"))
{
PersistAsync(new Evt("c"), UpdateStateHandler);
Stash.UnstashAll();
return true;
}
}
}
else return true;
return false;
}

protected override bool UnstashBehavior(object message)
{
var cmd = message as Cmd;
if (cmd != null && cmd.Data.ToString() == "c")
{
PersistAsync(new Evt("c"), UpdateStateHandler);
Stash.UnstashAll();
}
else return false;
return true;
}
}

internal class AsyncStashingWithinHandlerActor : AsyncStashingActor
{
public AsyncStashingWithinHandlerActor(string name) : base(name)
{
}

protected override bool UnstashBehavior(object message)
{
var cmd = message as Cmd;
if (cmd != null && cmd.Data.ToString() == "c")
{
PersistAsync(new Evt("c"), evt =>
{
UpdateState(evt);
Stash.UnstashAll();
});
}
else return false;
return true;
}
}

public SteppingMemoryPersistentActorStashingSpec()
Expand Down Expand Up @@ -278,5 +472,36 @@ public void Stashing_in_a_PersistentActor_mixed_with_PersistAsync_should_handle_
});
});
}

[Fact]
public void Stashing_in_a_PersistentActor_mixed_with_PersistAsync_should_handle_async_callback_not_happening_until_next_message_has_been_stashed_within_handler()
{
var pref = Sys.ActorOf(Props.Create(() => new AsyncStashingWithinHandlerActor(Name)));
AwaitAssert(() => SteppingMemoryJournal.GetRef("persistence-stash"), TimeSpan.FromSeconds(3));
var journal = SteppingMemoryJournal.GetRef("persistence-stash");

// initial read highest
SteppingMemoryJournal.Step(journal);

pref.Tell(new Cmd("a"));
pref.Tell(new Cmd("b"));

// allow the write to complete, after the stash
SteppingMemoryJournal.Step(journal);

pref.Tell(new Cmd("c"));
// writing of c and b
SteppingMemoryJournal.Step(journal);
SteppingMemoryJournal.Step(journal);

Within(TimeSpan.FromSeconds(3), () =>
{
AwaitAssert(() =>
{
pref.Tell(GetState.Instance);
ExpectMsgInOrder("a", "c", "b");
});
});
}
}
}
Loading