diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs index 3ac0ae4f569..1b7d56f5ab9 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs @@ -112,12 +112,9 @@ public IImmutableSet AllShards /// Feed an event into the ShardCoordinator state. /// /// The event to process. - /// A flag to indicate if we're trying to recover state previously stored in the database. - /// We need to be more tolerant when this happens in the name of trying to accelerate recovery, so the system doesn't compromise - /// itself and go offline. /// Thrown if an event is illegal in the current state. /// An update copy of this state. - public State Updated(IDomainEvent e, bool isRecovering = false) + public State Updated(IDomainEvent e) { switch (e) { @@ -156,27 +153,7 @@ public State Updated(IDomainEvent e, bool isRecovering = false) if (!Regions.TryGetValue(message.Region, out var shardRegions)) throw new ArgumentException($"Region {message.Region} not registered", nameof(e)); if (Shards.ContainsKey(message.Shard)) - { - if (!isRecovering) - throw new ArgumentException($"Shard {message.Shard} is already allocated", - nameof(e)); - - // per https://github.com/akkadotnet/akka.net/issues/5604 - // we're going to allow new value to overwrite previous - var newRegions = Regions; - var previousRegion = Shards[message.Shard]; - if (Regions.TryGetValue(previousRegion, out var previousShards)) - { - newRegions = newRegions.SetItem(previousRegion, - previousShards.Remove(message.Shard)); - } - var newUnallocatedShardsRecovery = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards; - return Copy( - shards: Shards.SetItem(message.Shard, message.Region), - regions: newRegions.SetItem(message.Region, shardRegions.Add(message.Shard)), - unallocatedShards: newUnallocatedShardsRecovery); - } - + throw new ArgumentException($"Shard {message.Shard} is already allocated", nameof(e)); var newUnallocatedShards = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards; return Copy( @@ -1369,26 +1346,31 @@ protected override bool ReceiveRecover(object message) switch (evt) { case ShardRegionRegistered _: - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); return true; case ShardRegionProxyRegistered _: - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); return true; case ShardRegionTerminated regionTerminated: if (CurrentState.Regions.ContainsKey(regionTerminated.Region)) - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); else Log.Debug("ShardRegionTerminated but region {0} was not registered", regionTerminated.Region); return true; case ShardRegionProxyTerminated proxyTerminated: if (CurrentState.RegionProxies.Contains(proxyTerminated.RegionProxy)) - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); return true; - case ShardHomeAllocated _: - CurrentState = CurrentState.Updated(evt, true); + case ShardHomeAllocated homeAllocated: + // if we already have identical ShardHomeAllocated data, skip processing it + // addresses https://github.com/akkadotnet/akka.net/issues/5604 + if (CurrentState.Shards.TryGetValue(homeAllocated.Shard, out var currentShardRegion) + && Equals(homeAllocated.Region, currentShardRegion)) + return true; + CurrentState = CurrentState.Updated(evt); return true; case ShardHomeDeallocated _: - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); return true; } return false;