Reset state of ShuffleSchedulerExtension on restart #7446
Unit Test Results: 22 files ±0, 22 suites ±0, 10h 20m 46s ⏱️ (+50m 45s). See the test report for an extended history of previous test failures; this is useful for diagnosing flaky tests. For more details on these failures, see this check. Results for commit cd8ddff, compared against base commit b5a2078.
# Cannot rerun forgotten shuffle due to tombstone
with pytest.raises(RuntimeError, match="shuffle_transfer"):
    await c.compute(dd.shuffle.shuffle(df, "y", shuffle="p2p"))
Hold on, this is concerning. We cannot rerun the same shuffle even after it finished successfully?
Yes. Because shuffle_get is get-or-create, cancelled tasks could otherwise re-create a forgotten shuffle, which would leave stale state on the cluster indefinitely. We should be able to solve this with something like #7372.
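The interaction between get-or-create and tombstones can be sketched roughly as follows. This is a toy illustration only: `TombstoneRegistry` and its methods are hypothetical names, not the API of ShuffleSchedulerExtension or shuffle_get.

```python
class TombstoneRegistry:
    """Toy get-or-create registry with tombstones (hypothetical, not distributed's API)."""

    def __init__(self):
        self.active = {}          # shuffle id -> state of running shuffles
        self.tombstones = set()   # ids of shuffles that were forgotten

    def get_or_create(self, shuffle_id):
        # Without this check, a late (e.g. cancelled) task asking for a
        # forgotten shuffle would silently re-create it and leave stale state.
        if shuffle_id in self.tombstones:
            raise RuntimeError(f"shuffle {shuffle_id!r} was forgotten")
        return self.active.setdefault(shuffle_id, {"id": shuffle_id})

    def forget(self, shuffle_id):
        # Drop the state and remember that this id must not come back.
        self.active.pop(shuffle_id, None)
        self.tombstones.add(shuffle_id)
```

The cost of this design is the one raised in the review: once an id is tombstoned, a legitimate rerun of the same shuffle is rejected as well.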
See also
distributed/distributed/shuffle/tests/test_shuffle.py
Lines 892 to 918 in 401b51d
@pytest.mark.xfail(reason="Tombstone prohibits multiple calls to head")
@gen_cluster(client=True, nthreads=[("127.0.0.1", 4)] * 2)
async def test_repeat(c, s, a, b):
    df = dask.datasets.timeseries(
        start="2000-01-01",
        end="2000-01-10",
        dtypes={"x": float, "y": float},
        freq="100 s",
    )
    out = dd.shuffle.shuffle(df, "x", shuffle="p2p")
    await c.compute(out.head(compute=False))
    await clean_worker(a, timeout=2)
    await clean_worker(b, timeout=2)
    await clean_scheduler(s, timeout=2)
    await c.compute(out.tail(compute=False))
    await clean_worker(a, timeout=2)
    await clean_worker(b, timeout=2)
    await clean_scheduler(s, timeout=2)
    await c.compute(out.head(compute=False))
    await clean_worker(a, timeout=2)
    await clean_worker(b, timeout=2)
    await clean_scheduler(s, timeout=2)
xref #7452
Fixes issue where forgotten shuffles could not be re-run after a cluster restart
pre-commit run --all-files
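A minimal sketch of the behaviour this PR describes, assuming the extension keeps its shuffle states and tombstones in plain containers and exposes a restart hook that clears them. `RestartableShuffleState` and its attribute names are hypothetical and chosen for illustration; they are not taken from the diff.

```python
class RestartableShuffleState:
    """Toy model of scheduler-side shuffle bookkeeping (hypothetical names)."""

    def __init__(self):
        self.states = {}          # shuffle id -> state
        self.tombstones = set()   # ids of forgotten shuffles

    def get_or_create(self, shuffle_id):
        if shuffle_id in self.tombstones:
            raise RuntimeError(f"shuffle {shuffle_id!r} was forgotten")
        return self.states.setdefault(shuffle_id, {"id": shuffle_id})

    def forget(self, shuffle_id):
        self.states.pop(shuffle_id, None)
        self.tombstones.add(shuffle_id)

    def restart(self):
        # After a cluster restart no old task can still be running, so the
        # tombstones no longer protect anything and can be dropped.
        self.states.clear()
        self.tombstones.clear()
```

With the reset in place, a shuffle id that was forgotten before the restart can be created again afterwards; without a restart it stays blocked.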