Skip to content

Commit

Permalink
Fix sharding tolerant reader (#5976)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb authored Jun 1, 2022
1 parent 0e7cd06 commit 36bc254
Showing 1 changed file with 14 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,9 @@ public IImmutableSet<ShardId> AllShards
/// Feed an event into the ShardCoordinator state.
/// </summary>
/// <param name="e">The event to process.</param>
/// <param name="isRecovering">A flag to indicate if we're trying to recover state previously stored in the database.
/// We need to be more tolerant when this happens in the name of trying to accelerate recovery, so the system doesn't compromise
/// itself and go offline.</param>
/// <exception cref="ArgumentException">Thrown if an event is illegal in the current state.</exception>
/// <returns>An update copy of this state.</returns>
public State Updated(IDomainEvent e, bool isRecovering = false)
public State Updated(IDomainEvent e)
{
switch (e)
{
Expand Down Expand Up @@ -156,27 +153,7 @@ public State Updated(IDomainEvent e, bool isRecovering = false)
if (!Regions.TryGetValue(message.Region, out var shardRegions))
throw new ArgumentException($"Region {message.Region} not registered", nameof(e));
if (Shards.ContainsKey(message.Shard))
{
if (!isRecovering)
throw new ArgumentException($"Shard {message.Shard} is already allocated",
nameof(e));

// per https://github.com/akkadotnet/akka.net/issues/5604
// we're going to allow new value to overwrite previous
var newRegions = Regions;
var previousRegion = Shards[message.Shard];
if (Regions.TryGetValue(previousRegion, out var previousShards))
{
newRegions = newRegions.SetItem(previousRegion,
previousShards.Remove(message.Shard));
}
var newUnallocatedShardsRecovery = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards;
return Copy(
shards: Shards.SetItem(message.Shard, message.Region),
regions: newRegions.SetItem(message.Region, shardRegions.Add(message.Shard)),
unallocatedShards: newUnallocatedShardsRecovery);
}

throw new ArgumentException($"Shard {message.Shard} is already allocated", nameof(e));

var newUnallocatedShards = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards;
return Copy(
Expand Down Expand Up @@ -1369,26 +1346,31 @@ protected override bool ReceiveRecover(object message)
switch (evt)
{
case ShardRegionRegistered _:
CurrentState = CurrentState.Updated(evt, true);
CurrentState = CurrentState.Updated(evt);
return true;
case ShardRegionProxyRegistered _:
CurrentState = CurrentState.Updated(evt, true);
CurrentState = CurrentState.Updated(evt);
return true;
case ShardRegionTerminated regionTerminated:
if (CurrentState.Regions.ContainsKey(regionTerminated.Region))
CurrentState = CurrentState.Updated(evt, true);
CurrentState = CurrentState.Updated(evt);
else
Log.Debug("ShardRegionTerminated but region {0} was not registered", regionTerminated.Region);
return true;
case ShardRegionProxyTerminated proxyTerminated:
if (CurrentState.RegionProxies.Contains(proxyTerminated.RegionProxy))
CurrentState = CurrentState.Updated(evt, true);
CurrentState = CurrentState.Updated(evt);
return true;
case ShardHomeAllocated _:
CurrentState = CurrentState.Updated(evt, true);
case ShardHomeAllocated homeAllocated:
// if we already have identical ShardHomeAllocated data, skip processing it
// addresses https://github.com/akkadotnet/akka.net/issues/5604
if (CurrentState.Shards.TryGetValue(homeAllocated.Shard, out var currentShardRegion)
&& Equals(homeAllocated.Region, currentShardRegion))
return true;
CurrentState = CurrentState.Updated(evt);
return true;
case ShardHomeDeallocated _:
CurrentState = CurrentState.Updated(evt, true);
CurrentState = CurrentState.Updated(evt);
return true;
}
return false;
Expand Down

0 comments on commit 36bc254

Please sign in to comment.