Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed DDataShardCoordinator NullReferenceException #6892

Merged
merged 3 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

#nullable enable
using System;
using System.Collections.Immutable;
using System.Linq;
Expand Down Expand Up @@ -86,11 +87,11 @@ internal static Props Props(
private readonly LWWRegisterKey<ShardCoordinator.CoordinatorState> _coordinatorStateKey;
private ImmutableHashSet<(IActorRef, GetShardHome)> _getShardHomeRequests = ImmutableHashSet<(IActorRef, GetShardHome)>.Empty;
private int _initialStateRetries = 0;
private readonly IActorRef _rememberEntitiesStore;
private readonly IActorRef? _rememberEntitiesStore;
private readonly bool _rememberEntities;

public ITimerScheduler Timers { get; set; }
public IStash Stash { get; set; }
public ITimerScheduler Timers { get; set; } = null!;
public IStash Stash { get; set; } = null!;

private string TypeName => _baseImpl.TypeName;
private ClusterShardingSettings Settings => _baseImpl.Settings;
Expand All @@ -103,7 +104,7 @@ public DDataShardCoordinator(
IShardAllocationStrategy allocationStrategy,
IActorRef replicator,
int majorityMinCap,
IRememberEntitiesProvider rememberEntitiesStoreProvider)
IRememberEntitiesProvider? rememberEntitiesStoreProvider)
{
_replicator = replicator;
var log = Context.GetLogger();
Expand Down Expand Up @@ -162,7 +163,9 @@ protected override bool Receive(object message)
/// <returns></returns>
private Receive WaitingForInitialState(IImmutableSet<ShardId> rememberedShards)
{
bool Receive(object message)
return ReceiveDelegate;

bool ReceiveDelegate(object message)
{
switch (message)
{
Expand All @@ -181,7 +184,7 @@ bool Receive(object message)
case GetFailure m when m.Key.Equals(_coordinatorStateKey):
_initialStateRetries++;
var template =
"{0}: The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout': {1} millis (retrying). Has ClusterSharding been started on all nodes?";
"{0}: The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout': {1} millis (retrying). Has ClusterSharding been started on all nodes?";
if (_initialStateRetries == 1)
Log.Info(template, TypeName, _stateReadConsistency.Timeout.TotalMilliseconds);
else if (_initialStateRetries < 5)
Expand Down Expand Up @@ -230,8 +233,6 @@ bool Receive(object message)
}
return ReceiveTerminated(message);
}

return Receive;
}

private void OnInitialState(CoordinatorState loadedState, IImmutableSet<ShardId> rememberedShards)
Expand Down Expand Up @@ -321,13 +322,15 @@ private bool WaitingForStateInitialized(object message)
/// <returns></returns>
private Receive WaitingForUpdate<TEvent>(
TEvent evt,
ShardId shardId,
ShardId? shardId,
bool waitingForStateWrite,
bool waitingForRememberShard,
Action<TEvent> afterUpdateCallback)
where TEvent : IDomainEvent
{
bool Receive(object message)
return ReceiveDelegate;

bool ReceiveDelegate(object message)
{
switch (message)
{
Expand Down Expand Up @@ -382,7 +385,7 @@ bool Receive(object message)
m.ErrorMessage,
evt,
_terminating ? "Coordinator will be terminated due to Terminate message received"
: "Coordinator will be restarted");
: "Coordinator will be restarted");
if (_terminating)
{
Context.Stop(Self);
Expand All @@ -405,7 +408,7 @@ bool Receive(object message)
return true;

case RememberEntitiesCoordinatorStore.UpdateDone m:
if (!shardId.Contains(m.ShardId))
if (shardId != null && !shardId.Equals(m.ShardId))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These `shardId` null checks are the key bug fixes — enabling nullable reference types (`#nullable enable`) made the missing checks easy to spot.

{
Log.Warning("{0}: Saw remember entities update complete for shard id [{1}], while waiting for [{2}]",
TypeName,
Expand Down Expand Up @@ -438,7 +441,7 @@ bool Receive(object message)
return true;

case RememberEntitiesCoordinatorStore.UpdateFailed m:
if (shardId.Contains(m.ShardId))
if (shardId != null && shardId.Equals(m.ShardId))
{
OnRememberEntitiesUpdateFailed(m.ShardId);
}
Expand All @@ -452,7 +455,7 @@ bool Receive(object message)
return true;

case RememberEntitiesTimeout m:
if (shardId.Contains(m.ShardId))
if (shardId != null && shardId.Equals(m.ShardId))
{
OnRememberEntitiesUpdateFailed(m.ShardId);
}
Expand All @@ -479,8 +482,6 @@ bool Receive(object message)
return true;
}
}

return Receive;
}

private void UnbecomeAfterUpdate<TEvent>(TEvent evt, Action<TEvent> afterUpdateCallback) where TEvent : IDomainEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

#nullable enable
using System;
using System.Collections.Immutable;
using Akka.Actor;
Expand Down Expand Up @@ -77,7 +78,7 @@ protected override bool ReceiveRecover(object message)
switch (message)
{
case EventSourcedRememberEntitiesCoordinatorStore.MigrationMarker _:
case SnapshotOffer offer when offer.Snapshot is EventSourcedRememberEntitiesCoordinatorStore.State _:
case SnapshotOffer { Snapshot: EventSourcedRememberEntitiesCoordinatorStore.State }:
throw new IllegalStateException(
"state-store is set to persistence but a migration has taken place to remember-entities-store=eventsourced. You can not downgrade.");

Expand All @@ -87,10 +88,10 @@ protected override bool ReceiveRecover(object message)

switch (evt)
{
case ShardRegionRegistered _:
case ShardRegionRegistered:
State = State.Updated(evt);
return true;
case ShardRegionProxyRegistered _:
case ShardRegionProxyRegistered:
State = State.Updated(evt);
return true;
case ShardRegionTerminated regionTerminated:
Expand Down Expand Up @@ -125,7 +126,7 @@ protected override bool ReceiveRecover(object message)
return true;
}
return false;
case SnapshotOffer offer when offer.Snapshot is CoordinatorState state:
case SnapshotOffer { Snapshot: CoordinatorState state }:
if (VerboseDebug)
Log.Debug("{0}: receiveRecover SnapshotOffer {1}", TypeName, state);
State = state.WithRememberEntities(Settings.RememberEntities);
Expand All @@ -135,7 +136,7 @@ protected override bool ReceiveRecover(object message)
State = State.Copy(unallocatedShards: ImmutableHashSet<ShardId>.Empty);
return true;

case RecoveryCompleted _:
case RecoveryCompleted:
State = State.WithRememberEntities(Settings.RememberEntities);
_baseImpl.WatchStateActors();
return true;
Expand All @@ -158,12 +159,12 @@ private bool WaitingForStateInitialized(object message)
{
switch (message)
{
case Terminate _:
case Terminate:
Log.Debug("{0}: Received termination message before state was initialized", TypeName);
Context.Stop(Self);
return true;

case StateInitialized _:
case StateInitialized:
_baseImpl.ReceiveStateInitialized();
Log.Debug("{0}: Coordinator initialization completed", TypeName);
Context.Become(msg => _baseImpl.Active(msg) || ReceiveSnapshotResult(msg));
Expand All @@ -175,8 +176,7 @@ private bool WaitingForStateInitialized(object message)
return true;
}

if (_baseImpl.ReceiveTerminated(message)) return true;
else return ReceiveSnapshotResult(message);
return _baseImpl.ReceiveTerminated(message) || ReceiveSnapshotResult(message);
}

private bool ReceiveSnapshotResult(object message)
Expand Down