-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fail P2PShuffle gracefully upon worker failure #7326
Conversation
Unit Test ResultsSee test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 18 files ± 0 18 suites ±0 8h 19m 55s ⏱️ + 42m 39s For more details on these failures, see this check. Results for commit bc317e2. ± Comparison against base commit 8c81d03. ♻️ This comment has been updated with latest results. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed the code this morning. Apparently VSCode didn't push the review so comments might be stale :(
@@ -219,11 +227,20 @@ async def _receive(self, data: list[bytes]) -> None: | |||
for k, v in groups.items() | |||
} | |||
) | |||
self.raise_if_closed() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an interesting place. Why would we need to raise here but not between any of the other awaits?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW I think we should combine the above calls into a single offload anyhow which would render this comment moot
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
offload
itself is protected with raise_if_closed()
. I've been thinking whether I should wrap any async functionality that needs to be protected with raise_if_closed()
into individual functions. That would probably make reasoning about these easier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW I think we should combine the above calls into a single offload anyhow which would render this comment moot
Good point, done.
except Exception: | ||
self.executor.shutdown() | ||
self.closed = True | ||
async with self._close_lock: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shuffle.close
is called in register_complete
and set_exception
How come this is overlapping? why do we need a lock? Are we sure that everything we do below is idempotent?
If we do not want to guarantee idempotency, the pattern we take in the server classes might be better suited than a lock, i.e.
async def close(self) -> None:
if self.closed:
await self._event_close.wait()
self.closed = True
await close_all_stuff()
self._event_close.set()
this locks + makes it idempotent even without relying on the buffers/executors / whatever else to come to be.
The only important thing is that nobody must reset the closed attribute. This is a one way street, otherwise this pattern breaks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shuffle.close
is called in ShuffleWorkerExtension._register_complete
,ShuffleWorkerExtension.close
and Shuffle.fail
, so we could see scenarios where an extension is closing and a shuffle is either completing or failing.
Fair point about idempotency. While everything should be idempotent at the moment, I'll adjust this to use the more cautious pattern of waiting on an event. Since shuffles should never reopen once closed, this should be fine.
with mock.patch( | ||
"distributed.shuffle._shuffle_extension.get_worker_for", mock_get_worker_for | ||
): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't love this mock. Right now, there are actually only two ways to trigger a situation where we have input workers that are not participating as output workers
- What you are currently emulating. That would be an unbelievable hash collision. We could possibly emulate this by using constant input data or smth like this
- Worker enters after the shuffle started. Stealing assigns a transfer task (which is very unlikely), this new worker executes at least one key and then it dies s.t. this key is rescheduled
I actually tried to manipulate stealing to do this and even manually this is non-trivial.
I also tried with shuffle(npartitions=1)
s.t. we have more input than output partitoins. However, dask automatically triggers a repartition before the shuffle in this case s.t. we fall back to the stealing case.
Interestingly, this is an interesting performance hint. With p2p we should no longer require the repartition!
No actions here. Just wanted to share some info
@@ -536,12 +609,17 @@ class ShuffleSchedulerExtension: | |||
columns: dict[ShuffleId, str] | |||
output_workers: dict[ShuffleId, set[str]] | |||
completed_workers: dict[ShuffleId, set[str]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be able to remove self.completed_workers
and register_complete
, but that's work for another PR.
There is a related test failure |
Should be solved. |
No related failures on CI. |
No related failures on CI |
# await clean_worker(a) | ||
# await clean_worker(b) | ||
# await clean_scheduler(s) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this intentional? Shouldn't this test work already?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I lost track of that, I adjusted the test and unskipped it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Turns out CI isn't happy with this one. Unfortunately, I have not been able to reproduce the issue locally so far.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR is not exactly where we would like it to be but I still believe this is worth merging s.t. we can focus on follow up tasks.
Great job @hendrikmakait . This was much tougher than I initially suspected
Ha!
👍 🎉 |
pre-commit run --all-files