Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make shard lease Release block in PostStop #7383

Merged
merged 2 commits into from
Nov 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 42 additions & 24 deletions src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Annotations;
using Akka.Cluster.Sharding.Internal;
Expand Down Expand Up @@ -1051,17 +1052,25 @@ private void AcquireLeaseIfNeeded()

private void ReleaseLeaseIfNeeded()
{
if (_lease != null)
if (_lease is null)
return;

try
{
ReleaseLease().GetAwaiter().GetResult();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actual fix: we block on Release() instead of running it in a detached task.

}
catch
{
// no-op, we're shutting down anyway.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to log an Exception here just in case

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, never mind - you already do it down below

}

return;

async Task ReleaseLease()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code modernization: refactor the ContinueWith continuation into an async local function

{
_lease.Release().ContinueWith(r =>
try
{
if (r.IsFaulted || r.IsCanceled)
{
Log.Error(r.Exception,
"{0}: Failed to release lease of shardId [{1}]. Shard may not be able to run on another node until lease timeout occurs.",
_typeName, _shardId);
}
else if (r.Result)
if (await _lease.Release())
{
Log.Info("{0}: Lease of shardId [{1}] released.", _typeName, _shardId);
}
Expand All @@ -1071,7 +1080,13 @@ private void ReleaseLeaseIfNeeded()
"{0}: Failed to release lease of shardId [{1}]. Shard may not be able to run on another node until lease timeout occurs.",
_typeName, _shardId);
}
});
}
catch (Exception ex)
{
Log.Error(ex,
"{0}: Failed to release lease of shardId [{1}]. Shard may not be able to run on another node until lease timeout occurs.",
_typeName, _shardId);
}
}
}

Expand All @@ -1085,18 +1100,12 @@ private bool AwaitingLease(object message)
{
switch (message)
{
case LeaseAcquireResult lar when lar.Acquired:
case LeaseAcquireResult { Acquired: true }:
Log.Debug("{0}: Lease acquired", _typeName);
TryLoadRememberedEntities();
return true;

case LeaseAcquireResult lar when !lar.Acquired && lar.Reason == null:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code removed and simplified: ILoggingAdapter.Error can accept a null exception, so this separate case is not needed.

Log.Error("{0}: Failed to get lease for shard id [{1}]. Retry in {2}",
_typeName, _shardId, _leaseRetryInterval);
Timers.StartSingleTimer(LeaseRetryTimer, Shard.LeaseRetry.Instance, _leaseRetryInterval);
return true;

case LeaseAcquireResult lar when !lar.Acquired && lar.Reason != null:
case LeaseAcquireResult { Acquired: false } lar:
Log.Error(lar.Reason, "{0}: Failed to get lease for shard id [{1}]. Retry in {2}",
_typeName, _shardId, _leaseRetryInterval);
Timers.StartSingleTimer(LeaseRetryTimer, Shard.LeaseRetry.Instance, _leaseRetryInterval);
Expand Down Expand Up @@ -1126,12 +1135,21 @@ private void TryGetLease(Lease lease)
Log.Info("{0}: Acquiring lease {1}", _typeName, lease.Settings);

var self = Self;
lease.Acquire(reason => { self.Tell(new LeaseLost(reason)); }).ContinueWith(r =>
Acquire().PipeTo(self);
return;

async Task<LeaseAcquireResult> Acquire()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code modernization: refactor the ContinueWith continuation into an async local function

{
if (r.IsFaulted || r.IsCanceled)
return new LeaseAcquireResult(false, r.Exception);
return new LeaseAcquireResult(r.Result, null);
}).PipeTo(Self);
try
{
var result = await lease.Acquire(reason => { self.Tell(new LeaseLost(reason)); });
return new LeaseAcquireResult(result, null);
}
catch (Exception ex)
{
return new LeaseAcquireResult(false, ex);
}
}
}

// ===== remember entities initialization =====
Expand Down Expand Up @@ -1610,7 +1628,7 @@ private void HandOff(IActorRef replyTo)
"HandOffStopper"));

//During hand off we only care about watching for termination of the hand off stopper
Context.Become((object message) =>
Context.Become(message =>
{
switch (message)
{
Expand Down