Skip to content

Commit

Permalink
cleaned up RememberEntitiesFailureSpecs (#7400)
Browse files Browse the repository at this point in the history
- converted to `async` APIs
- fixed `Cl;usterSingletonManager.DefaultConfig` build warning
- style cleanup
  • Loading branch information
Aaronontheweb authored Dec 2, 2024
1 parent 0968e9d commit 4becbdd
Showing 1 changed file with 30 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Sharding.Internal;
using Akka.Cluster.Tools.Singleton;
Expand All @@ -26,19 +27,19 @@ public class RememberEntitiesFailureSpec : AkkaSpec
{
internal class EntityActor : ActorBase
{
private readonly ILoggingAdapter log = Context.GetLogger();
private readonly ILoggingAdapter _log = Context.GetLogger();

public EntityActor()
{
log.Info("Entity actor [{0}] starting up", Context.Self.Path.Name);
_log.Info("Entity actor [{0}] starting up", Context.Self.Path.Name);
}

protected override bool Receive(object message)
{
switch (message)
{
case "stop":
log.Info("Stopping myself!");
_log.Info("Stopping myself!");
Context.Stop(Self);
return true;
case "graceful-stop":
Expand Down Expand Up @@ -122,8 +123,8 @@ public Delay(TimeSpan howLong)
}

// outside store since we need to be able to set them before sharding initializes
private static ImmutableDictionary<string, IFail> failShardGetEntities = ImmutableDictionary<string, IFail>.Empty;
private static readonly IFail failCoordinatorGetShards = null;
private static ImmutableDictionary<string, IFail> _failShardGetEntities = ImmutableDictionary<string, IFail>.Empty;
private static readonly IFail FailCoordinatorGetShards = null;

private class ShardStoreCreated
{
Expand Down Expand Up @@ -221,7 +222,7 @@ protected override bool Receive(object message)
switch (message)
{
case RememberEntitiesShardStore.GetEntities _:
switch (failShardGetEntities.GetValueOrDefault(shardId))
switch (_failShardGetEntities.GetValueOrDefault(shardId))
{
case null:
Sender.Tell(new RememberEntitiesShardStore.RememberedEntities(ImmutableHashSet<string>.Empty));
Expand Down Expand Up @@ -317,7 +318,7 @@ protected override bool Receive(object message)
switch (message)
{
case RememberEntitiesCoordinatorStore.GetShards _:
switch (failCoordinatorGetShards)
switch (FailCoordinatorGetShards)
{
case null:
Sender.Tell(new RememberEntitiesCoordinatorStore.RememberedShards(ImmutableHashSet<string>.Empty));
Expand Down Expand Up @@ -390,7 +391,7 @@ protected override bool Receive(object message)
akka.cluster.sharding.updating-state-timeout = 1s
akka.cluster.sharding.verbose-debug-logging = on
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on")
.WithFallback(ClusterSingletonManager.DefaultConfig()
.WithFallback(ClusterSingleton.DefaultConfig()
.WithFallback(ClusterSharding.DefaultConfig()));

public RememberEntitiesFailureSpec(ITestOutputHelper helper) : base(SpecConfig, helper)
Expand Down Expand Up @@ -441,7 +442,7 @@ public void Remember_entities_handling_in_sharding_must_recover_when_initial_rem
private void Remember_entities_handling_in_sharding_must_recover_when_initial_remember_entities_load_fails(IFail wayToFail)
{
Log.Debug("Getting entities for shard 1 will fail");
failShardGetEntities = ImmutableDictionary<string, IFail>.Empty.Add("1", wayToFail);
_failShardGetEntities = ImmutableDictionary<string, IFail>.Empty.Add("1", wayToFail);

try
{
Expand All @@ -456,7 +457,7 @@ private void Remember_entities_handling_in_sharding_must_recover_when_initial_re
probe.ExpectNoMsg(); // message is lost because shard crashes

Log.Debug("Resetting initial fail");
failShardGetEntities = ImmutableDictionary<string, IFail>.Empty;
_failShardGetEntities = ImmutableDictionary<string, IFail>.Empty;

// shard should be restarted and eventually succeed
AwaitAssert(() =>
Expand All @@ -469,7 +470,7 @@ private void Remember_entities_handling_in_sharding_must_recover_when_initial_re
}
finally
{
failShardGetEntities = ImmutableDictionary<string, IFail>.Empty;
_failShardGetEntities = ImmutableDictionary<string, IFail>.Empty;
}
}

Expand Down Expand Up @@ -694,41 +695,41 @@ private void Remember_entities_handling_in_sharding_must_recover_on_graceful_ent
}

[Fact]
public void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_NoResponse()
public Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_NoResponse()
{
Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new NoResponse());
return Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new NoResponse());
}

[Fact]
public void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_CrashStore()
public Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_CrashStore()
{
Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new CrashStore());
return Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new CrashStore());
}

[Fact]
public void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_StopStore()
public Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_StopStore()
{
Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new StopStore());
return Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new StopStore());
}

[Fact]
public void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_Delay_500()
public Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_Delay_500()
{
Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new Delay(TimeSpan.FromMilliseconds(500)));
return Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new Delay(TimeSpan.FromMilliseconds(500)));
}

[Fact]
public void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_Delay_1000()
public Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_Delay_1000()
{
Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new Delay(TimeSpan.FromSeconds(1)));
return Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new Delay(TimeSpan.FromSeconds(1)));
}

private void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(IFail wayToFail)
private async Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(IFail wayToFail)
{
var storeProbe = CreateTestProbe();
Sys.EventStream.Subscribe(storeProbe.Ref, typeof(CoordinatorStoreCreated));

var sharding = ClusterSharding.Get(Sys).Start(
var sharding = await ClusterSharding.Get(Sys).StartAsync(
$"coordinatorStoreStopGraceful-{wayToFail}",
Props.Create(() => new EntityActor()),
ClusterShardingSettings.Create(Sys).WithRememberEntities(true),
Expand All @@ -739,12 +740,12 @@ private void Remember_entities_handling_in_sharding_must_recover_when_coordinato
var probe = CreateTestProbe();

// coordinator store is triggered by coordinator starting up
var coordinatorStore = storeProbe.ExpectMsg<CoordinatorStoreCreated>().Store;
var coordinatorStore = (await storeProbe.ExpectMsgAsync<CoordinatorStoreCreated>()).Store;
coordinatorStore.Tell(new FakeCoordinatorStoreActor.FailAddShard("1", wayToFail), probe.Ref);
probe.ExpectMsg<Done>();
await probe.ExpectMsgAsync<Done>();

sharding.Tell(new EntityEnvelope(1, "hello-1"), probe.Ref);
probe.ExpectNoMsg(TimeSpan.FromSeconds(1)); // because shard cannot start while store failing
await probe.ExpectNoMsgAsync(TimeSpan.FromSeconds(1)); // because shard cannot start while store failing

if (wayToFail is StopStore or CrashStore)
{
Expand All @@ -754,12 +755,12 @@ private void Remember_entities_handling_in_sharding_must_recover_when_coordinato

// fail it when stopping
coordinatorStore.Tell(new FakeCoordinatorStoreActor.ClearFailShard("1"), storeProbe.Ref);
storeProbe.ExpectMsg<Done>();
await storeProbe.ExpectMsgAsync<Done>();

probe.AwaitAssert(() =>
await probe.AwaitAssertAsync(async () =>
{
sharding.Tell(new EntityEnvelope(1, "hello-2"), probe.Ref);
probe.ExpectMsg("hello-2"); // should now work again
await probe.ExpectMsgAsync("hello-2"); // should now work again
}, TimeSpan.FromSeconds(5));

Sys.Stop(sharding);
Expand Down

0 comments on commit 4becbdd

Please sign in to comment.