[FLINK-36451][runtime] Replaces LeaderElection#hasLeadership with LeaderElection#runAsLeader #25679

Merged (5 commits, Dec 9, 2024)

Conversation

@XComp (Contributor) commented Nov 22, 2024

What is the purpose of the change

See reasoning in FLINK-36451 comment

Brief change log

  • Adds a runAsLeader method that executes callbacks on the leader election's main thread (a sketch follows this list)
  • Replaces hasLeadership calls with runAsLeader
  • Adds confirmLeadershipAsLeader, which runs the confirmLeadership logic on the leaderOperationExecutor
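
A minimal sketch of the resulting interface shape, assuming a plain Runnable callback (the actual callback parameter type in the change may differ):

    import java.util.UUID;
    import java.util.concurrent.CompletableFuture;

    public interface LeaderElection {

        /**
         * Runs the given callback on the leader election's main thread if the
         * given session ID still holds the confirmed leadership; otherwise the
         * returned future completes exceptionally.
         */
        CompletableFuture<Void> runAsLeader(UUID leaderSessionId, Runnable callback);
    }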

Verifying this change

  • Adds unit tests
  • Rewrites hasLeadership tests in unit test classes

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot (Collaborator) commented Nov 22, 2024

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-runs the last Azure build

@XComp (Contributor Author) commented Nov 27, 2024

The most recent CI run revealed a timeout in MemoryManagerConcurrentModReleaseTest#testConcurrentModificationWhileReleasing, which is independent of our changes (see FLINK-35315).

    final TestingLeaderElectionDriver.Builder driverBuilder =
-           TestingLeaderElectionDriver.newNoOpBuilder();
+           TestingLeaderElectionDriver.newBuilder(new AtomicBoolean(true));
@XComp (Contributor Author):

That was an interesting one because I wondered why we didn't need to set the leadership pre-FLINK-36451.

The reason was a bug in the TestingLeaderContender#grantLeadership method: the old implementation confirmed the leadership and pushed a LeaderInformation event after that call even if the confirmation failed (due to leadership loss). The new implementation is cleaner in this regard.

@XComp XComp force-pushed the FLINK-36451 branch 2 times, most recently from d9d09b6 to 1d23f3c on November 27, 2024, 22:00
* acquired from the {@link LeaderContender} and from the {@link DefaultLeaderElectionService}.
*/
@Test
void testNestedDeadlockInLeadershipConfirmation() throws Exception {
@XComp (Contributor Author):

That's the test that I added to verify the findings. I kept it in a separate commit to prove that the test runs into a deadlock before applying the changes.

@zentol (Contributor) left a comment:

I'm not sure if this is the right way to go about it.

currentLeaderConfirmed = embeddedLeaderElection;
currentLeaderAddress = leaderAddress;
currentLeaderProposed = null;
Preconditions.checkState(
@zentol (Contributor):

I'm quite worried about this because it'll just blow up an entire JobManager if leadership is lost at the wrong time, when HA would be capable of sorting this out on its own.

@XComp (Contributor Author):

This Precondition is there to indicate some bug. If the EmbeddedLeaderService isn't properly implemented, the whole setup shouldn't be trusted, no?

assertThat(callbackTriggered).isTrue();
}

public static void assertRunAsLeaderOmitsCallbackDueToLostLeadership(
@zentol (Contributor):

Suggested change:
-   public static void assertRunAsLeaderOmitsCallbackDueToLostLeadership(
+   public static void assertRunAsLeaderSkipsRunnableDueToLostLeadership(

I think this is a bit clearer.

@@ -108,6 +113,16 @@ void testHasLeadership() throws Exception {
}
}

private static void assertRunAsLeaderSuccessfully(
@zentol (Contributor):

duplicate of LeaderElectionTestUtils

} catch (Throwable t) {
fatalError(t);
} else {
throw new LeadershipLostException(leaderSessionId);
@zentol (Contributor):

I'm not quite sold on this. It feels like we're using a special exception to do control flow (such that users can ignore certain exceptions).

@XComp (Contributor Author):

For me, it was meant as an invalid state: the callback that's passed into the runAsLeader method should succeed in the happy scenario. Losing the leadership in the meantime is an "unusual" state of the system. In this sense, I felt that an exception was the most appropriate tool to express this.

You're suggesting that we should make runAsLeader return CompletableFuture<Boolean>, indicating whether the callback was executed or not? That would work as well. But from an interface perspective, runAsLeader is just one way to execute the callback. One could also (theoretically) think of a supplyAsLeader (as it's implemented in JobMasterServiceLeadershipRunner right now). In such a case, utilizing the return value of the CompletableFuture and relying on an exception for leadership loss feels like the more natural thing to do.
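
For comparison, a minimal sketch of the two shapes being discussed; the interface and method names below are hypothetical and only serve to contrast the options:

    import java.util.UUID;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Supplier;

    // Hypothetical interface used only to contrast the two designs.
    interface LeadershipScopedExecution {

        // Exception-based: the future completes exceptionally (e.g. with a
        // LeadershipLostException) if leadership was lost before the callback ran.
        CompletableFuture<Void> runAsLeader(UUID leaderSessionId, Runnable callback);

        // Result-based: the future's value reports whether the callback was executed.
        CompletableFuture<Boolean> tryRunAsLeader(UUID leaderSessionId, Runnable callback);

        // A supplyAsLeader variant: the return value already carries the supplier's
        // result, so leadership loss is more naturally signalled through an
        // exceptionally completed future.
        <T> CompletableFuture<T> supplyAsLeader(UUID leaderSessionId, Supplier<T> supplier);
    }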

*/
boolean hasLeadership(UUID leaderSessionId);
CompletableFuture<Void> runAsLeader(
@zentol (Contributor):

Yeeeeah, I'm not sold on this.

I struggle to understand the contract as to what should be run via this method.

Is it that everything called in the LeaderContender callbacks must use this method?
It feels very wrong that this method is used to access the JRS, for example (even if that internally does things asynchronously), when that has little to do with leader election; it runs counter to the idea of running as few things as possible in the leader election executor.

I also don't quite follow yet why such a big API change is necessary.
Couldn't we have saved ourselves a lot of work by just changing hasLeadership to return a CompletableFuture<Boolean>?
Now we have this strange API where a sync confirmLeadership method exists but must basically never be called from one side because it risks deadlocks.

Can't we solve the lock acquisition problem by removing the need to call hasLeadership?
We already have an API that signals whether we are the leader or not; can't we write that outside of the contender lock into an AtomicBoolean and use that instead (while ensuring that grant/revoke don't do any heavy lifting)?
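
A minimal sketch of that AtomicBoolean alternative, using hypothetical contender methods rather than the actual Flink code:

    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical contender skeleton illustrating the flag-based idea above.
    final class FlagBasedContender {

        private final AtomicBoolean isLeader = new AtomicBoolean(false);

        void grantLeadership(UUID leaderSessionId) {
            // no heavy lifting here: just flip the flag outside the contender lock
            isLeader.set(true);
        }

        void revokeLeadership() {
            isLeader.set(false);
        }

        boolean hasLeadership() {
            // callers read the flag instead of calling back into the election service
            return isLeader.get();
        }
    }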

@XComp (Contributor Author):

> Is it that everything called in the LeaderContender callbacks must use this method?
> It feels very wrong that this method is used to access the JRS, for example (even if that internally does things asynchronously), when that has little to do with leader election; it runs counter to the idea of running as few things as possible in the leader election executor.

I guess you're right on the JRS access: that one can run even if the leadership is not acquired because we only read here (i.e. no side effects).

Even the JobMasterServiceLeadershipRunner#createNewJobMasterServiceProcess does not need to be protected by the leadership because we can start a new JobMasterServiceProcess even without being the leader.

The crucial part is that leadership is still acquired when the JobMasterServiceProcess instantiation is finalized and the future results are forwarded. This still needs to happen under leadership because it triggers side effects.

JobMasterServiceLeadershipRunner#handleJobAlreadyDoneAsLeader is another operation that needs to run under leadership because it triggers side effects.

> I also don't quite follow yet why such a big API change is necessary.
> Couldn't we have saved ourselves a lot of work by just changing hasLeadership to return a CompletableFuture<Boolean>?

That's a good point. I thought about it and I think it wouldn't work: an asynchronous hasLeadership would then run on the leaderOperations thread (in the DefaultLeaderElectionService). But the chained callback that is called after the async call completes can run concurrently with a leadership revocation again. So that's not a viable solution.

The operations that need to run under leadership have to be performed on the leaderOperations thread. These operations need to be lightweight (i.e. ideally only future completions). And some of the operations that are currently performed under leadership (like the JRS read trigger mentioned above) can be moved out of the leadership context. A sketch of this split follows below.

> Now we have this strange API where a sync confirmLeadership method exists but must basically never be called from one side because it risks deadlocks.

Yeah, I have to admit the synchronous confirmLeadership method in the LeaderElection interface doesn't feel right and is inconsistent. 🤔 The problem is that we need to check the running state when executing the leadership confirmation because JobMasterServiceLeadershipRunner#close closes the LeaderElection outside of the lock. I.e. the confirmLeadership call could be executed after the runner was already shut down.

But looking at the DefaultDispatcherRunner implementation I notice that we're not relying on the lock for the leadership confirmation there despite having the same problem. Maybe I have a misconception here. 🤔

> Can't we solve the lock acquisition problem by removing the need to call hasLeadership?
> We already have an API that signals whether we are the leader or not; can't we write that outside of the contender lock into an AtomicBoolean and use that instead (while ensuring that grant/revoke don't do any heavy lifting)?

No, we need to run certain operations as the leader. Otherwise, it could happen that the job is marked as done despite some other leader picking up the work.
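
A minimal sketch of that split, with hypothetical names (createProcessAsync, forwardResults) standing in for the actual runner code: the heavy lifting runs on its own executor, and only the lightweight, side-effecting completion goes through runAsLeader.

    import java.util.UUID;
    import java.util.concurrent.CompletableFuture;

    // Hypothetical illustration class; the names are not from the Flink code base.
    final class LeadershipGuardedStart {

        interface LeaderElection {
            CompletableFuture<Void> runAsLeader(UUID leaderSessionId, Runnable callback);
        }

        private final LeaderElection leaderElection;

        LeadershipGuardedStart(LeaderElection leaderElection) {
            this.leaderElection = leaderElection;
        }

        CompletableFuture<Void> startProcess(UUID leaderSessionId) {
            // the expensive instantiation runs on its own executor ...
            return createProcessAsync(leaderSessionId)
                    // ... and only the lightweight, side-effecting completion is
                    // executed under leadership via runAsLeader
                    .thenCompose(
                            process ->
                                    leaderElection.runAsLeader(
                                            leaderSessionId, () -> forwardResults(process)));
        }

        private CompletableFuture<Object> createProcessAsync(UUID leaderSessionId) {
            // stand-in for the asynchronous JobMasterServiceProcess instantiation
            return CompletableFuture.supplyAsync(Object::new);
        }

        private void forwardResults(Object process) {
            // stand-in for forwarding the process' result futures
        }
    }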

@zentol (Contributor):

> But the chained callback that is called after the async call completes can run concurrently with a leadership revocation again

Why not? It is a matter of fact that we will occasionally do stuff while we no longer have leadership. Some S3 delete call can be on the wire while we have lost leadership. All the components like Dispatchers/JobManagers/etc. can do stuff while the leadership revocation message sits in the Akka message queue waiting to be scheduled into the main thread.

The only place where leadership must be truly maintained during the operation is access to HA itself, which uses different primitives. Everything else is best-effort.

@XComp (Contributor Author) commented Dec 2, 2024:

Hm, I guess you're right. That was my misconception here: that we want to ensure leadership beyond the HA backend, which, indeed, is not possible. Let me give it another try with the async hasLeadership approach. 🤔

@davidradl (Contributor) commented:
Reviewed by Chi on 28/11/24 business as usual progressing with committer involved.

@XComp (Contributor Author) left a comment:

Thanks @zentol for looking into the issue. I tried to respond to your remarks. PTAL

@XComp (Contributor Author) commented Nov 29, 2024

> Reviewed by Chi on 28/11/24 business as usual progressing with committer involved.

Hi @davidradl, can you explain what this comment means? 🤔

@davidradl (Contributor) commented:
> Reviewed by Chi on 28/11/24 business as usual progressing with committer involved.
>
> Hi @davidradl, can you explain what this comment means? 🤔

@XComp I was trying to indicate that the PR was proceeding without any need for the CHI workgroup (which is looking to reduce the Flink technical debt) to do anything. I can use different words if it would be clearer.

If we could start using labels rather than comments, this would be better, but I do not have the authority to create/assign labels in Jira.

Does this make sense?

@XComp XComp force-pushed the FLINK-36451 branch 2 times, most recently from 5f0c169 to 2ca1d6d on December 3, 2024, 12:42
@XComp (Contributor Author) commented Dec 3, 2024

@zentol I pushed the code for the second iteration. This time, hasLeadership and confirmLeadership are refactored to be asynchronous; they will be executed on the DefaultLeaderElectionService's leaderOperationExecutor (a rough sketch follows below). PTAL
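
A minimal sketch of what this could look like, assuming hypothetical field names (leaderOperationExecutor, confirmedLeaderSessionId) rather than the actual DefaultLeaderElectionService internals:

    import java.util.UUID;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    // Hypothetical illustration of asynchronous leadership checks.
    final class AsyncLeadershipChecks {

        private final Executor leaderOperationExecutor = Executors.newSingleThreadExecutor();
        private volatile UUID confirmedLeaderSessionId;

        CompletableFuture<Boolean> hasLeadership(UUID leaderSessionId) {
            // evaluated on the leader-operation thread instead of synchronously under a lock
            return CompletableFuture.supplyAsync(
                    () -> leaderSessionId.equals(confirmedLeaderSessionId),
                    leaderOperationExecutor);
        }

        CompletableFuture<Void> confirmLeadership(UUID leaderSessionId, String leaderAddress) {
            // address publication is omitted in this sketch
            return CompletableFuture.runAsync(
                    () -> {
                        confirmedLeaderSessionId = leaderSessionId;
                    },
                    leaderOperationExecutor);
        }
    }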

@XComp XComp requested a review from zentol December 3, 2024 12:45
@XComp XComp force-pushed the FLINK-36451 branch 3 times, most recently from 4c12066 to a045cf0 on December 3, 2024, 15:43
runIfValidLeader(
private CompletableFuture<Void> createNewJobMasterServiceProcessIfValidLeader(
UUID leaderSessionId) {
return runIfValidLeader(
@zentol (Contributor):

Is it a problem that this runs in the leadership operation thread now? I suppose not, but it's the one thing that stood out to me.

@XComp (Contributor Author):

// the heavy lifting of the JobMasterServiceProcess instantiation is still
// done asynchronously (see DefaultJobMasterServiceFactory#createJobMasterService);
// executing the logic on the leaderOperation thread in the DefaultLeaderElectionService
// should, therefore, be fine

Yeah, that should be fine. I added the comment above to the code to clarify this.

One could think of making the interfaces for creating the JobMasterService more explicit. Moving the instantiation out of the leadership check should be fine as well. But I was hesitant to touch that now, since having the instantiation trigger protected by the leadership is something that has been part of the code for a while. 🤔

… the confirmLeadership call because the check happens internally
XComp added 2 commits December 5, 2024 16:43
… to be triggered as a leader since it's read access only
… asynchronously

A test case is added that illustrates the concurrent lock acquisition problem in the DefaultLeaderElectionService
@XComp (Contributor Author) commented Dec 5, 2024

The last force-push (diff) squashed the commits and ran spotless:apply on the new comment.
