Skip to content

Commit

Permalink
Block lease release in Shard PostStop (#7383)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus authored Nov 8, 2024
1 parent 902daf5 commit 322c494
Showing 1 changed file with 42 additions and 24 deletions.
66 changes: 42 additions & 24 deletions src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Annotations;
using Akka.Cluster.Sharding.Internal;
Expand Down Expand Up @@ -1051,17 +1052,25 @@ private void AcquireLeaseIfNeeded()

private void ReleaseLeaseIfNeeded()
{
if (_lease != null)
if (_lease is null)
return;

try
{
ReleaseLease().GetAwaiter().GetResult();
}
catch
{
// no-op, we're shutting down anyway.
}

return;

async Task ReleaseLease()
{
_lease.Release().ContinueWith(r =>
try
{
if (r.IsFaulted || r.IsCanceled)
{
Log.Error(r.Exception,
"{0}: Failed to release lease of shardId [{1}]. Shard may not be able to run on another node until lease timeout occurs.",
_typeName, _shardId);
}
else if (r.Result)
if (await _lease.Release())
{
Log.Info("{0}: Lease of shardId [{1}] released.", _typeName, _shardId);
}
Expand All @@ -1071,7 +1080,13 @@ private void ReleaseLeaseIfNeeded()
"{0}: Failed to release lease of shardId [{1}]. Shard may not be able to run on another node until lease timeout occurs.",
_typeName, _shardId);
}
});
}
catch (Exception ex)
{
Log.Error(ex,
"{0}: Failed to release lease of shardId [{1}]. Shard may not be able to run on another node until lease timeout occurs.",
_typeName, _shardId);
}
}
}

Expand All @@ -1085,18 +1100,12 @@ private bool AwaitingLease(object message)
{
switch (message)
{
case LeaseAcquireResult lar when lar.Acquired:
case LeaseAcquireResult { Acquired: true }:
Log.Debug("{0}: Lease acquired", _typeName);
TryLoadRememberedEntities();
return true;

case LeaseAcquireResult lar when !lar.Acquired && lar.Reason == null:
Log.Error("{0}: Failed to get lease for shard id [{1}]. Retry in {2}",
_typeName, _shardId, _leaseRetryInterval);
Timers.StartSingleTimer(LeaseRetryTimer, Shard.LeaseRetry.Instance, _leaseRetryInterval);
return true;

case LeaseAcquireResult lar when !lar.Acquired && lar.Reason != null:
case LeaseAcquireResult { Acquired: false } lar:
Log.Error(lar.Reason, "{0}: Failed to get lease for shard id [{1}]. Retry in {2}",
_typeName, _shardId, _leaseRetryInterval);
Timers.StartSingleTimer(LeaseRetryTimer, Shard.LeaseRetry.Instance, _leaseRetryInterval);
Expand Down Expand Up @@ -1126,12 +1135,21 @@ private void TryGetLease(Lease lease)
Log.Info("{0}: Acquiring lease {1}", _typeName, lease.Settings);

var self = Self;
lease.Acquire(reason => { self.Tell(new LeaseLost(reason)); }).ContinueWith(r =>
Acquire().PipeTo(self);
return;

async Task<LeaseAcquireResult> Acquire()
{
if (r.IsFaulted || r.IsCanceled)
return new LeaseAcquireResult(false, r.Exception);
return new LeaseAcquireResult(r.Result, null);
}).PipeTo(Self);
try
{
var result = await lease.Acquire(reason => { self.Tell(new LeaseLost(reason)); });
return new LeaseAcquireResult(result, null);
}
catch (Exception ex)
{
return new LeaseAcquireResult(false, ex);
}
}
}

// ===== remember entities initialization =====
Expand Down Expand Up @@ -1610,7 +1628,7 @@ private void HandOff(IActorRef replyTo)
"HandOffStopper"));

//During hand off we only care about watching for termination of the hand off stopper
Context.Become((object message) =>
Context.Become(message =>
{
switch (message)
{
Expand Down

0 comments on commit 322c494

Please sign in to comment.