
Support dumping cluster state to URL #5863

Merged 16 commits on Mar 10, 2022

Conversation

gjoseph92 (Collaborator):

Adds support for passing any fsspec.open-compatible URL to Client.dump_cluster_state, in which case the scheduler will generate the cluster dump itself and write it directly to that URL, without transferring anything back to the client.

Previously, the logic to consolidate state/version information from the workers and the scheduler happened via multiple RPCs on the client. Now, this is moved to a Scheduler.get_cluster_state method. You might think this would lead to worse performance, since there's less overlap of communication, but given the way broadcast works (the messages all got de-serialized and re-serialized on the scheduler anyway, without streaming, and the scheduler's event loop is blocked anyway by _to_dict), I don't think this change should make much performance difference. I haven't tested that though.
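
For illustration, a rough sketch of what the scheduler-side consolidation replaces (hypothetical names and dict shapes, not the actual `Scheduler.get_cluster_state` implementation): instead of the client issuing several broadcast RPCs and merging the results, the scheduler assembles one dump dict in a single place.

```python
# Hypothetical sketch of scheduler-side consolidation; the function name,
# arguments, and dict shapes are illustrative, not the real PR code.
def get_cluster_state(scheduler_info: dict, worker_infos: dict) -> dict:
    # Merge scheduler state with per-worker state/version info in one call,
    # so the client no longer needs multiple RPCs to assemble the dump.
    return {
        "scheduler": scheduler_info,
        "workers": worker_infos,
        "versions": {
            "workers": {
                addr: info.get("versions") for addr, info in worker_infos.items()
            }
        },
    }
```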

cc @crusaderky @fjetter @graingert @sevberg

Commits included:
  • Full state dumps (not just scheduler state) are now generated scheduler-side, then sent wholesale back to the client.
  • Scheduler `dump_cluster_state` can write the state directly to a URL via fsspec. Still need to hook this up to a client method.
  • It's not done, just going to move it into a comment on GitHub instead.
distributed/tests/test_scheduler.py (resolved)
distributed/scheduler.py (outdated, resolved)
@github-actions bot commented Feb 24, 2022

Unit Test Results

12 files ±0    12 suites ±0    5h 46m 49s ⏱️ −1h 22m 40s
2 639 tests +15    2 555 ✔️ +17    80 💤 ±0    4 ❌ −2
12 977 runs −2 691    12 333 ✔️ −2 470    637 💤 −222    7 ❌ +1

For more details on these failures, see this check.

Results for commit 879f847. ± Comparison against base commit de94b40.

♻️ This comment has been updated with latest results.

distributed/scheduler.py (outdated, resolved)
distributed/tests/test_client.py (outdated, resolved)
distributed/scheduler.py (outdated, resolved)
Comment on lines 4142 to 4170
if format == "msgpack":
    import msgpack

    # NOTE: `compression="infer"` will automatically use gzip via the `.gz` suffix
    mode = "wb"
    suffix = ".msgpack.gz"
    if not url.endswith(suffix):
        url += suffix
    writer = msgpack.pack
elif format == "yaml":
    import yaml

    mode = "w"
    suffix = ".yaml"
    if not url.endswith(suffix):
        url += suffix

    def writer(state: dict, f):
        # YAML adds unnecessary `!!python/tuple` tags; convert tuples to lists to avoid them.
        def tuple_to_list(node):
            if isinstance(node, (list, tuple)):
                return [tuple_to_list(el) for el in node]
            elif isinstance(node, dict):
                return {k: tuple_to_list(v) for k, v in node.items()}
            else:
                return node

        state = tuple_to_list(state)
        yaml.dump(state, f)
Member:

Do we really want this to be duplicated? There is already drift between the two versions (just a comment so far). Can't this function be refactored so it's reused by both the local and remote paths?

Collaborator (Author):

Yeah, I don't like the duplication either. There are a couple of subtle differences right now:

  • Client runs tuple_to_list for both msgpack and yaml because of serialization; dump_cluster_state_to_url only runs it for yaml. I'd guess doing it for msgpack isn't necessary anywhere, so this shouldn't matter.
  • dump_cluster_state_to_url uses fsspec to open the file; client uses plain open. This gives you a way to get a cluster dump if you don't want to install fsspec.

Neither of those seems like a huge deal to me, so I'll make it a shared function. I'm just not sure where to put it besides the dreaded utils.py.
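
For illustration, a minimal sketch of the URL/format logic such a shared function could factor out of the two call sites (the helper name `dump_target` is hypothetical, not from this PR):

```python
# Hypothetical shared helper factoring out the duplicated format/suffix
# selection from the client and scheduler dump paths; illustrative only.
def dump_target(url: str, format: str = "msgpack"):
    if format == "msgpack":
        # gzip compression is inferred downstream from the .gz suffix
        mode, suffix = "wb", ".msgpack.gz"
    elif format == "yaml":
        mode, suffix = "w", ".yaml"
    else:
        raise ValueError(f"Unsupported format: {format!r}")
    if not url.endswith(suffix):
        url += suffix
    return url, mode
```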

Member:

How about a new module? The stuff in #5873 needs to live somewhere as well

> I'd guess doing it for msgpack isn't necessary anywhere, so this shouldn't matter.

Symmetry. Having everything identical regardless of the output format is nice, since any code you write works on both file types. One thing I actually do often is convert a received msgpack file to yaml, which lets me grep through it 🙈

Collaborator (Author):

> symmetry. Having everything identical regardless of the output format is nice

Converting tuples to lists in the input to msgpack doesn't matter, though, since msgpack itself effectively turns tuples into lists during dumping (tuples and lists are both serialized as msgpack arrays). And for large cluster dumps, I think saving a full traversal and copy of the state is worthwhile.

> One thing I actually do often is to convert a received msgpack file to yaml which allows me to grep stuff

I do this too. When you read the msgpack back in, though, it'll be all lists anyway, so the result is symmetrical when you then dump it to yaml.

Member:

> Converting tuples to lists in the input to msgpack doesn't matter though, since the msgpack itself effectively turns tuples into lists in the dumping process (since tuples are represented as lists)

AFAIK, you'll need to toggle this specifically. IIRC, msgpack load will reconstruct lists (which is horribly slow).
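
The toggle in question appears to be msgpack's `use_list` flag on unpacking: packing a tuple produces a msgpack array, and unpacking returns lists by default (tuples only with `use_list=False`). A quick check, assuming the `msgpack` package is installed:

```python
import msgpack

# A tuple is serialized as a plain msgpack array; the tuple-ness is not preserved.
packed = msgpack.packb((1, 2))

# By default (use_list=True), arrays come back as Python lists.
assert msgpack.unpackb(packed) == [1, 2]

# Only with use_list=False does unpacking reconstruct tuples instead.
assert msgpack.unpackb(packed, use_list=False) == (1, 2)
```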

@fjetter (Member) left a comment:

There appears to be a related test failure

FAILED distributed/tests/test_cluster_dump.py::test_url_and_writer_yaml - Typ...

@gjoseph92 gjoseph92 self-assigned this Mar 8, 2022
@ian-r-rose ian-r-rose self-requested a review March 8, 2022 17:05
distributed/scheduler.py Outdated Show resolved Hide resolved
distributed/client.py Outdated Show resolved Hide resolved
distributed/client.py Outdated Show resolved Hide resolved
@sjperkins sjperkins mentioned this pull request Mar 9, 2022
3 tasks
@gjoseph92 gjoseph92 requested a review from fjetter March 9, 2022 18:13
@gjoseph92 (Collaborator, Author):

Possibly some #5869 failures in the tests, but at least the tests from this PR aren't failing?

FAILED distributed/tests/test_cancelled_state.py::test_worker_stream_died_during_comm
FAILED distributed/tests/test_scheduler.py::test_missing_data_errant_worker
Error: Process completed with exit code 1

@fjetter (Member) commented Mar 10, 2022:

From what I can tell, all test failures are unrelated. There are also a bunch of #5910 failures.
Thank you @gjoseph92 !

Successfully merging this pull request may close these issues.

Make writing cluster state dumps to buckets easier