Skip to content

Commit

Permalink
Allow PersistentShardCoordinator to tolerate duplicate `ShardHomeAl…
Browse files Browse the repository at this point in the history
…located` messages (akkadotnet#5967)

close akkadotnet#5604
  • Loading branch information
Aaronontheweb authored May 26, 2022
1 parent 9260924 commit 4fa7abb
Showing 1 changed file with 35 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,15 @@ public IImmutableSet<ShardId> AllShards
}

/// <summary>
/// TBD
/// Feed an event into the ShardCoordinator state.
/// </summary>
/// <param name="e">TBD</param>
/// <exception cref="ArgumentException">TBD</exception>
/// <returns>TBD</returns>
public State Updated(IDomainEvent e)
/// <param name="e">The event to process.</param>
/// <param name="isRecovering">A flag to indicate if we're trying to recover state previously stored in the database.
/// We need to be more tolerant when this happens in the name of trying to accelerate recovery, so the system doesn't compromise
/// itself and go offline.</param>
/// <exception cref="ArgumentException">Thrown if an event is illegal in the current state.</exception>
/// <returns>An update copy of this state.</returns>
public State Updated(IDomainEvent e, bool isRecovering = false)
{
switch (e)
{
Expand Down Expand Up @@ -153,7 +156,27 @@ public State Updated(IDomainEvent e)
if (!Regions.TryGetValue(message.Region, out var shardRegions))
throw new ArgumentException($"Region {message.Region} not registered", nameof(e));
if (Shards.ContainsKey(message.Shard))
throw new ArgumentException($"Shard {message.Shard} is already allocated", nameof(e));
{
if (!isRecovering)
throw new ArgumentException($"Shard {message.Shard} is already allocated",
nameof(e));

// per https://github.com/akkadotnet/akka.net/issues/5604
// we're going to allow new value to overwrite previous
var newRegions = Regions;
var previousRegion = Shards[message.Shard];
if (Regions.TryGetValue(previousRegion, out var previousShards))
{
newRegions = newRegions.SetItem(previousRegion,
previousShards.Remove(message.Shard));
}
var newUnallocatedShardsRecovery = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards;
return Copy(
shards: Shards.SetItem(message.Shard, message.Region),
regions: newRegions.SetItem(message.Region, shardRegions.Add(message.Shard)),
unallocatedShards: newUnallocatedShardsRecovery);
}


var newUnallocatedShards = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards;
return Copy(
Expand Down Expand Up @@ -1346,26 +1369,26 @@ protected override bool ReceiveRecover(object message)
switch (evt)
{
case ShardRegionRegistered _:
CurrentState = CurrentState.Updated(evt);
CurrentState = CurrentState.Updated(evt, true);
return true;
case ShardRegionProxyRegistered _:
CurrentState = CurrentState.Updated(evt);
CurrentState = CurrentState.Updated(evt, true);
return true;
case ShardRegionTerminated regionTerminated:
if (CurrentState.Regions.ContainsKey(regionTerminated.Region))
CurrentState = CurrentState.Updated(evt);
CurrentState = CurrentState.Updated(evt, true);
else
Log.Debug("ShardRegionTerminated but region {0} was not registered", regionTerminated.Region);
return true;
case ShardRegionProxyTerminated proxyTerminated:
if (CurrentState.RegionProxies.Contains(proxyTerminated.RegionProxy))
CurrentState = CurrentState.Updated(evt);
CurrentState = CurrentState.Updated(evt, true);
return true;
case ShardHomeAllocated _:
CurrentState = CurrentState.Updated(evt);
CurrentState = CurrentState.Updated(evt, true);
return true;
case ShardHomeDeallocated _:
CurrentState = CurrentState.Updated(evt);
CurrentState = CurrentState.Updated(evt, true);
return true;
}
return false;
Expand Down

0 comments on commit 4fa7abb

Please sign in to comment.