-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Automatically restart P2P shuffles when output worker leaves #7970
Conversation
Unit Test ResultsSee test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 20 files ± 0 20 suites ±0 11h 35m 10s ⏱️ + 10h 14m 44s For more details on these failures, see this check. Results for commit 5764f45. ± Comparison against base commit efc7eeb. This pull request skips 1 and un-skips 138 tests.
♻️ This comment has been updated with latest results. |
This reverts commit 853d953.
@wence-: Your feedback should be addressed and blocking PRs are merged, so this should be good for another round. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally looks good to me, but not familiar enough with the code-base to be the final reviewer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of my comments are nits and you can ignore or use them.
The one thing I'm concerned about is that the coverage report indicates some rather critical code sections are not covered. We should look into that before merging
try: | ||
shuffle = self.states[shuffle_id] | ||
shuffle = self.active_shuffles[shuffle_id] | ||
except KeyError: | ||
return | ||
self._fail_on_workers(shuffle, message=f"{shuffle} forgotten") | ||
self._clean_on_scheduler(shuffle_id) | ||
pass | ||
else: | ||
self._fail_on_workers(shuffle, message=f"{shuffle} forgotten") | ||
self._clean_on_scheduler(shuffle_id, stimulus_id=stimulus_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is more a style note and I typically try to avoid style questions in a PR review. Still, this feels a bit convoluted. I believe something like
if shuffle := self.active_shuffles.get(shuffle_id):
self._fail_on_workers(shuffle, message=f"{shuffle} forgotten")
self._clean_on_scheduler(shuffle_id, stimulus_id=stimulus_id)
elif finish == "forgotten":
...
is easier to read than try/except;pass/else. with or without walrus.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feel free to ignore. Logic is the same in the end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair point, this bit has gotten out of hand.
def __eq__(self, other: Any) -> bool: | ||
return type(other) == type(self) and other.run_id == self.run_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This appears to be not covered by tests. Why is this necessary then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It had some use in an earlier iteration. Removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My guess is because __hash__
is now not the default and this addition of __eq__
ensures that __eq__
and the newly defined __hash__
are consistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose this is because the run_id
is a unique token that defines the shuffle state object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose this is because the run_id is a unique token that defines the shuffle state object.
That's why I initially had a new __eq__
. Then __hash__
had to match it. Now I'm only using __hash__
, so I think there's no need for a custom __eq__
that could potentially get outdated.
distributed/shuffle/_shuffle.py
Outdated
except ShuffleClosedError: | ||
raise Reschedule() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This lack of coverage make me nervous. I think around the barrier there are various races we should test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't been able to come up with a scenario where this would be triggered (and relevant), so I've removed it for now. If this ever pops up for somebody, I hope they'll send a bug report our way.
self._clean_on_scheduler(shuffle_id, stimulus_id=stimulus_id) | ||
|
||
if finish == "forgotten": | ||
shuffles = self._shuffles.pop(shuffle_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC this entire logic is just there to clean up. Behavior would not be impacted if we didn't do any of this, correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is state cleanup on the scheduler plugin.
recs.update({dt.key: "released"}) | ||
|
||
if barrier_task.state == "erred": | ||
return {} # pragma: no cover |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why don't you want coverage to detect this? Seems like an important case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comment to explain
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like an ideal case for an assert False, "Invariant broken"
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would also work. I'm wondering if assert False
is the right thing to add here given that PYTHONOPTIMIZE
will strip them. It would work as an addition though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
raising a RuntimeError
now
|
||
for dt in barrier_task.dependencies: | ||
if dt.state == "erred": | ||
return {} # pragma: no cover |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comment to explain
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly here.
while self._runs: | ||
await asyncio.sleep(0.1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't insist on this but I don't like these sleep patterns.
def __init__(...):
self._runs_condition = asyncio.Condition()
async def _close_shuffle_run(self, shuffle: ShuffleRun) -> None:
await shuffle.close()
async with self._runs_condition:
self._runs.remove(shuffle)
self._runs_condition.notify_all()
async def teardown(self, worker: Worker) -> None:
...
async with self._runs_condition:
await self._runs_condition.wait_for(lambda: not self._runs)
would be a clean alternative. Many people consider Conditions too complex but what I like about them is that they make this relationship very clear (and they unblock immediately which is nice for testing and such things).
As I said, I don't insist on this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To a more serious question: Is it possible for _runs to be repopulated at this point or are we locking everything up properly for this to not happen once we reach this point?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense! I've added the condition. At this point the plugin is closed which will raise a ShuffleClosedError
before a new run can be added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_runs
is added to in _refresh_shuffle
which doesn't have a lock associated with it. But I am not sure if that can be running simultaneously with teardown
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
teardown
sets
distributed/distributed/shuffle/_worker_plugin.py
Lines 890 to 893 in 90eb9ea
async def teardown(self, worker: Worker) -> None: | |
assert not self.closed | |
self.closed = True |
Once that is done,
distributed/distributed/shuffle/_worker_plugin.py
Lines 810 to 811 in 90eb9ea
if self.closed: | |
raise ShuffleClosedError(f"{self} has already been closed") |
self._runs.add(shuffle) |
@gen_cluster( | ||
client=True, | ||
nthreads=[("", 1)] * 2, | ||
config={"distributed.scheduler.allowed-failures": 0}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this mean that P2P is now retried allowed-failures
times if a worker goes OOM?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't be a dealbreaker but I also don't think this is useful. It's very unlikely that another P2P run attempt would be more successful.
However, there are of course also cases like spot interruption where this matters... Never mind!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this mean that P2P is now retried allowed-failures times if a worker goes OOM?
Yes, as there might be other causes apart from an output partition being too large.
@@ -578,12 +699,39 @@ async def test_closed_worker_during_unpack(c, s, a, b): | |||
freq="10 s", | |||
) | |||
out = dd.shuffle.shuffle(df, "x", shuffle="p2p") | |||
out = out.persist() | |||
x, y = c.compute([df.x.size, out.x.size]) | |||
await wait_for_tasks_in_state("shuffle-p2p", "memory", 1, b) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this is out of scope for this PR but I think it would make sense to have an API to easily get access to the actual shuffle instnaces held by the plugins and have a stage
attribute that indicates whether we're in transfer, barrier or unpack stage.
I would find this kind of verification nicer than waiting for task states.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of my comments are nits and you can ignore or use them.
The one thing I'm concerned about is that the coverage report indicates some rather critical code sections are not covered. We should look into that before merging
def _create_shuffle_run( | ||
self, shuffle_id: ShuffleId, result: dict[str, Any] | ||
) -> ShuffleRun: | ||
shuffle: ShuffleRun | ||
if result["type"] == ShuffleType.DATAFRAME: | ||
shuffle = DataFrameShuffleRun( | ||
column=result["column"], | ||
worker_for=result["worker_for"], | ||
output_workers=result["output_workers"], | ||
id=shuffle_id, | ||
run_id=result["run_id"], | ||
directory=os.path.join( | ||
self.worker.local_directory, | ||
f"shuffle-{shuffle_id}-{result['run_id']}", | ||
), | ||
executor=self._executor, | ||
local_address=self.worker.address, | ||
rpc=self.worker.rpc, | ||
scheduler=self.worker.scheduler, | ||
memory_limiter_disk=self.memory_limiter_disk, | ||
memory_limiter_comms=self.memory_limiter_comms, | ||
) | ||
shuffle = self._create_dataframe_shuffle_run(shuffle_id, result) | ||
elif result["type"] == ShuffleType.ARRAY_RECHUNK: | ||
shuffle = ArrayRechunkRun( | ||
worker_for=result["worker_for"], | ||
output_workers=result["output_workers"], | ||
old=result["old"], | ||
new=result["new"], | ||
id=shuffle_id, | ||
run_id=result["run_id"], | ||
directory=os.path.join( | ||
self.worker.local_directory, | ||
f"shuffle-{shuffle_id}-{result['run_id']}", | ||
), | ||
executor=self._executor, | ||
local_address=self.worker.address, | ||
rpc=self.worker.rpc, | ||
scheduler=self.worker.scheduler, | ||
memory_limiter_disk=self.memory_limiter_disk, | ||
memory_limiter_comms=self.memory_limiter_comms, | ||
) | ||
shuffle = self._create_array_rechunk_run(shuffle_id, result) | ||
else: # pragma: no cover | ||
raise TypeError(result["type"]) | ||
self.shuffles[shuffle_id] = shuffle | ||
self._runs.add(shuffle) | ||
return shuffle | ||
|
||
def _create_dataframe_shuffle_run( | ||
self, shuffle_id: ShuffleId, result: dict[str, Any] | ||
) -> DataFrameShuffleRun: | ||
return DataFrameShuffleRun( | ||
column=result["column"], | ||
worker_for=result["worker_for"], | ||
output_workers=result["output_workers"], | ||
id=shuffle_id, | ||
run_id=result["run_id"], | ||
directory=os.path.join( | ||
self.worker.local_directory, | ||
f"shuffle-{shuffle_id}-{result['run_id']}", | ||
), | ||
executor=self._executor, | ||
local_address=self.worker.address, | ||
rpc=self.worker.rpc, | ||
scheduler=self.worker.scheduler, | ||
memory_limiter_disk=self.memory_limiter_disk, | ||
memory_limiter_comms=self.memory_limiter_comms, | ||
) | ||
|
||
def _create_array_rechunk_run( | ||
self, shuffle_id: ShuffleId, result: dict[str, Any] | ||
) -> ArrayRechunkRun: | ||
return ArrayRechunkRun( | ||
worker_for=result["worker_for"], | ||
output_workers=result["output_workers"], | ||
old=result["old"], | ||
new=result["new"], | ||
id=shuffle_id, | ||
run_id=result["run_id"], | ||
directory=os.path.join( | ||
self.worker.local_directory, | ||
f"shuffle-{shuffle_id}-{result['run_id']}", | ||
), | ||
executor=self._executor, | ||
local_address=self.worker.address, | ||
rpc=self.worker.rpc, | ||
scheduler=self.worker.scheduler, | ||
memory_limiter_disk=self.memory_limiter_disk, | ||
memory_limiter_comms=self.memory_limiter_comms, | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cosmetical refactoring to make it easier to understand whether we could potentially encounter races.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure about the maintenance of invariants in remove_worker
, plus a similar request to @fjetter for some coverage on edge cases?
def __eq__(self, other: Any) -> bool: | ||
return type(other) == type(self) and other.run_id == self.run_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My guess is because __hash__
is now not the default and this addition of __eq__
ensures that __eq__
and the newly defined __hash__
are consistent.
def __eq__(self, other: Any) -> bool: | ||
return type(other) == type(self) and other.run_id == self.run_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose this is because the run_id
is a unique token that defines the shuffle state object.
if worker not in self.scheduler.workers: | ||
raise RuntimeError(f"Scheduler is unaware of this worker {worker!r}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be tested by retiring a worker during a shuffle in a test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't been able to come up with a scenario where this would happen, but given how messy worker shutdown can be, I'm not 100% certain this would never happen. Left it in with a note for now.
if worker not in shuffle.participating_workers: | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test by adding a worker to the cluster and then restarting a shuffle?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactored.
|
||
stimulus_id = f"shuffle-failed-worker-left-{time()}" | ||
self._restart_shuffle(shuffle.id, scheduler, stimulus_id=stimulus_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, so first we restart all shuffles that were interrupted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that restarting this shuffle should remove it from _archived_by
but I do not see that happening. Do I have that right? Or does this somehow create a new shuffle object that has archived_by = None
. Otherwise it seems like it might get lost in _clean_on_scheduler
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Restarting a shuffle removes the ShuffleState
from active states. The first shuffle_transfer
task to call shuffle_get_or_create
will cause the SchedulerPlugin to create a new ShuffleState
with an incremented run_id
and _archived_by = None
.
# If processing the transactions causes a task to get released, this | ||
# removes the shuffle from self.active_shuffles. Therefore, we must iterate | ||
# over a copy. | ||
for shuffle_id, shuffle in self.active_shuffles.copy().items(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then we iterate over all active shuffles, remove and restart?
Why do we not unconditionally restart the archived shuffles after this loop over active shuffles?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not 100% sure I'm following, but what I think you're saying is a very good point.
while self._runs: | ||
await asyncio.sleep(0.1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_runs
is added to in _refresh_shuffle
which doesn't have a lock associated with it. But I am not sure if that can be running simultaneously with teardown
.
recs.update({dt.key: "released"}) | ||
|
||
if barrier_task.state == "erred": | ||
return {} # pragma: no cover |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like an ideal case for an assert False, "Invariant broken"
?
|
||
for dt in barrier_task.dependencies: | ||
if dt.state == "erred": | ||
return {} # pragma: no cover |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly here.
async with self._runs_condition: | ||
await self._runs_condition.wait_for(lambda: not self._runs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This lock protects _runs
wrt _close_shuffle_run
but not wrt _refresh_shuffle
I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've renamed it to _runs_cleanup_condition
to highlight that it's only concerned with cleanup. There's a different mechanism in place for adding to self._runs
. (Feel free to refactor in a follow-up if you see a good way of doing so.)
I've added another test, now all feedback should be addressed. For |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To the best of my understanding, this looks right!
Closes #7353
Blocked by and includes #7967Blocked by and includes #7979Blocked by and includes #7981Blocked by and includes #7974pre-commit run --all-files