Support dumping cluster state to URL #5863

Merged · 16 commits · Mar 10, 2022
Changes from 4 commits
98 changes: 61 additions & 37 deletions distributed/client.py
@@ -3820,34 +3820,14 @@ def scheduler_info(self, **kwargs):
         self.sync(self._update_scheduler_info)
         return self._scheduler_identity
 
-    async def _dump_cluster_state(
+    async def _dump_cluster_state_local(
         self,
         filename: str,
         exclude: Collection[str],
         format: Literal["msgpack", "yaml"],
+        storage_options: dict | None = None,
     ) -> None:
-
-        scheduler_info = self.scheduler.dump_state(exclude=exclude)
-
-        workers_info = self.scheduler.broadcast(
-            msg={"op": "dump_state", "exclude": exclude},
-            on_error="return_pickle",
-        )
-        versions_info = self._get_versions()
-        scheduler_info, workers_info, versions_info = await asyncio.gather(
-            scheduler_info, workers_info, versions_info
-        )
-        # Unpickle RPC errors and convert them to string
-        workers_info = {
-            k: repr(loads(v)) if isinstance(v, bytes) else v
-            for k, v in workers_info.items()
-        }
-
-        state = {
-            "scheduler": scheduler_info,
-            "workers": workers_info,
-            "versions": versions_info,
-        }
+        state = await self.scheduler.get_cluster_state(exclude=exclude)
 
         def tuple_to_list(node):
             if isinstance(node, (list, tuple)):
@@ -3886,42 +3866,76 @@ def tuple_to_list(node):
                 f"Unsupported format {format}. Possible values are `msgpack` or `yaml`"
             )
 
+    async def _dump_cluster_state_remote(
+        self,
+        filename: str,
+        exclude: Collection[str],
+        format: Literal["msgpack", "yaml"],
+        storage_options: dict | None = None,
+    ) -> None:
+        "Tell the scheduler to dump cluster state to a URL"
+        await self.scheduler.dump_cluster_state_to_url(
+            url=filename,
+            exclude=exclude,
+            format=format,
+            storage_options=storage_options,
+        )
+
     def dump_cluster_state(
         self,
         filename: str = "dask-cluster-dump",
         exclude: Collection[str] = ("run_spec",),
         format: Literal["msgpack", "yaml"] = "msgpack",
+        **kwargs,
     ):
-        """Extract a dump of the entire cluster state and persist to disk.
+        """Extract a dump of the entire cluster state and persist to disk or a URL.
         This is intended for debugging purposes only.
 
-        Warning: Memory usage on client side can be large.
+        Warning: Memory usage on the scheduler (and client, if writing the dump locally)
+        can be large. On a large or long-running cluster, this can take several minutes.
+        The scheduler may be unresponsive while the dump is processed.
 
         Results will be stored in a dict::
 
            {
-                "scheduler_info": {...},
-                "worker_info": {
-                    worker_addr: {...}, # worker attributes
+                "scheduler": {...}, # scheduler state
+                "workers": {
+                    worker_addr: {...}, # worker state
                     ...
                 }
+                "versions": {
+                    "scheduler": {...},
+                    "workers": {
+                        worker_addr: {...},
+                        ...
+                    }
+                }
            }
 
        Parameters
        ----------
        filename:
-            The output filename. The appropriate file suffix (`.msgpack.gz` or
-            `.yaml`) will be appended automatically.
+            The output filename. The appropriate file suffix (``.msgpack.gz`` or
+            ``.yaml``) will be appended automatically.
+
+            If the filename is a URL supported by :func:`fsspec.open` (like
+            ``s3://my-bucket/cluster-dump``, or any URL containing ``://``), the
+            dump will be written directly to that URL from the scheduler.
+            Any additional keyword arguments will be passed to :func:`fsspec.open`.
+
+            Otherwise, the filename is interpreted as a path on the local
+            filesystem. The cluster state will be sent from the scheduler back
+            to the client, then written to disk.
        exclude:
            A collection of attribute names which are supposed to be excluded
            from the dump, e.g. to exclude code, tracebacks, logs, etc.
 
-            Defaults to exclude `run_spec` which is the serialized user code. This
-            is typically not required for debugging. To allow serialization of
-            this, pass an empty tuple.
+            Defaults to exclude ``run_spec``, which is the serialized user code.
+            This is typically not required for debugging. To allow serialization
+            of this, pass an empty tuple.
        format:
-            Either msgpack or yaml. If msgpack is used (default), the output
-            will be stored in a gzipped file as msgpack.
+            Either ``"msgpack"`` or ``"yaml"``. If msgpack is used (default),
+            the output will be stored in a gzipped file as msgpack.
 
            To read::
 
@@ -3938,13 +3952,23 @@ def dump_cluster_state(
             from yaml import Loader
             with open("filename") as fd:
                 state = yaml.load(fd, Loader=Loader)
+        **kwargs:
+            Any additional arguments to :func:`fsspec.open` when writing to a URL.
+            Ignored when writing to a local file.
 
         """
+        filename = str(filename)
+        method = (
+            self._dump_cluster_state_remote
+            if "://" in filename
+            else self._dump_cluster_state_local
+        )
         return self.sync(
-            self._dump_cluster_state,
+            method,
             filename=filename,
-            format=format,
             exclude=exclude,
+            format=format,
+            storage_options=kwargs,
         )
 
     def write_scheduler_file(self, scheduler_file):
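
Taken together, the client-side change keeps a single entry point for both paths. A minimal usage sketch (the bucket name and credentials are hypothetical; writing to S3 also requires s3fs)::

    from distributed import Client

    client = Client()  # or connect to an existing scheduler

    # Local path: state is gathered on the scheduler, sent back to the
    # client, and written to ./my-dump.msgpack.gz
    client.dump_cluster_state("my-dump")

    # URL (anything containing "://"): the scheduler writes directly via
    # fsspec; extra keyword arguments are passed through as storage_options
    client.dump_cluster_state(
        "s3://my-bucket/my-dump",
        format="yaml",
        key="<aws-key>",  # hypothetical s3fs credentials
        secret="<aws-secret>",
    )
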
103 changes: 102 additions & 1 deletion distributed/scheduler.py
@@ -28,7 +28,7 @@
 from datetime import timedelta
 from functools import partial
 from numbers import Number
-from typing import ClassVar, Literal
+from typing import Any, ClassVar, Dict, Literal, Optional
 from typing import cast as pep484_cast
 
 import psutil
@@ -3970,6 +3970,8 @@ def __init__(
             "subscribe_worker_status": self.subscribe_worker_status,
             "start_task_metadata": self.start_task_metadata,
             "stop_task_metadata": self.stop_task_metadata,
+            "get_cluster_state": self.get_cluster_state,
+            "dump_cluster_state_to_url": self.dump_cluster_state_to_url,
         }
 
         connection_limit = get_fileno_limit() / 2
@@ -4082,6 +4084,105 @@ def _to_dict(
         info.update(recursive_to_dict(extra, exclude=exclude))
         return info
 
+    async def get_cluster_state(
+        self,
+        exclude: Collection[str],
+    ) -> dict:
+        "Produce the state dict used in a cluster state dump"
+        # Kick off state-dumping on workers before we block the event loop in `self._to_dict`.
+        workers_future = asyncio.gather(
+            self.broadcast(
+                msg={"op": "dump_state", "exclude": exclude},
+                on_error="return",
+            ),
+            self.broadcast(
+                msg={"op": "versions"},
+                on_error="ignore",
+            ),
+        )
+        try:
+            scheduler_state = self._to_dict(exclude=exclude)
+
+            worker_states, worker_versions = await workers_future
+        finally:
+            # Ensure the tasks aren't left running if anything fails.
+            # Someday (py3.11), use a trio-style TaskGroup for this.
+            workers_future.cancel()
+
+        # Convert any RPC errors to strings
+        worker_states = {
+            k: repr(v) if isinstance(v, Exception) else v
+            for k, v in worker_states.items()
+        }
+
+        return {
+            "scheduler": scheduler_state,
+            "workers": worker_states,
+            "versions": {"scheduler": self.versions(), "workers": worker_versions},
+        }
+
+    async def dump_cluster_state_to_url(
+        self,
+        url: str,
+        exclude: Collection[str],
+        format: Literal["msgpack", "yaml"],
+        storage_options: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        "Write a cluster state dump to an fsspec-compatible URL. Requires fsspec."
+        try:
+            import fsspec
+        except ImportError:
+            raise ImportError(
+                "fsspec must be installed to dump cluster state directly to a URL"
+            )
+
+        if storage_options is None:
+            storage_options = {}
+
+        if format == "msgpack":
+            import msgpack
+
+            # NOTE: `compression="infer"` will automatically use gzip via the `.gz` suffix
+            mode = "wb"
+            suffix = ".msgpack.gz"
+            if not url.endswith(suffix):
+                url += suffix
+            writer = msgpack.pack
+        elif format == "yaml":
+            import yaml
+
+            mode = "w"
+            suffix = ".yaml"
+            if not url.endswith(suffix):
+                url += suffix
+
+            def writer(state: dict, f):
+                # YAML adds unnecessary `!!python/tuple` tags; convert tuples to lists to avoid them.
+                def tuple_to_list(node):
+                    if isinstance(node, (list, tuple)):
+                        return [tuple_to_list(el) for el in node]
+                    elif isinstance(node, dict):
+                        return {k: tuple_to_list(v) for k, v in node.items()}
+                    else:
+                        return node
+
+                state = tuple_to_list(state)
+                yaml.dump(state, f)
Member:

Do we really want this to be duplicated? There is already drift between the two versions (just a comment). Can't this function be refactored so it's reused by both local and remote?

Collaborator Author:

Yeah, I don't like the duplication either. There are a couple of subtle differences right now:

  • The client runs tuple_to_list for both msgpack and yaml because of serialization; dump_cluster_state_to_url only runs it for yaml. I'd guess doing it for msgpack isn't necessary anywhere, so this shouldn't matter.
  • dump_cluster_state_to_url uses fsspec to open the file; the client uses plain open. This gives you a way to get a cluster dump if you don't want to install fsspec.

Neither of those seems like a huge deal to me, so I'll make it a shared function. Just not sure where to put it besides the dreaded utils.py.

Member:

How about a new module? The stuff in #5873 needs to live somewhere as well.

> I'd guess doing it for msgpack isn't necessary anywhere, so this shouldn't matter.

Symmetry: having everything identical regardless of the output format is nice, since any code you write works on both files. One thing I actually do often is convert a received msgpack file to yaml, which allows me to grep stuff 🙈
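
For concreteness, a rough sketch of what such a shared helper might look like (the module name and functions here are invented for illustration, not what the PR settled on)::

    # hypothetical distributed/cluster_dump.py
    from __future__ import annotations

    from typing import IO, Any, Callable, Literal


    def _tuple_to_list(node: Any) -> Any:
        # Normalize tuples to lists so YAML doesn't emit `!!python/tuple` tags
        if isinstance(node, (list, tuple)):
            return [_tuple_to_list(el) for el in node]
        elif isinstance(node, dict):
            return {k: _tuple_to_list(v) for k, v in node.items()}
        return node


    def writer_for(format: Literal["msgpack", "yaml"]) -> tuple[str, str, Callable]:
        "Return (mode, suffix, writer) for the given dump format."
        if format == "msgpack":
            import msgpack

            return "wb", ".msgpack.gz", msgpack.pack
        elif format == "yaml":
            import yaml

            def write_yaml(state: dict, f: IO) -> None:
                yaml.dump(_tuple_to_list(state), f)

            return "w", ".yaml", write_yaml
        raise ValueError(f"Unsupported format {format!r}")
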

Collaborator Author:

> Symmetry: having everything identical regardless of the output format is nice

Converting tuples to lists in the input to msgpack doesn't matter, though, since msgpack effectively turns tuples into lists in the dumping process (tuples are represented as lists). And for large cluster dumps, I think saving a full traversal and copy of the state is worthwhile.

> One thing I actually do often is convert a received msgpack file to yaml, which allows me to grep stuff

I do this too. When you read in the msgpack, though, it'll be all lists anyway, so the format is symmetrical when you then dump to yaml.

Member:

> Converting tuples to lists in the input to msgpack doesn't matter, though, since msgpack effectively turns tuples into lists in the dumping process

AFAIK, you'll need to toggle this specifically. IIRC, msgpack load will reconstruct lists (which is horribly slow).
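
A quick illustration of the msgpack default behavior being discussed (not part of the diff)::

    import msgpack

    packed = msgpack.packb((1, 2, (3, 4)))  # tuples are serialized as arrays

    # By default, unpacking produces lists, whether the input held tuples or lists
    assert msgpack.unpackb(packed) == [1, 2, [3, 4]]

    # Getting tuples back requires toggling use_list explicitly
    assert msgpack.unpackb(packed, use_list=False) == (1, 2, (3, 4))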


+        else:
+            raise ValueError(
+                f"Unsupported format {format!r}. Possible values are 'msgpack' or 'yaml'."
+            )
+
+        # Eagerly open the file to catch any errors before doing the full dump
+        with fsspec.open(url, mode, compression="infer", **storage_options) as f:
+            state = await self.get_cluster_state(exclude)
+
+            # Write from a thread so we don't block the event loop quite as badly
+            # (the writer will still hold the GIL a lot though).
+            # TODO use `asyncio.to_thread` once <3.9 support is dropped.
+            await self.loop.run_in_executor(None, writer, state, f)
+
     def get_worker_service_addr(self, worker, service_name, protocol=False):
         """
         Get the (host, port) address of the named service on the *worker*.
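
To round this out, reading a remote dump back, including the msgpack-to-yaml conversion mentioned in the review thread, could look like the following sketch (the URL and output filename are hypothetical)::

    import fsspec
    import msgpack
    import yaml

    # compression="infer" picks up gzip from the .gz suffix, mirroring the writer
    with fsspec.open("s3://my-bucket/my-dump.msgpack.gz", "rb", compression="infer") as f:
        state = msgpack.unpack(f)

    # Dump to yaml for grepping; tuples already came back from msgpack as lists
    with open("my-dump.yaml", "w") as f:
        yaml.dump(state, f)
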
59 changes: 31 additions & 28 deletions distributed/tests/test_client.py
@@ -7260,23 +7260,23 @@ def test_print_simple(capsys):
     assert "Hello!:123" in out
 
 
-def _verify_cluster_dump(path, _format: str, addresses: set[str]) -> dict:
-    path = str(path)
-    if _format == "msgpack":
-        import gzip
+def _verify_cluster_dump(url, format: str, addresses: set[str]) -> dict:
+    import fsspec
+
+    url = str(url)
+    if format == "msgpack":
         import msgpack
 
-        path += ".msgpack.gz"
-
-        with gzip.open(path) as fd_zip:
-            state = msgpack.unpack(fd_zip)
+        url += ".msgpack.gz"
+        loader = msgpack.unpack
     else:
         import yaml
 
-        path += ".yaml"
-        with open(path) as fd_plain:
-            state = yaml.safe_load(fd_plain)
+        url += ".yaml"
+        loader = yaml.safe_load
+
+    with fsspec.open(url, mode="rb", compression="infer") as f:
+        state = loader(f)
 
     assert isinstance(state, dict)
     assert "scheduler" in state
@@ -7286,25 +7286,41 @@ def _verify_cluster_dump(path, _format: str, addresses: set[str]) -> dict:
     return state
 
 
+@pytest.mark.parametrize("local", [True, False])
 @pytest.mark.parametrize("_format", ["msgpack", "yaml"])
-def test_dump_cluster_state_sync(c, s, a, b, tmp_path, _format):
+def test_dump_cluster_state_sync(c, s, a, b, tmp_path, _format, local):
     filename = tmp_path / "foo"
+    if not local:
+        pytest.importorskip("fsspec")
+        # Make it look like an fsspec path
+        filename = f"file://{filename}"
     c.dump_cluster_state(filename, format=_format)
     _verify_cluster_dump(filename, _format, {a["address"], b["address"]})
 
 
+@pytest.mark.parametrize("local", [True, False])
 @pytest.mark.parametrize("_format", ["msgpack", "yaml"])
 @gen_cluster(client=True)
-async def test_dump_cluster_state_async(c, s, a, b, tmp_path, _format):
+async def test_dump_cluster_state_async(c, s, a, b, tmp_path, _format, local):
     filename = tmp_path / "foo"
+    if not local:
+        pytest.importorskip("fsspec")
+        # Make it look like an fsspec path
+        filename = f"file://{filename}"
     await c.dump_cluster_state(filename, format=_format)
     _verify_cluster_dump(filename, _format, {a.address, b.address})
 
 
+@pytest.mark.parametrize("local", [True, False])
 @gen_cluster(client=True)
-async def test_dump_cluster_state_json(c, s, a, b, tmp_path):
+async def test_dump_cluster_state_json(c, s, a, b, tmp_path, local):
+    filename = tmp_path / "foo"
+    if not local:
+        pytest.importorskip("fsspec")
+        # Make it look like an fsspec path
+        filename = f"file://{filename}"
     with pytest.raises(ValueError, match="Unsupported format"):
-        await c.dump_cluster_state(tmp_path / "foo", format="json")
+        await c.dump_cluster_state(filename, format="json")
 
 
 @gen_cluster(client=True)
@@ -7363,19 +7379,6 @@ async def test_dump_cluster_state_exclude_default(c, s, a, b, tmp_path):
         assert k in s.tasks
 
 
-@gen_cluster(client=True, config={"distributed.comm.timeouts.connect": "200ms"})
-async def test_dump_cluster_state_error(c, s, a, b, tmp_path):
-    a.stop()
-    filename = tmp_path / "foo"
-    await c.dump_cluster_state(filename, format="yaml")
-    state = _verify_cluster_dump(filename, "yaml", {a.address, b.address})
-    assert state["workers"][a.address] == (
-        f"OSError('Timed out trying to connect to {a.address} after 0.2 s')"
-    )
-    assert isinstance(state["workers"][b.address], dict)
-    assert state["versions"]["workers"].keys() == {b.address}
-
-
 class TestClientSecurityLoader:
     @contextmanager
     def config_loader(self, monkeypatch, loader):