From ff915ee308c8aad7f21783e44b11bc7435e44c49 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Sat, 19 Dec 2015 21:13:16 +0100 Subject: [PATCH 1/2] enclosed some API in akka cluster plugins --- .../ClusterShardingFailureSpec.cs | 3 +- .../ClusterShardingGracefulShutdownSpec.cs | 4 +- .../ClusterShardingSpec.cs | 14 +- .../Akka.Cluster.Sharding.csproj | 3 + .../Akka.Cluster.Sharding/ClusterSharding.cs | 219 +- .../ClusterShardingGuardian.cs | 2 +- .../PersistentShardCoordinator.cs | 10 +- .../Properties/AssemblyInfo.cs | 1 + .../ClusterShardingMessageSerializer.cs | 339 + .../Proto/ClusterShardingMessages.cs | 6806 ++++++++--------- .../cluster/Akka.Cluster.Sharding/Shard.cs | 38 +- .../Akka.Cluster.Sharding/ShardRegion.cs | 204 +- .../Akka.Cluster.Sharding/ShardingMessages.cs | 199 + .../DistributedPubSubMediatorSpec.cs | 96 +- .../Client/ClusterClient.cs | 12 +- .../Client/ClusterClientReceptionist.cs | 12 +- .../Client/ClusterReceptionist.cs | 22 +- .../Properties/AssemblyInfo.cs | 1 + .../PublishSubscribe/DistributedMessages.cs | 642 +- .../DistributedPubSubMediator.cs | 40 +- .../Internal/TopicMessages.cs | 32 +- .../PublishSubscribe/Internal/Topics.cs | 28 +- .../DistributedPubSubMessageSerializer.cs | 48 +- .../protobuf/ClusterShardingMessages.proto | 54 + .../ClusterSharding.Node/App.config | 3 + .../ClusterSharding.Node.csproj | 1 + .../ClusterSharding.Node/MessageExtractor.cs | 25 + .../ClusterSharding.Node/Printer.cs | 25 - .../ClusterSharding.Node/Program.cs | 4 +- .../ClusterToolsExample.Node/App.config | 17 +- .../ClusterToolsExample.Seed/App.config | 17 +- .../ClusterToolsExample.Seed/Program.cs | 2 +- .../ClusterToolsExample.Shared/Echo.cs | 6 +- 33 files changed, 4577 insertions(+), 4352 deletions(-) create mode 100644 src/contrib/cluster/Akka.Cluster.Sharding/Serialization/ClusterShardingMessageSerializer.cs create mode 100644 src/contrib/cluster/Akka.Cluster.Sharding/ShardingMessages.cs create mode 100644 src/core/protobuf/ClusterShardingMessages.proto create mode 100644 src/examples/Cluster/ClusterSharding/ClusterSharding.Node/MessageExtractor.cs diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingFailureSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingFailureSpec.cs index 7971a2373d1..e03d4cbd7d7 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingFailureSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingFailureSpec.cs @@ -14,7 +14,6 @@ using Akka.Persistence.Journal; using Akka.Remote.TestKit; using Akka.Remote.Transport; -using Xunit; namespace Akka.Cluster.Sharding.Tests { @@ -276,7 +275,7 @@ public void ClusterSharding_with_flaky_journal_should_recover_after_journal_fail region.Tell(new Add("11", 1)); //Test the Shard passivate works during a journal failure - shard2.Tell(new ShardRegion.Passivate(PoisonPill.Instance), entity21); + shard2.Tell(new Passivate(PoisonPill.Instance), entity21); region.Tell(new Add("21", 1)); region.Tell(new Get("21")); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingGracefulShutdownSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingGracefulShutdownSpec.cs index 6434035881e..ab61212e16a 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingGracefulShutdownSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingGracefulShutdownSpec.cs @@ -84,7 +84,7 @@ protected override void OnReceive(object message) if (message.Equals("leave")) { Context.Watch(_region); - _region.Tell(ShardRegion.GracefulShutdown.Instance); + _region.Tell(GracefulShutdown.Instance); } else if ((terminated = message as Terminated) != null && terminated.ActorRef.Equals(_region)) { @@ -214,7 +214,7 @@ public void ClusterSharding_should_gracefully_shutdown_a_region() RunOn(() => { - _region.Value.Tell(ShardRegion.GracefulShutdown.Instance); + _region.Value.Tell(GracefulShutdown.Instance); }, _second); RunOn(() => diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingSpec.cs index f727619722d..5f6a7c519e4 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingSpec.cs @@ -6,11 +6,9 @@ //----------------------------------------------------------------------- using System; -using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading; -using Akka.Cluster.Sharding; using Akka.Configuration; using Akka.Persistence; using Akka.Remote.TestKit; @@ -189,7 +187,7 @@ protected override bool ReceiveCommand(object message) .With(_ => Persist(new CounterChanged(1), UpdateState)) .With(_ => Persist(new CounterChanged(-1), UpdateState)) .With(_ => Sender.Tell(_count)) - .With(_ => Context.Parent.Tell(new ShardRegion.Passivate(Stop.Instance))) + .With(_ => Context.Parent.Tell(new Passivate(Stop.Instance))) .With(_ => Context.Stop(Self)) .WasHandled; } @@ -383,8 +381,8 @@ public void ClusterSharding_should_work_in_single_node_cluster() r.Tell(new Counter.Get(1)); ExpectMsg(2); - r.Tell(ShardRegion.GetCurrentRegions.Instance); - ExpectMsg(m => m.Regions.Length == 1 && m.Regions[0].Equals(Cluster.SelfAddress)); + r.Tell(GetCurrentRegions.Instance); + ExpectMsg(m => m.Regions.Length == 1 && m.Regions[0].Equals(Cluster.SelfAddress)); }, _first); EnterBarrier("after-2"); @@ -445,8 +443,8 @@ public void ClusterSharding_should_use_second_node() ExpectMsg(3); Assert.Equal(r.Path / "2" / "2", LastSender.Path); - r.Tell(ShardRegion.GetCurrentRegions.Instance); - ExpectMsg(x => x.Regions.Length == 2 + r.Tell(GetCurrentRegions.Instance); + ExpectMsg(x => x.Regions.Length == 2 && x.Regions[0].Equals(Cluster.SelfAddress) && x.Regions[1].Equals(Node(_first).Address)); }, _second); @@ -906,7 +904,7 @@ public void Persistent_cluster_shards_should_permanently_stop_entities_which_pas //Send the shard the passivate message from the counter Watch(counter1); - shard.Tell(new ShardRegion.Passivate(Counter.Stop.Instance), counter1); + shard.Tell(new Passivate(Counter.Stop.Instance), counter1); // watch for the Terminated message ExpectTerminated(counter1, TimeSpan.FromSeconds(5)); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Akka.Cluster.Sharding.csproj b/src/contrib/cluster/Akka.Cluster.Sharding/Akka.Cluster.Sharding.csproj index f2229d5617b..9726833f1fc 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Akka.Cluster.Sharding.csproj +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Akka.Cluster.Sharding.csproj @@ -59,9 +59,11 @@ + + @@ -92,6 +94,7 @@ +