From 231c8c9179c37c658bd7963bd0085b4f1919e87a Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Sat, 17 Sep 2022 13:05:31 -0500 Subject: [PATCH] Akka.Cluster.Sharding `GetEntityLocation` Query (#6101) * adding new `GetEntityLocation` query to `ShardRegion` designed to make it easier for testing and telemetry, the new `GetEntityLocation` is an `IShardRegionQuery` that checks for the presence of an entity in a given `ShardRegion` and reports back if this entity is able to be located. This query is not designed to be supported over the network and is meant to be local-only. * added spec and bugfixes * added API approval * added documentation and more detailed error messages --- docs/articles/clustering/cluster-sharding.md | 20 +- .../ShardRegionQueriesSpecs.cs | 183 +++++++++ .../Akka.Cluster.Sharding/ShardRegion.cs | 378 +++++++++++++----- .../Akka.Cluster.Sharding/ShardingMessages.cs | 80 +++- ...PISpec.ApproveClusterSharding.verified.txt | 14 + 5 files changed, 557 insertions(+), 118 deletions(-) create mode 100644 src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs diff --git a/docs/articles/clustering/cluster-sharding.md b/docs/articles/clustering/cluster-sharding.md index 36a58dc8f92..759e6cbe43f 100644 --- a/docs/articles/clustering/cluster-sharding.md +++ b/docs/articles/clustering/cluster-sharding.md @@ -165,13 +165,6 @@ Possible reasons for disabling remember entity storage are: For supporting remembered entities in an environment without disk storage but with access to a database, use persistence mode instead. -> [!NOTE] -> Currently, Lightning.NET library, the storage solution used to store DData in disk, is having problem -> deploying native library files in [Linux operating system operating in x64 and ARM platforms] -> (). -> -> You will need to install LightningDB in your Linux distribution manually if you wanted to use the durable DData feature. - ### Terminating Remembered Entities One complication that `akka.cluster.sharding.remember-entities = true` introduces is that your sharded entity actors can no longer be terminated through the normal Akka.NET channels, i.e. `Context.Stop(Self)`, `PoisonPill.Instance`, and the like. This is because as part of the `remember-entities` contract - the sharding system is going to insist on keeping all remembered entities alive until explicitly told to stop. @@ -217,6 +210,19 @@ You can inspect current sharding stats by using following messages: * On `GetShardRegionState` shard region will reply with `ShardRegionState` containing data about shards living in the current actor system and what entities are alive on each one of them. * On `GetClusterShardingStats` shard region will reply with `ClusterShardingStats` having information about shards living in the whole cluster and how many entities alive in each one of them. +### Querying for the Location of Specific Entities + +It's possible to query a `ShardRegion` or a `ShardRegionProxy` using a `GetEntityLocation` query: + +[!code-csharp[ShardedDaemonProcessSpec.cs](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs?name=GetEntityLocationQuery)] + +A `GetEntityLocation` query will always return an `EntityLocation` response - even if the query could not be executed. + +> [!IMPORTANT] +> One major caveat is that in order for the `GetEntityLocation` to execute your `IMessageExtractor` or `ShardExtractor` delegate will need to support the `ShardRegion.StartEntity` message - just like you'd have to use in order to support `remember-entities=on`: + +[!code-csharp[ShardedDaemonProcessSpec.cs](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs?name=GetEntityLocationExtractor)] + ## Integrating Cluster Sharding with Persistent Actors One of the most common scenarios, where cluster sharding is used, is to combine them with event-sourced persistent actors from [Akka.Persistence](xref:persistence-architecture) module. diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs new file mode 100644 index 00000000000..d4263d88d10 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs @@ -0,0 +1,183 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Cluster.Tools.Singleton; +using Akka.Configuration; +using Akka.TestKit; +using Akka.TestKit.TestActors; +using Akka.Util; +using Xunit; +using Xunit.Abstractions; +using FluentAssertions; + +namespace Akka.Cluster.Sharding.Tests +{ + public class ShardRegionQueriesSpecs : AkkaSpec + { + private Cluster _cluster; + private ClusterSharding _clusterSharding; + private IActorRef _shardRegion; + + private ActorSystem _proxySys; + + public ShardRegionQueriesSpecs(ITestOutputHelper outputHelper) : base(GetConfig(), outputHelper) + { + _clusterSharding = ClusterSharding.Get(Sys); + _cluster = Cluster.Get(Sys); + _shardRegion = _clusterSharding.Start("entity", s => EchoActor.Props(this, true), + ClusterShardingSettings.Create(Sys).WithRole("shard"), ExtractEntityId, ExtractShardId); + + var proxySysConfig = ConfigurationFactory.ParseString("akka.cluster.roles = [proxy]") + .WithFallback(Sys.Settings.Config); + _proxySys = ActorSystem.Create(Sys.Name, proxySysConfig); + + _cluster.Join(_cluster.SelfAddress); + AwaitAssert(() => { _cluster.SelfMember.Status.ShouldBe(MemberStatus.Up); }); + + // form a 2-node cluster + var proxyCluster = Cluster.Get(_proxySys); + proxyCluster.Join(_cluster.SelfAddress); + AwaitAssert(() => { proxyCluster.SelfMember.Status.ShouldBe(MemberStatus.Up); }); + } + + protected override void AfterAll() + { + Shutdown(_proxySys); + base.AfterAll(); + } + + private Option<(string, object)> ExtractEntityId(object message) + { + switch (message) + { + case int i: + return (i.ToString(), message); + } + + throw new NotSupportedException(); + } + + // + private string ExtractShardId(object message) + { + switch (message) + { + case int i: + return (i % 10).ToString(); + // must support ShardRegion.StartEntity in order for + // GetEntityLocation to work properly + case ShardRegion.StartEntity se: + return se.EntityId; + } + + throw new NotSupportedException(); + } + // + + private static Config GetConfig() + { + return ConfigurationFactory.ParseString(@" + akka.loglevel = WARNING + akka.actor.provider = cluster + akka.remote.dot-netty.tcp.port = 0 + akka.cluster.roles = [shard]") + .WithFallback(Sharding.ClusterSharding.DefaultConfig()) + .WithFallback(DistributedData.DistributedData.DefaultConfig()) + .WithFallback(ClusterSingletonManager.DefaultConfig()); + } + + /// + /// DocFx material for demonstrating how this query type works + /// + [Fact] + public async Task ShardRegion_GetEntityLocation_DocumentationSpec() + { + // + // creates an entity with entityId="1" + await _shardRegion.Ask(1, TimeSpan.FromSeconds(3)); + + // determine where entity with "entityId=1" is located in cluster + var q1 = await _shardRegion.Ask(new GetEntityLocation("1", TimeSpan.FromSeconds(1))); + + q1.EntityId.Should().Be("1"); + + // have a valid ShardId + q1.ShardId.Should().NotBeEmpty(); + + // have valid address for node that will / would host entity + q1.ShardRegion.Should().NotBe(Address.AllSystems); // has real address + + // if entity actor is alive, will retrieve a reference to it + q1.EntityRef.HasValue.Should().BeTrue(); + // + } + + [Fact(DisplayName = "ShardRegion should support GetEntityLocation queries locally")] + public async Task ShardRegion_should_support_GetEntityLocation_query_locally() + { + // arrange + await _shardRegion.Ask(1, TimeSpan.FromSeconds(3)); + await _shardRegion.Ask(2, TimeSpan.FromSeconds(3)); + + // act + var q1 = await _shardRegion.Ask(new GetEntityLocation("1", TimeSpan.FromSeconds(1))); + var q2 = await _shardRegion.Ask(new GetEntityLocation("2", TimeSpan.FromSeconds(1))); + var q3 = await _shardRegion.Ask(new GetEntityLocation("3", TimeSpan.FromSeconds(1))); + + // assert + void AssertValidEntityLocation(EntityLocation e, string entityId) + { + e.EntityId.Should().Be(entityId); + e.EntityRef.Should().NotBe(Option.None); + e.ShardId.Should().NotBeNullOrEmpty(); + e.ShardRegion.Should().Be(_cluster.SelfAddress); + } + + AssertValidEntityLocation(q1, "1"); + AssertValidEntityLocation(q2, "2"); + + q3.EntityRef.Should().Be(Option.None); + q3.ShardId.Should().NotBeNullOrEmpty(); // should still have computed a valid shard? + q3.ShardRegion.Should().Be(Address.AllSystems); + } + + [Fact(DisplayName = "ShardRegion should support GetEntityLocation queries remotely")] + public async Task ShardRegion_should_support_GetEntityLocation_query_remotely() + { + // arrange + var sharding2 = ClusterSharding.Get(_proxySys); + var shardRegionProxy = await sharding2.StartProxyAsync("entity", "shard", ExtractEntityId, ExtractShardId); + + await shardRegionProxy.Ask(1, TimeSpan.FromSeconds(3)); + await shardRegionProxy.Ask(2, TimeSpan.FromSeconds(3)); + + // act + var q1 = await shardRegionProxy.Ask(new GetEntityLocation("1", TimeSpan.FromSeconds(1))); + var q2 = await shardRegionProxy.Ask(new GetEntityLocation("2", TimeSpan.FromSeconds(1))); + var q3 = await shardRegionProxy.Ask(new GetEntityLocation("3", TimeSpan.FromSeconds(1))); + + // assert + void AssertValidEntityLocation(EntityLocation e, string entityId) + { + e.EntityId.Should().Be(entityId); + e.EntityRef.Should().NotBe(Option.None); + e.ShardId.Should().NotBeNullOrEmpty(); + e.ShardRegion.Should().Be(_cluster.SelfAddress); + } + + AssertValidEntityLocation(q1, "1"); + AssertValidEntityLocation(q2, "2"); + + q3.EntityRef.Should().Be(Option.None); + q3.ShardId.Should().NotBeNullOrEmpty(); // should still have computed a valid shard? + q3.ShardRegion.Should().Be(Address.AllSystems); + } + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index b274fe145ce..f034e71238a 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -13,7 +13,9 @@ using Akka.Actor; using Akka.Event; using Akka.Pattern; +using Akka.Util; using Akka.Util.Internal; +using Get = Akka.DistributedData.Get; namespace Akka.Cluster.Sharding { @@ -42,7 +44,10 @@ internal sealed class Retry : IShardRegionCommand /// TBD /// public static readonly Retry Instance = new Retry(); - private Retry() { } + + private Retry() + { + } } /// @@ -62,7 +67,10 @@ internal sealed class RegisterRetry : IShardRegionCommand /// TBD /// public static readonly RegisterRetry Instance = new RegisterRetry(); - private RegisterRetry() { } + + private RegisterRetry() + { + } } /// @@ -76,6 +84,7 @@ internal sealed class RestartShard /// TBD /// public readonly ShardId ShardId; + /// /// TBD /// @@ -111,7 +120,6 @@ public StartEntity(EntityId entityId) #region Equals - public override bool Equals(object obj) { var other = obj as StartEntity; @@ -122,7 +130,7 @@ public override bool Equals(object obj) return EntityId.Equals(other.EntityId); } - + public override int GetHashCode() { unchecked @@ -165,7 +173,6 @@ public StartEntityAck(EntityId entityId, ShardId shardId) #region Equals - public override bool Equals(object obj) { var other = obj as StartEntityAck; @@ -174,10 +181,10 @@ public override bool Equals(object obj) if (ReferenceEquals(other, this)) return true; return EntityId.Equals(other.EntityId) - && ShardId.Equals(other.ShardId); + && ShardId.Equals(other.ShardId); } - + public override int GetHashCode() { unchecked @@ -219,10 +226,14 @@ private StopTimeoutWarning() private static readonly TimeSpan StopTimeoutWarningAfter = TimeSpan.FromSeconds(5); private ILoggingAdapter _log; + /// /// TBD /// - public ILoggingAdapter Log { get { return _log ?? (_log = Context.GetLogger()); } } + public ILoggingAdapter Log + { + get { return _log ?? (_log = Context.GetLogger()); } + } public ITimerScheduler Timers { get; set; } @@ -236,9 +247,12 @@ private StopTimeoutWarning() /// TBD /// TBD /// TBD - public static Props Props(string typeName, ShardId shard, IActorRef replyTo, IEnumerable entities, object stopMessage, TimeSpan handoffTimeout) + public static Props Props(string typeName, ShardId shard, IActorRef replyTo, + IEnumerable entities, object stopMessage, TimeSpan handoffTimeout) { - return Actor.Props.Create(() => new HandOffStopper(typeName, shard, replyTo, entities, stopMessage, handoffTimeout)).WithDeploy(Deploy.Local); + return Actor.Props + .Create(() => new HandOffStopper(typeName, shard, replyTo, entities, stopMessage, handoffTimeout)) + .WithDeploy(Deploy.Local); } /// @@ -252,7 +266,8 @@ public static Props Props(string typeName, ShardId shard, IActorRef replyTo, IEn /// TBD /// TBD /// TBD - public HandOffStopper(string typeName, ShardId shard, IActorRef replyTo, IEnumerable entities, object stopMessage, TimeSpan handoffTimeout) + public HandOffStopper(string typeName, ShardId shard, IActorRef replyTo, IEnumerable entities, + object stopMessage, TimeSpan handoffTimeout) { var remaining = new HashSet(entities); @@ -273,20 +288,23 @@ public HandOffStopper(string typeName, ShardId shard, IActorRef replyTo, IEnumer shard, StopTimeoutWarningAfter, stopMessage.GetType(), - (CoordinatedShutdown.Get(Context.System).ShutdownReason != null) ? - "" // the region will be shutdown earlier so would be confusing to say more + (CoordinatedShutdown.Get(Context.System).ShutdownReason != null) + ? "" // the region will be shutdown earlier so would be confusing to say more : $"Waiting additional [{handoffTimeout}] before stopping the remaining entities."); }); Receive(s => { - Log.Warning($"{typeName}: HandOffStopMessage[{{0}}] is not handled by some of the entities in shard [{{1}}] after [{{2}}], " + - "stopping the remaining [{3}] entities.", stopMessage.GetType(), shard, handoffTimeout, remaining.Count); + Log.Warning( + $"{typeName}: HandOffStopMessage[{{0}}] is not handled by some of the entities in shard [{{1}}] after [{{2}}], " + + "stopping the remaining [{3}] entities.", stopMessage.GetType(), shard, handoffTimeout, + remaining.Count); foreach (var r in remaining) Context.Stop(r); }); - Timers.StartSingleTimer(StopTimeoutWarning.Instance, StopTimeoutWarning.Instance, StopTimeoutWarningAfter); + Timers.StartSingleTimer(StopTimeoutWarning.Instance, StopTimeoutWarning.Instance, + StopTimeoutWarningAfter); Timers.StartSingleTimer(StopTimeout.Instance, StopTimeout.Instance, handoffTimeout); foreach (var aref in remaining) @@ -310,9 +328,13 @@ public HandOffStopper(string typeName, ShardId shard, IActorRef replyTo, IEnumer /// /// /// TBD - internal static Props Props(string typeName, Func entityProps, ClusterShardingSettings settings, string coordinatorPath, ExtractEntityId extractEntityId, ExtractShardId extractShardId, object handOffStopMessage, IActorRef replicator, int majorityMinCap) + internal static Props Props(string typeName, Func entityProps, ClusterShardingSettings settings, + string coordinatorPath, ExtractEntityId extractEntityId, ExtractShardId extractShardId, + object handOffStopMessage, IActorRef replicator, int majorityMinCap) { - return Actor.Props.Create(() => new ShardRegion(typeName, entityProps, settings, coordinatorPath, extractEntityId, extractShardId, handOffStopMessage, replicator, majorityMinCap)).WithDeploy(Deploy.Local); + return Actor.Props.Create(() => new ShardRegion(typeName, entityProps, settings, coordinatorPath, + extractEntityId, extractShardId, handOffStopMessage, replicator, majorityMinCap)) + .WithDeploy(Deploy.Local); } /// @@ -326,35 +348,43 @@ internal static Props Props(string typeName, Func entityProps, Cl /// /// /// TBD - internal static Props ProxyProps(string typeName, ClusterShardingSettings settings, string coordinatorPath, ExtractEntityId extractEntityId, ExtractShardId extractShardId, IActorRef replicator, int majorityMinCap) + internal static Props ProxyProps(string typeName, ClusterShardingSettings settings, string coordinatorPath, + ExtractEntityId extractEntityId, ExtractShardId extractShardId, IActorRef replicator, int majorityMinCap) { - return Actor.Props.Create(() => new ShardRegion(typeName, null, settings, coordinatorPath, extractEntityId, extractShardId, PoisonPill.Instance, replicator, majorityMinCap)).WithDeploy(Deploy.Local); + return Actor.Props.Create(() => new ShardRegion(typeName, null, settings, coordinatorPath, extractEntityId, + extractShardId, PoisonPill.Instance, replicator, majorityMinCap)).WithDeploy(Deploy.Local); } /// /// TBD /// public readonly string TypeName; + /// /// TBD /// public readonly Func EntityProps; + /// /// TBD /// public readonly ClusterShardingSettings Settings; + /// /// TBD /// public readonly string CoordinatorPath; + /// /// TBD /// public readonly ExtractEntityId ExtractEntityId; + /// /// TBD /// public readonly ExtractShardId ExtractShardId; + /// /// TBD /// @@ -371,35 +401,46 @@ internal static Props ProxyProps(string typeName, ClusterShardingSettings settin /// /// TBD /// - protected IImmutableSet MembersByAge = ImmutableSortedSet.Empty.WithComparer(Member.AgeOrdering); + protected IImmutableSet + MembersByAge = ImmutableSortedSet.Empty.WithComparer(Member.AgeOrdering); // membersByAge contains members with these status - private static readonly ImmutableHashSet MemberStatusOfInterest = ImmutableHashSet.Create(MemberStatus.Up, MemberStatus.Leaving, MemberStatus.Exiting); + private static readonly ImmutableHashSet MemberStatusOfInterest = + ImmutableHashSet.Create(MemberStatus.Up, MemberStatus.Leaving, MemberStatus.Exiting); /// /// TBD /// - protected IImmutableDictionary> Regions = ImmutableDictionary>.Empty; + protected IImmutableDictionary> Regions = + ImmutableDictionary>.Empty; + /// /// TBD /// - protected IImmutableDictionary RegionByShard = ImmutableDictionary.Empty; + protected IImmutableDictionary + RegionByShard = ImmutableDictionary.Empty; + /// /// TBD /// - protected IImmutableDictionary>> ShardBuffers = ImmutableDictionary>>.Empty; + protected IImmutableDictionary>> ShardBuffers = + ImmutableDictionary>>.Empty; + /// /// TBD /// protected IImmutableDictionary Shards = ImmutableDictionary.Empty; + /// /// TBD /// protected IImmutableDictionary ShardsByRef = ImmutableDictionary.Empty; + /// /// TBD /// protected IImmutableSet StartingShards = ImmutableHashSet.Empty; + /// /// TBD /// @@ -428,7 +469,9 @@ internal static Props ProxyProps(string typeName, ClusterShardingSettings settin /// TBD /// /// - public ShardRegion(string typeName, Func entityProps, ClusterShardingSettings settings, string coordinatorPath, ExtractEntityId extractEntityId, ExtractShardId extractShardId, object handOffStopMessage, IActorRef replicator, int majorityMinCap) + public ShardRegion(string typeName, Func entityProps, ClusterShardingSettings settings, + string coordinatorPath, ExtractEntityId extractEntityId, ExtractShardId extractShardId, + object handOffStopMessage, IActorRef replicator, int majorityMinCap) { TypeName = typeName; EntityProps = entityProps; @@ -465,10 +508,14 @@ private void SetupCoordinatedShutdown() } private ILoggingAdapter _log; + /// /// TBD /// - public ILoggingAdapter Log { get { return _log ?? (_log = Context.GetLogger()); } } + public ILoggingAdapter Log + { + get { return _log ?? (_log = Context.GetLogger()); } + } public ITimerScheduler Timers { get; set; } @@ -476,10 +523,14 @@ private void SetupCoordinatedShutdown() /// TBD /// public bool GracefulShutdownInProgress { get; private set; } + /// /// TBD /// - public int TotalBufferSize { get { return ShardBuffers.Aggregate(0, (acc, entity) => acc + entity.Value.Count); } } + public int TotalBufferSize + { + get { return ShardBuffers.Aggregate(0, (acc, entity) => acc + entity.Value.Count); } + } /// /// When leaving the coordinator singleton is started rather quickly on next @@ -583,7 +634,6 @@ protected override bool Receive(object message) { switch (message) { - case Terminated t: HandleTerminated(t); return true; @@ -615,7 +665,8 @@ protected override bool Receive(object message) DeliverMessage(message, Sender); return true; default: - Log.Warning("{0}: Message does not have an extractor defined in shard so it was ignored: {1}", TypeName, message); + Log.Warning("{0}: Message does not have an extractor defined in shard so it was ignored: {1}", + TypeName, message); return false; } } @@ -669,7 +720,8 @@ private void Register() { if (Log.IsWarningEnabled) { - Log.Warning("{0}: Trying to register to coordinator at [{1}], but no acknowledgement. Total [{2}] buffered messages. [{3}]", + Log.Warning( + "{0}: Trying to register to coordinator at [{1}], but no acknowledgement. Total [{2}] buffered messages. [{3}]", TypeName, string.Join(", ", actorSelections.Select(i => i.PathString)), TotalBufferSize, @@ -678,7 +730,8 @@ private void Register() } else if (Log.IsDebugEnabled) { - Log.Debug("{0}: Trying to register to coordinator at [{1}], but no acknowledgement. No buffered messages yet. [{2}]", + Log.Debug( + "{0}: Trying to register to coordinator at [{1}], but no acknowledgement. No buffered messages yet. [{2}]", TypeName, string.Join(", ", actorSelections.Select(i => i.PathString)), coordinatorMessage); @@ -716,7 +769,9 @@ private void DeliverStartEntity(object message, IActorRef sender) catch (Exception ex) { //case ex: MatchError ⇒ - Log.Error(ex, "{0}: When using remember-entities the shard id extractor must handle ShardRegion.StartEntity(id).", TypeName); + Log.Error(ex, + "{0}: When using remember-entities the shard id extractor must handle ShardRegion.StartEntity(id).", + TypeName); } } @@ -739,8 +794,10 @@ private void DeliverMessage(object message, IActorRef sender) _coordinator?.Tell(new PersistentShardCoordinator.GetShardHome(shardId)); } - Log.Debug("{0}: Buffer message for shard [{1}]. Total [{2}] buffered messages.", TypeName, shardId, buffer.Count + 1); - ShardBuffers = ShardBuffers.SetItem(shardId, buffer.Add(new KeyValuePair(message, sender))); + Log.Debug("{0}: Buffer message for shard [{1}]. Total [{2}] buffered messages.", TypeName, shardId, + buffer.Count + 1); + ShardBuffers = ShardBuffers.SetItem(shardId, + buffer.Add(new KeyValuePair(message, sender))); } } else @@ -775,14 +832,16 @@ private void DeliverMessage(object message, IActorRef sender) { if (string.IsNullOrEmpty(shardId)) { - Log.Warning("{0}: Shard must not be empty, dropping message [{1}]", TypeName, message.GetType()); + Log.Warning("{0}: Shard must not be empty, dropping message [{1}]", TypeName, + message.GetType()); Context.System.DeadLetters.Tell(message); } else { if (!ShardBuffers.ContainsKey(shardId)) { - Log.Debug("{0}: Request shard [{1}] home. Coordinator [{2}]", TypeName, shardId, _coordinator); + Log.Debug("{0}: Request shard [{1}] home. Coordinator [{2}]", TypeName, shardId, + _coordinator); _coordinator?.Tell(new PersistentShardCoordinator.GetShardHome(shardId)); } @@ -811,7 +870,8 @@ private void BufferMessage(ShardId shardId, Msg message, IActorRef sender) { if (!ShardBuffers.TryGetValue(shardId, out var buffer)) buffer = ImmutableList>.Empty; - ShardBuffers = ShardBuffers.SetItem(shardId, buffer.Add(new KeyValuePair(message, sender))); + ShardBuffers = ShardBuffers.SetItem(shardId, + buffer.Add(new KeyValuePair(message, sender))); // log some insight to how buffers are filled up every 10% of the buffer capacity var total = totalBufferSize + 1; @@ -820,7 +880,10 @@ private void BufferMessage(ShardId shardId, Msg message, IActorRef sender) { const string logMsg = "{0}: ShardRegion is using [{1} %] of its buffer capacity."; if (total > bufferSize / 2) - Log.Warning(logMsg + " The coordinator might not be available. You might want to check cluster membership status.", TypeName, 100 * total / bufferSize); + Log.Warning( + logMsg + + " The coordinator might not be available. You might want to check cluster membership status.", + TypeName, 100 * total / bufferSize); else Log.Info(logMsg, TypeName, 100 * total / bufferSize); } @@ -860,6 +923,7 @@ private void HandleShardRegionCommand(IShardRegionCommand command) Register(); ScheduleNextRegistration(); } + break; case GracefulShutdown _: @@ -869,10 +933,12 @@ private void HandleShardRegionCommand(IShardRegionCommand command) if (coordShutdown.ShutdownReason != null) { // use a shorter timeout than the coordinated shutdown phase to be able to log better reason for the timeout - var timeout = coordShutdown.Timeout(CoordinatedShutdown.PhaseClusterShardingShutdownRegion) - TimeSpan.FromSeconds(1); + var timeout = coordShutdown.Timeout(CoordinatedShutdown.PhaseClusterShardingShutdownRegion) - + TimeSpan.FromSeconds(1); if (timeout > TimeSpan.Zero) { - Timers.StartSingleTimer(GracefulShutdownTimeout.Instance, GracefulShutdownTimeout.Instance, timeout); + Timers.StartSingleTimer(GracefulShutdownTimeout.Instance, GracefulShutdownTimeout.Instance, + timeout); } } @@ -917,12 +983,92 @@ private void HandleShardRegionQuery(IShardRegionQuery query) else Sender.Tell(new ClusterShardingStats(ImmutableDictionary.Empty)); break; + case GetEntityLocation g: + ReplyToGetEntityLocationQuery(g, Sender); + break; default: Unhandled(query); break; } } + private void ReplyToGetEntityLocationQuery(GetEntityLocation getEntityLocation, IActorRef sender) + { + // Get the Address of the remote IActorRef, or return our Cluster.SelfAddress is the shard / entity + // is hosted locally. + Address GetNodeAddress(IActorRef shardOrRegionRef) + { + return shardOrRegionRef.Path.Address.HasGlobalScope + ? shardOrRegionRef.Path.Address + : Cluster.SelfAddress; + } + + try + { + var shardId = ExtractShardId(new StartEntity(getEntityLocation.EntityId)); + if (string.IsNullOrEmpty(shardId)) + { + // unsupported entityId - could only happen in highly customized extractors + sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, Address.AllSystems, + Option.None)); + return; + } + + async Task ResolveEntityRef(Address destinationAddress, ActorPath entityPath) + { + // now we just need to check to see if an entity ref exists + try + { + var entityRef = await Context.ActorSelection(entityPath).ResolveOne(getEntityLocation.Timeout); + sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, destinationAddress, + new Option(entityRef))); + } + catch (ActorNotFoundException ex) + { + // entity does not exist + sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, destinationAddress, + Option.None)); + } + } + + if (!Shards.TryGetValue(shardId, out var shardActorRef)) + { + // shard is not homed yet, so try looking up the ShardRegion + if (!RegionByShard.TryGetValue(shardId, out var shardRegionRef)) + { + // shardRegion isn't allocated either + sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, Address.AllSystems, + Option.None)); + } + else + { + // ShardRegion exists, but shard is not homed + // NOTE: in the event that we're querying a shard's location from a ShardRegionProxy + // the shard may not be technically "homed" inside the proxy, but it does exist. +#pragma warning disable CS4014 + ResolveEntityRef(GetNodeAddress(shardRegionRef), shardRegionRef.Path / shardId / shardId); // needs to run as a detached task +#pragma warning restore CS4014 + } + + return; + } + +#pragma warning disable CS4014 + ResolveEntityRef(GetNodeAddress(shardActorRef), shardActorRef.Path / shardId); // needs to run as a detached task +#pragma warning restore CS4014 + } + catch (Exception ex) + { + _log.Error(ex, "Error while trying to resolve GetEntityLocation query for entityId [{0}]. " + + "Does MessageExtractor support `ShardRegion.StartEntity`? " + + "If not, that's why you might be receiving this error.", + getEntityLocation.EntityId); + // unsupported entityId - could only happen in highly customized extractors + sender.Tell(new EntityLocation(getEntityLocation.EntityId, string.Empty, Address.AllSystems, + Option.None)); + } + } + private void ReplyToRegionStateQuery(IActorRef sender) { AskAllShardsAsync(Shard.GetCurrentShardState.Instance) @@ -934,7 +1080,9 @@ private void ReplyToRegionStateQuery(IActorRef sender) if (shardStates.IsFaulted) throw shardStates.Exception; //TODO check if this is the right way - return new CurrentShardRegionState(shardStates.Result.Select(x => new ShardState(x.Item1, x.Item2.EntityIds.ToImmutableHashSet())).ToImmutableHashSet()); + return new CurrentShardRegionState(shardStates.Result + .Select(x => new ShardState(x.Item1, x.Item2.EntityIds.ToImmutableHashSet())) + .ToImmutableHashSet()); }, TaskContinuationOptions.ExecuteSynchronously).PipeTo(sender); } @@ -949,14 +1097,17 @@ private void ReplyToRegionStatsQuery(IActorRef sender) if (shardStats.IsFaulted) throw shardStats.Exception; //TODO check if this is the right way - return new ShardRegionStats(shardStats.Result.ToImmutableDictionary(x => x.Item1, x => x.Item2.EntityCount)); + return new ShardRegionStats( + shardStats.Result.ToImmutableDictionary(x => x.Item1, x => x.Item2.EntityCount)); }, TaskContinuationOptions.ExecuteSynchronously).PipeTo(sender); } private Task<(ShardId, T)[]> AskAllShardsAsync(object message) { var timeout = TimeSpan.FromSeconds(3); - var tasks = Shards.Select(entity => entity.Value.Ask(message, timeout).ContinueWith(t => (entity.Key, t.Result), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion)); + var tasks = Shards.Select(entity => + entity.Value.Ask(message, timeout).ContinueWith(t => (entity.Key, t.Result), + TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion)); return Task.WhenAll(tasks); } @@ -965,7 +1116,7 @@ private void TryCompleteGracefulShutdownIfInProgress() if (GracefulShutdownInProgress && Shards.Count == 0 && ShardBuffers.Count == 0) { Log.Debug("{0}: Completed graceful shutdown of region.", TypeName); - Context.Stop(Self); // all shards have been rebalanced, complete graceful shutdown + Context.Stop(Self); // all shards have been rebalanced, complete graceful shutdown } } @@ -974,7 +1125,8 @@ private void SendGracefulShutdownToCoordinatorIfInProgress() if (GracefulShutdownInProgress) { var actorSelections = CoordinatorSelection; - Log.Debug("Sending graceful shutdown to {0}", string.Join(",", actorSelections.Select(c => c.ToString()))); + Log.Debug("Sending graceful shutdown to {0}", + string.Join(",", actorSelections.Select(c => c.ToString()))); actorSelections.ForEach(c => c.Tell(new PersistentShardCoordinator.GracefulShutdownRequest(Self))); } } @@ -984,29 +1136,30 @@ private void HandleCoordinatorMessage(PersistentShardCoordinator.ICoordinatorMes switch (message) { case PersistentShardCoordinator.HostShard hs: + { + if (GracefulShutdownInProgress) { - if (GracefulShutdownInProgress) - { - Log.Debug("{0}: Ignoring Host Shard request for [{1}] as region is shutting down", TypeName, hs.Shard); + Log.Debug("{0}: Ignoring Host Shard request for [{1}] as region is shutting down", TypeName, + hs.Shard); - // if the coordinator is sending HostShard to a region that is shutting down - // it means that it missed the shutting down message (coordinator moved?) - // we want to inform it as soon as possible so it doesn't keep trying to allocate the shard here - SendGracefulShutdownToCoordinatorIfInProgress(); - } - else - { - var shard = hs.Shard; - Log.Debug("{0}: Host shard [{1}]", TypeName, shard); - RegionByShard = RegionByShard.SetItem(shard, Self); - UpdateRegionShards(Self, shard); + // if the coordinator is sending HostShard to a region that is shutting down + // it means that it missed the shutting down message (coordinator moved?) + // we want to inform it as soon as possible so it doesn't keep trying to allocate the shard here + SendGracefulShutdownToCoordinatorIfInProgress(); + } + else + { + var shard = hs.Shard; + Log.Debug("{0}: Host shard [{1}]", TypeName, shard); + RegionByShard = RegionByShard.SetItem(shard, Self); + UpdateRegionShards(Self, shard); - // Start the shard, if already started this does nothing - GetShard(shard); + // Start the shard, if already started this does nothing + GetShard(shard); - Sender.Tell(new PersistentShardCoordinator.ShardStarted(shard)); - } + Sender.Tell(new PersistentShardCoordinator.ShardStarted(shard)); } + } break; case PersistentShardCoordinator.ShardHome home: Log.Debug("{0}: Shard [{1}] located at [{2}]", TypeName, home.Shard, home.Ref); @@ -1016,7 +1169,8 @@ private void HandleCoordinatorMessage(PersistentShardCoordinator.ICoordinatorMes if (region.Equals(Self) && !home.Ref.Equals(Self)) { // should not happen, inconsistency between ShardRegion and PersistentShardCoordinator - throw new IllegalStateException($"{TypeName}: Unexpected change of shard [{home.Shard}] from self to [{home.Ref}]"); + throw new IllegalStateException( + $"{TypeName}: Unexpected change of shard [{home.Shard}] from self to [{home.Ref}]"); } } @@ -1034,6 +1188,7 @@ private void HandleCoordinatorMessage(PersistentShardCoordinator.ICoordinatorMes } else DeliverBufferedMessage(home.Shard, home.Ref); + break; case PersistentShardCoordinator.RegisterAck ra: _coordinator = ra.Coordinator; @@ -1042,48 +1197,48 @@ private void HandleCoordinatorMessage(PersistentShardCoordinator.ICoordinatorMes TryRequestShardBufferHomes(); break; case PersistentShardCoordinator.BeginHandOff bho: + { + var shard = bho.Shard; + Log.Debug("{0}: BeginHandOff shard [{1}]", TypeName, shard); + if (RegionByShard.TryGetValue(shard, out var regionRef)) { - var shard = bho.Shard; - Log.Debug("{0}: BeginHandOff shard [{1}]", TypeName, shard); - if (RegionByShard.TryGetValue(shard, out var regionRef)) - { - if (!Regions.TryGetValue(regionRef, out var updatedShards)) - updatedShards = ImmutableHashSet.Empty; - - updatedShards = updatedShards.Remove(shard); + if (!Regions.TryGetValue(regionRef, out var updatedShards)) + updatedShards = ImmutableHashSet.Empty; - Regions = updatedShards.Count == 0 - ? Regions.Remove(regionRef) - : Regions.SetItem(regionRef, updatedShards); + updatedShards = updatedShards.Remove(shard); - RegionByShard = RegionByShard.Remove(shard); - } + Regions = updatedShards.Count == 0 + ? Regions.Remove(regionRef) + : Regions.SetItem(regionRef, updatedShards); - Sender.Tell(new PersistentShardCoordinator.BeginHandOffAck(shard)); + RegionByShard = RegionByShard.Remove(shard); } + + Sender.Tell(new PersistentShardCoordinator.BeginHandOffAck(shard)); + } break; case PersistentShardCoordinator.HandOff ho: - { - var shard = ho.Shard; - Log.Debug("{0}: HandOff shard [{1}]", TypeName, shard); + { + var shard = ho.Shard; + Log.Debug("{0}: HandOff shard [{1}]", TypeName, shard); - // must drop requests that came in between the BeginHandOff and now, - // because they might be forwarded from other regions and there - // is a risk or message re-ordering otherwise - if (ShardBuffers.ContainsKey(shard)) - { - ShardBuffers = ShardBuffers.Remove(shard); - _loggedFullBufferWarning = false; - } + // must drop requests that came in between the BeginHandOff and now, + // because they might be forwarded from other regions and there + // is a risk or message re-ordering otherwise + if (ShardBuffers.ContainsKey(shard)) + { + ShardBuffers = ShardBuffers.Remove(shard); + _loggedFullBufferWarning = false; + } - if (Shards.TryGetValue(shard, out var actorRef)) - { - HandingOff = HandingOff.Add(actorRef); - actorRef.Forward(message); - } - else - Sender.Tell(new PersistentShardCoordinator.ShardStopped(shard)); + if (Shards.TryGetValue(shard, out var actorRef)) + { + HandingOff = HandingOff.Add(actorRef); + actorRef.Forward(message); } + else + Sender.Tell(new PersistentShardCoordinator.ShardStopped(shard)); + } break; default: Unhandled(message); @@ -1108,7 +1263,8 @@ private void TryRequestShardBufferHomes() { foreach (var buffer in ShardBuffers) { - Log.Debug("{0}: Requesting shard home for [{1}] from coordinator at [{2}]. [{3}] buffered messages.", + Log.Debug( + "{0}: Requesting shard home for [{1}] from coordinator at [{2}]. [{3}] buffered messages.", TypeName, buffer.Key, _coordinator, @@ -1184,7 +1340,8 @@ private IActorRef GetShard(ShardId id) private void HandleClusterState(ClusterEvent.CurrentClusterState state) { - var members = ImmutableSortedSet.Empty.WithComparer(Member.AgeOrdering).Union(state.Members.Where(m => MemberStatusOfInterest.Contains(m.Status) && MatchingRole(m))); + var members = ImmutableSortedSet.Empty.WithComparer(Member.AgeOrdering) + .Union(state.Members.Where(m => MemberStatusOfInterest.Contains(m.Status) && MatchingRole(m))); ChangeMembers(members); } @@ -1203,13 +1360,13 @@ private void HandleClusterEvent(ClusterEvent.IClusterDomainEvent e) break; case ClusterEvent.MemberRemoved mr: - { - var m = mr.Member; - if (m.UniqueAddress == Cluster.SelfUniqueAddress) - Context.Stop(Self); - else if (MatchingRole(m)) - ChangeMembers(MembersByAge.Remove(m)); - } + { + var m = mr.Member; + if (m.UniqueAddress == Cluster.SelfUniqueAddress) + Context.Stop(Self); + else if (MatchingRole(m)) + ChangeMembers(MembersByAge.Remove(m)); + } break; case ClusterEvent.MemberDowned md: @@ -1218,6 +1375,7 @@ private void HandleClusterEvent(ClusterEvent.IClusterDomainEvent e) Log.Info("{0}: Self downed, stopping ShardRegion [{1}]", TypeName, Self.Path); Context.Stop(Self); } + break; case ClusterEvent.IMemberEvent _: // these are expected, no need to warn about them @@ -1250,7 +1408,8 @@ private void HandleTerminated(Terminated terminated) Regions = Regions.Remove(terminated.ActorRef); if (Log.IsDebugEnabled) - Log.Debug("{0}: Region [{1}] with shards [{2}] terminated", TypeName, terminated.ActorRef, string.Join(", ", shards)); + Log.Debug("{0}: Region [{1}] with shards [{2}] terminated", TypeName, terminated.ActorRef, + string.Join(", ", shards)); } else if (ShardsByRef.TryGetValue(terminated.ActorRef, out var shard)) { @@ -1267,7 +1426,8 @@ private void HandleTerminated(Terminated terminated) // if persist fails it will stop Log.Debug("{0}: Shard [{1}] terminated while not being handed off", TypeName, shard); if (Settings.RememberEntities) - Context.System.Scheduler.ScheduleTellOnce(Settings.TuningParameters.ShardFailureBackoff, Self, new RestartShard(shard), Self); + Context.System.Scheduler.ScheduleTellOnce(Settings.TuningParameters.ShardFailureBackoff, Self, + new RestartShard(shard), Self); } // did this shard get removed because the ShardRegion is shutting down? @@ -1276,4 +1436,4 @@ private void HandleTerminated(Terminated terminated) } } } -} +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardingMessages.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardingMessages.cs index 3098c9c68cb..86b4d4c35ee 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardingMessages.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardingMessages.cs @@ -11,16 +11,21 @@ using System.Collections.Immutable; using System.Linq; using Akka.Event; +using Akka.Util; namespace Akka.Cluster.Sharding { /// - /// TBD + /// Marker interface for commands that can be sent to a . /// public interface IShardRegionCommand { } + /// - /// TBD + /// Marker interface for read-only queries that can be sent to a . /// + /// + /// These have no side-effects on the state of the sharding system. + /// public interface IShardRegionQuery { } /// @@ -153,6 +158,77 @@ private GetCurrentRegions() } } + /// + /// Send this message to a actor to determine the location and liveness + /// of a specific entity actor in the region. + /// + /// Creates a message in response. + /// + /// + /// This is used primarily for testing and telemetry purposes. + /// + /// In order for this query to work, the must support , + /// which is also used when remember-entities=on. + /// + public sealed class GetEntityLocation : IShardRegionQuery + { + public GetEntityLocation(string entityId, TimeSpan timeout) + { + EntityId = entityId; + Timeout = timeout; + } + + /// + /// The id of the entity we're searching for. + /// + public string EntityId { get; } + + /// + /// Used to timeout the Ask{T} operation used to identify whether or not + /// this entity actor currently exists. + /// + public TimeSpan Timeout { get; } + } + + /// + /// Response to a query. + /// + /// + /// In the event that no ShardId can be extracted for the given , we will return + /// and for the shard and shard region respectively. + /// + public sealed class EntityLocation + { + public EntityLocation(string entityId, string shardId, Address shardRegion, Option entityRef) + { + EntityId = entityId; + ShardId = shardId; + ShardRegion = shardRegion ?? Address.AllSystems; + EntityRef = entityRef; + } + + /// + /// The Id of the entity. + /// + public string EntityId { get; } + + /// + /// The shard Id that would host this entity. + /// + public string ShardId { get; } + + /// + /// The in the cluster that would host + /// this particular entity. + /// + public Address ShardRegion { get; } + + /// + /// Optional - a reference to this entity actor, if it's alive. + /// + public Option EntityRef { get; } + } + /// /// Reply to . /// diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterSharding.verified.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterSharding.verified.txt index 1cc270e855a..6366aa9828f 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterSharding.verified.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterSharding.verified.txt @@ -87,6 +87,14 @@ namespace Akka.Cluster.Sharding public readonly System.Collections.Immutable.IImmutableSet Shards; public CurrentShardRegionState(System.Collections.Immutable.IImmutableSet shards) { } } + public sealed class EntityLocation + { + public EntityLocation(string entityId, string shardId, Akka.Actor.Address shardRegion, Akka.Util.Option entityRef) { } + public string EntityId { get; } + public Akka.Util.Option EntityRef { get; } + public string ShardId { get; } + public Akka.Actor.Address ShardRegion { get; } + } public class static EnumerableExtensions { public static System.Collections.Generic.IEnumerable> Grouped(this System.Collections.Generic.IEnumerable items, int size) { } @@ -105,6 +113,12 @@ namespace Akka.Cluster.Sharding { public static readonly Akka.Cluster.Sharding.GetCurrentRegions Instance; } + public sealed class GetEntityLocation : Akka.Cluster.Sharding.IShardRegionQuery + { + public GetEntityLocation(string entityId, System.TimeSpan timeout) { } + public string EntityId { get; } + public System.TimeSpan Timeout { get; } + } public sealed class GetShardRegionState : Akka.Cluster.Sharding.IClusterShardingSerializable, Akka.Cluster.Sharding.IShardRegionQuery { public static readonly Akka.Cluster.Sharding.GetShardRegionState Instance;