-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix P2P worker cleanup #7981
Fix P2P worker cleanup #7981
Conversation
cc @fjetter |
Unit Test ResultsSee test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 20 files ±0 20 suites ±0 10h 49m 46s ⏱️ - 24m 29s For more details on these failures, see this check. Results for commit 74599ee. ± Comparison against base commit 7b21399. This pull request removes 7 and adds 7 tests. Note that renamed tests count towards both.
This pull request removes 1 skipped test and adds 1 skipped test. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this looks right, although minor linting changes required.
await self.scheduler.register_worker_plugin( | ||
None, dumps(worker_plugin), name="shuffle" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this is the magic that ensures things are cleaned up, because the client will teardown the scheduler which tears down all the worker plugins?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And pretty much everything else is downstream renaming changes....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pretty much, yes. IIUC, extensions don't get cleaned up at all. Worker plugins will get torn down on worker close, but for that the method has to be called teardown
(as seen below). Similarly, we need to move some of the initialization to setup
.
@@ -855,7 +856,7 @@ async def _( | |||
self._runs.add(shuffle) | |||
return shuffle | |||
|
|||
async def close(self) -> None: | |||
async def teardown(self, worker: Worker) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Along with this so that the worker plugin now conforms to the WorkerPlugin
interface.
I'm not sure if I missed any additional linting problems, but I think the ones on here also happen on |
|
Worker cleanup currently works "by accident" because shuffles either complete or worker state gets cleaned up on failure. However,
ShuffleWorkerExtension.close()
never gets triggered. This PR fixes that. It also refactors the setup such that the scheduler plugin is responsible of installing the worker plugin.pre-commit run --all-files