From a9203da56cc1734c487d56ca1ae68e61f5219266 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 24 Aug 2023 10:50:08 -0500 Subject: [PATCH 1/3] fixed NRE bug with DDataShardCoordinator --- .../DDataShardCoordinator.cs | 33 ++++++++++--------- .../PersistentShardCoordinator.cs | 18 +++++----- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs index 4b6933f2f62..f6e6558ba58 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +#nullable enable using System; using System.Collections.Immutable; using System.Linq; @@ -86,11 +87,11 @@ internal static Props Props( private readonly LWWRegisterKey _coordinatorStateKey; private ImmutableHashSet<(IActorRef, GetShardHome)> _getShardHomeRequests = ImmutableHashSet<(IActorRef, GetShardHome)>.Empty; private int _initialStateRetries = 0; - private readonly IActorRef _rememberEntitiesStore; + private readonly IActorRef? _rememberEntitiesStore; private readonly bool _rememberEntities; - public ITimerScheduler Timers { get; set; } - public IStash Stash { get; set; } + public ITimerScheduler Timers { get; set; } = null!; + public IStash Stash { get; set; } = null!; private string TypeName => _baseImpl.TypeName; private ClusterShardingSettings Settings => _baseImpl.Settings; @@ -103,7 +104,7 @@ public DDataShardCoordinator( IShardAllocationStrategy allocationStrategy, IActorRef replicator, int majorityMinCap, - IRememberEntitiesProvider rememberEntitiesStoreProvider) + IRememberEntitiesProvider? rememberEntitiesStoreProvider) { _replicator = replicator; var log = Context.GetLogger(); @@ -162,7 +163,9 @@ protected override bool Receive(object message) /// private Receive WaitingForInitialState(IImmutableSet rememberedShards) { - bool Receive(object message) + return ReceiveDelegate; + + bool ReceiveDelegate(object message) { switch (message) { @@ -181,7 +184,7 @@ bool Receive(object message) case GetFailure m when m.Key.Equals(_coordinatorStateKey): _initialStateRetries++; var template = - "{0}: The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout': {1} millis (retrying). Has ClusterSharding been started on all nodes?"; + "{0}: The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout': {1} millis (retrying). Has ClusterSharding been started on all nodes?"; if (_initialStateRetries == 1) Log.Info(template, TypeName, _stateReadConsistency.Timeout.TotalMilliseconds); else if (_initialStateRetries < 5) @@ -230,8 +233,6 @@ bool Receive(object message) } return ReceiveTerminated(message); } - - return Receive; } private void OnInitialState(CoordinatorState loadedState, IImmutableSet rememberedShards) @@ -321,13 +322,15 @@ private bool WaitingForStateInitialized(object message) /// private Receive WaitingForUpdate( TEvent evt, - ShardId shardId, + ShardId? shardId, bool waitingForStateWrite, bool waitingForRememberShard, Action afterUpdateCallback) where TEvent : IDomainEvent { - bool Receive(object message) + return ReceiveDelegate; + + bool ReceiveDelegate(object message) { switch (message) { @@ -382,7 +385,7 @@ bool Receive(object message) m.ErrorMessage, evt, _terminating ? "Coordinator will be terminated due to Terminate message received" - : "Coordinator will be restarted"); + : "Coordinator will be restarted"); if (_terminating) { Context.Stop(Self); @@ -405,7 +408,7 @@ bool Receive(object message) return true; case RememberEntitiesCoordinatorStore.UpdateDone m: - if (!shardId.Contains(m.ShardId)) + if (shardId != null && !shardId.Equals(m.ShardId)) { Log.Warning("{0}: Saw remember entities update complete for shard id [{1}], while waiting for [{2}]", TypeName, @@ -438,7 +441,7 @@ bool Receive(object message) return true; case RememberEntitiesCoordinatorStore.UpdateFailed m: - if (shardId.Contains(m.ShardId)) + if (shardId != null && shardId.Equals(m.ShardId)) { OnRememberEntitiesUpdateFailed(m.ShardId); } @@ -452,7 +455,7 @@ bool Receive(object message) return true; case RememberEntitiesTimeout m: - if (shardId.Contains(m.ShardId)) + if (shardId != null && shardId.Equals(m.ShardId)) { OnRememberEntitiesUpdateFailed(m.ShardId); } @@ -479,8 +482,6 @@ bool Receive(object message) return true; } } - - return Receive; } private void UnbecomeAfterUpdate(TEvent evt, Action afterUpdateCallback) where TEvent : IDomainEvent diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs index 8748d4588cf..b1b792352b8 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +#nullable enable using System; using System.Collections.Immutable; using Akka.Actor; @@ -77,7 +78,7 @@ protected override bool ReceiveRecover(object message) switch (message) { case EventSourcedRememberEntitiesCoordinatorStore.MigrationMarker _: - case SnapshotOffer offer when offer.Snapshot is EventSourcedRememberEntitiesCoordinatorStore.State _: + case SnapshotOffer { Snapshot: EventSourcedRememberEntitiesCoordinatorStore.State }: throw new IllegalStateException( "state-store is set to persistence but a migration has taken place to remember-entities-store=eventsourced. You can not downgrade."); @@ -87,10 +88,10 @@ protected override bool ReceiveRecover(object message) switch (evt) { - case ShardRegionRegistered _: + case ShardRegionRegistered: State = State.Updated(evt); return true; - case ShardRegionProxyRegistered _: + case ShardRegionProxyRegistered: State = State.Updated(evt); return true; case ShardRegionTerminated regionTerminated: @@ -125,7 +126,7 @@ protected override bool ReceiveRecover(object message) return true; } return false; - case SnapshotOffer offer when offer.Snapshot is CoordinatorState state: + case SnapshotOffer { Snapshot: CoordinatorState state }: if (VerboseDebug) Log.Debug("{0}: receiveRecover SnapshotOffer {1}", TypeName, state); State = state.WithRememberEntities(Settings.RememberEntities); @@ -135,7 +136,7 @@ protected override bool ReceiveRecover(object message) State = State.Copy(unallocatedShards: ImmutableHashSet.Empty); return true; - case RecoveryCompleted _: + case RecoveryCompleted: State = State.WithRememberEntities(Settings.RememberEntities); _baseImpl.WatchStateActors(); return true; @@ -158,12 +159,12 @@ private bool WaitingForStateInitialized(object message) { switch (message) { - case Terminate _: + case Terminate: Log.Debug("{0}: Received termination message before state was initialized", TypeName); Context.Stop(Self); return true; - case StateInitialized _: + case StateInitialized: _baseImpl.ReceiveStateInitialized(); Log.Debug("{0}: Coordinator initialization completed", TypeName); Context.Become(msg => _baseImpl.Active(msg) || ReceiveSnapshotResult(msg)); @@ -175,8 +176,7 @@ private bool WaitingForStateInitialized(object message) return true; } - if (_baseImpl.ReceiveTerminated(message)) return true; - else return ReceiveSnapshotResult(message); + return _baseImpl.ReceiveTerminated(message) || ReceiveSnapshotResult(message); } private bool ReceiveSnapshotResult(object message) From f63b24a387a8a39fd132838e52e30e3a9f1755fd Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 24 Aug 2023 11:01:50 -0500 Subject: [PATCH 2/3] Revert "Bump Verify.Xunit from 20.8.0 to 20.8.1 (#6890)" This reverts commit 0a66f9421761c729d2d0bd610638413557c6c8fd. --- src/core/Akka.API.Tests/Akka.API.Tests.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.API.Tests/Akka.API.Tests.csproj b/src/core/Akka.API.Tests/Akka.API.Tests.csproj index 23308e4cd1d..79c6fbf546a 100644 --- a/src/core/Akka.API.Tests/Akka.API.Tests.csproj +++ b/src/core/Akka.API.Tests/Akka.API.Tests.csproj @@ -28,7 +28,7 @@ - + From 51e6f4355317ddb9dd041821b5d122c79a9c6882 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 24 Aug 2023 11:05:50 -0500 Subject: [PATCH 3/3] Revert "Revert "Bump Verify.Xunit from 20.8.0 to 20.8.1 (#6890)"" This reverts commit f63b24a387a8a39fd132838e52e30e3a9f1755fd. --- src/core/Akka.API.Tests/Akka.API.Tests.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.API.Tests/Akka.API.Tests.csproj b/src/core/Akka.API.Tests/Akka.API.Tests.csproj index 79c6fbf546a..23308e4cd1d 100644 --- a/src/core/Akka.API.Tests/Akka.API.Tests.csproj +++ b/src/core/Akka.API.Tests/Akka.API.Tests.csproj @@ -28,7 +28,7 @@ - +