
Cluster dump utilities #5920

Merged: 29 commits from cluster_dump_utilities into main, Mar 23, 2022
Changes shown are from 27 of the 29 commits.

Commits
07539ec
Initial stab
sjperkins Mar 9, 2022
5c727a8
Better error message
sjperkins Mar 9, 2022
1d957c8
Task states should be in memory
sjperkins Mar 9, 2022
e2375f3
Clean up test case
sjperkins Mar 9, 2022
6063d68
WIP
sjperkins Mar 10, 2022
62c8810
Finalise DumpInspector class
sjperkins Mar 10, 2022
dea9d7e
In case of unresponsive workers, the worker dump might be a string de…
sjperkins Mar 11, 2022
75a4f6a
Provide story docstrings
sjperkins Mar 11, 2022
3c8821b
Merge branch 'main' into cluster_dump_utilities
sjperkins Mar 14, 2022
dee99de
Document load_cluster_dump
sjperkins Mar 14, 2022
4c69599
Annotate story return types
sjperkins Mar 14, 2022
cbd5609
Test with {submit,map} instead of dask.array
sjperkins Mar 14, 2022
182b3ea
Fix typo
sjperkins Mar 14, 2022
b233000
Test output of tasks_in_state
sjperkins Mar 14, 2022
0381454
Use defaultdict to group tasks with a duplicate key
sjperkins Mar 14, 2022
17e4136
Merge branch 'main' into cluster_dump_utilities
sjperkins Mar 15, 2022
15a0719
Split scheduler and worker task/story retrieval into separate functions
sjperkins Mar 15, 2022
51bb587
WIP Cluster dump tests
sjperkins Mar 15, 2022
7777724
Finalise task and story tests
sjperkins Mar 15, 2022
d428690
Rename DumpInspector to DumpArtefact
sjperkins Mar 15, 2022
426bef0
Remove unused earlier logic
sjperkins Mar 15, 2022
d8264ae
Add support for splitting the dump artefact into a tree of yaml files
sjperkins Mar 15, 2022
3a989bc
Support compaction of trivial state keys into a general.yaml file
sjperkins Mar 15, 2022
799325e
Address review comments
sjperkins Mar 16, 2022
1cb1d98
Remove Path conversion
sjperkins Mar 16, 2022
e9a7c3d
Convert story output from lists to dicts
sjperkins Mar 16, 2022
1ecb088
Some other test case improvements
sjperkins Mar 16, 2022
4b65b81
to_yaml -> to_yamls
sjperkins Mar 17, 2022
9f81cfc
Merge branch 'main' into cluster_dump_utilities
sjperkins Mar 23, 2022
271 changes: 270 additions & 1 deletion distributed/cluster_dump.py
@@ -2,12 +2,17 @@

from __future__ import annotations

from typing import IO, Any, Awaitable, Callable, Literal
from collections import defaultdict
from collections.abc import Mapping
from pathlib import Path
from typing import IO, Any, Awaitable, Callable, Collection, Literal

import fsspec
import msgpack

from distributed.compatibility import to_thread
from distributed.stories import scheduler_story as _scheduler_story
from distributed.stories import worker_story as _worker_story


def _tuple_to_list(node):
@@ -57,3 +62,267 @@ def writer(state: dict, f: IO):
# Write from a thread so we don't block the event loop quite as badly
# (the writer will still hold the GIL a lot though).
await to_thread(writer, state, f)


def load_cluster_dump(url: str, **kwargs) -> dict:
"""Loads a cluster dump from a disk artefact

Parameters
----------
url : str
Name of the disk artefact. This should have either a
``.msgpack.gz`` or ``.yaml`` suffix, depending on the dump format.
**kwargs :
Extra arguments passed to :func:`fsspec.open`.

Returns
-------
state : dict
The cluster state at the time of the dump.
"""
if url.endswith(".msgpack.gz"):
mode = "rb"
reader = msgpack.unpack
elif url.endswith(".yaml"):
import yaml

mode = "r"
reader = yaml.safe_load
else:
raise ValueError(f"url ({url}) must have a .msgpack.gz or .yaml suffix")

kwargs.setdefault("compression", "infer")

with fsspec.open(url, mode, **kwargs) as f:
return reader(f)


class DumpArtefact(Mapping):
"""
Utility class for inspecting the state of a cluster dump

.. code-block:: python

dump = DumpArtefact.from_url("dump.msgpack.gz")
memory_tasks = dump.scheduler_tasks("memory")
executing_tasks = dump.worker_tasks("executing")
"""

def __init__(self, state: dict):
self.dump = state

@classmethod
def from_url(cls, url: str, **kwargs) -> DumpArtefact:
"""Loads a cluster dump from a disk artefact

Parameters
----------
url : str
Name of the disk artefact. This should have either a
``.msgpack.gz`` or ``.yaml`` suffix, depending on the dump format.
**kwargs :
Extra arguments passed to :func:`fsspec.open`.

Returns
-------
state : dict
The cluster state at the time of the dump.
"""
return DumpArtefact(load_cluster_dump(url, **kwargs))

def __getitem__(self, key):
return self.dump[key]

def __iter__(self):
return iter(self.dump)

def __len__(self):
return len(self.dump)

def _extract_tasks(self, state: str | None, context: dict):
if state:
return [v for v in context.values() if v["state"] == state]
else:
return list(context.values())

def scheduler_tasks_in_state(self, state: str | None = None) -> list:
"""
Parameters
----------
state : str, optional
If provided, only tasks in the given state are returned.
Otherwise, all tasks are returned.

Returns
-------
tasks : list
The list of scheduler tasks in ``state``.
"""
return self._extract_tasks(state, self.dump["scheduler"]["tasks"])

def worker_tasks_in_state(self, state: str | None = None) -> list:
"""
Parameters
----------
state : str, optional
If provided, only tasks in the given state are returned.
Otherwise, all tasks are returned.

Returns
-------
tasks : list
The list of worker tasks in ``state``.
"""
tasks = []

for worker_dump in self.dump["workers"].values():
if isinstance(worker_dump, dict) and "tasks" in worker_dump:
tasks.extend(self._extract_tasks(state, worker_dump["tasks"]))

return tasks

def scheduler_story(self, *key_or_stimulus_id: str) -> dict:
"""
Returns
-------
stories : dict
A dict of stories for the keys/stimulus IDs in ``*key_or_stimulus_id``.
"""
stories = defaultdict(list)

log = self.dump["scheduler"]["transition_log"]
keys = set(key_or_stimulus_id)

for story in _scheduler_story(keys, log):
stories[story[0]].append(tuple(story))

return dict(stories)

def worker_story(self, *key_or_stimulus_id: str) -> dict:
"""
Returns
-------
stories : dict
A dict of stories for the keys/stimulus IDs in ``*key_or_stimulus_id``.
"""
keys = set(key_or_stimulus_id)
stories = defaultdict(list)

for worker_dump in self.dump["workers"].values():
if isinstance(worker_dump, dict) and "log" in worker_dump:
for story in _worker_story(keys, worker_dump["log"]):
stories[story[0]].append(tuple(story))

return dict(stories)

def missing_workers(self) -> list:
"""
Returns
-------
missing : list
A list of workers connected to the scheduler, but which
did not respond to requests for a state dump.
"""
scheduler_workers = self.dump["scheduler"]["workers"]
responsive_workers = self.dump["workers"]
return [
w
for w in scheduler_workers
if w not in responsive_workers
or not isinstance(responsive_workers[w], dict)
]

def _compact_state(self, state: dict, expand_keys: set[str]):
"""Compacts ``state`` keys into a general key,
unless the key is in ``expand_keys``"""
assert "general" not in state
result = {}
general = {}

for k, v in state.items():
if k in expand_keys:
result[k] = v
else:
general[k] = v

result["general"] = general
return result

def to_yaml(
self,
root_dir: str | Path | None = None,
worker_expand_keys: Collection[str] = ("config", "log", "logs", "tasks"),
scheduler_expand_keys: Collection[str] = (
"events",
"extensions",
"log",
"task_groups",
"tasks",
"transition_log",
"workers",
),
):
"""
Splits the Dump Artefact into a tree of yaml files with
``root_dir`` as its base.

The root level of the tree contains a directory for the scheduler
and directories for each individual worker.
Each directory contains yaml files describing the state of the scheduler
or worker when the artefact was created.

In general, keys associated with the state are compacted into a ``general.yaml``
file, unless they are in ``scheduler_expand_keys`` or ``worker_expand_keys``.

Parameters
----------
root_dir : str or Path
The root directory into which the tree is written.
Defaults to the current working directory if ``None``.
worker_expand_keys : iterable of str
An iterable of artefact worker keys that will be expanded
into separate yaml files.
Keys that are not in this iterable are compacted into a
``general.yaml`` file.
scheduler_expand_keys : iterable of str
An iterable of artefact scheduler keys that will be expanded
into separate yaml files.
Keys that are not in this iterable are compacted into a
``general.yaml`` file.
"""
import yaml

root_dir = Path(root_dir) if root_dir else Path.cwd()
dumper = yaml.CSafeDumper
scheduler_expand_keys = set(scheduler_expand_keys)
worker_expand_keys = set(worker_expand_keys)

workers = self.dump["workers"]
for info in workers.values():
try:
worker_id = info["id"]
except KeyError:
continue

worker_state = self._compact_state(info, worker_expand_keys)

log_dir = root_dir / worker_id
log_dir.mkdir(parents=True, exist_ok=True)

for name, _logs in worker_state.items():
filename = str(log_dir / f"{name}.yaml")
with open(filename, "w") as fd:
yaml.dump(_logs, fd, Dumper=dumper)

context = "scheduler"
scheduler_state = self._compact_state(self.dump[context], scheduler_expand_keys)

log_dir = root_dir / context
log_dir.mkdir(parents=True, exist_ok=True)
# Compact smaller keys into a general dict

for name, _logs in scheduler_state.items():
filename = str(log_dir / f"{name}.yaml")

with open(filename, "w") as fd:
yaml.dump(_logs, fd, Dumper=dumper)
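
The diff above introduces both a functional entry point (``load_cluster_dump``) and the ``DumpArtefact`` wrapper. A minimal usage sketch follows; it is not part of the PR, the dump filename and task key are hypothetical, and a dump artefact is assumed to already exist (e.g. one written by ``Client.dump_cluster_state``):

from distributed.cluster_dump import DumpArtefact, load_cluster_dump

# Load the raw state dict, or wrap it for convenient queries
state = load_cluster_dump("dump.msgpack.gz")
dump = DumpArtefact.from_url("dump.msgpack.gz")

# Tasks filtered by state, on the scheduler and across responsive workers
memory_tasks = dump.scheduler_tasks_in_state("memory")
executing_tasks = dump.worker_tasks_in_state("executing")

# Transition stories grouped by task key / stimulus ID ("inc-123" is a made-up key)
stories = dump.scheduler_story("inc-123")

# Workers known to the scheduler that never returned a state dump
unresponsive = dump.missing_workers()

# Split the artefact into a tree of yaml files rooted at ./dump-tree
dump.to_yaml(root_dir="dump-tree")

Per the docstrings above, ``to_yaml`` produces one directory per worker plus a ``scheduler`` directory, each holding one yaml file per expanded key and a ``general.yaml`` for the compacted remainder. Note that commit 4b65b81 later in this PR renames ``to_yaml`` to ``to_yamls``; the diff shown here predates that rename.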
5 changes: 2 additions & 3 deletions distributed/scheduler.py
@@ -81,6 +81,7 @@
from distributed.security import Security
from distributed.semaphore import SemaphoreExtension
from distributed.stealing import WorkStealing
from distributed.stories import scheduler_story
from distributed.utils import (
All,
TimeoutError,
@@ -7537,9 +7538,7 @@ def transitions(self, recommendations: dict):
def story(self, *keys):
"""Get all transitions that touch one of the input keys"""
keys = {key.key if isinstance(key, TaskState) else key for key in keys}
return [
t for t in self.transition_log if t[0] in keys or keys.intersection(t[3])
]
return scheduler_story(keys, self.transition_log)

transition_story = story

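A toy check, with made-up log entries (not from the PR), that the refactor above is behaviour-preserving: the old inline comprehension and the new ``scheduler_story`` helper select the same records, since both look only at the key in position 0 and the recommendations dict in position 3 of each entry:

from distributed.stories import scheduler_story

# Simplified transition-log entries: (key, start, finish, recommendations, stimulus_id)
transition_log = [
    ("x", "released", "waiting", {}, "stim-1"),
    ("y", "waiting", "processing", {"x": "forgotten"}, "stim-2"),
    ("z", "processing", "memory", {}, "stim-3"),
]

keys = {"x"}
old = [t for t in transition_log if t[0] in keys or keys.intersection(t[3])]
assert scheduler_story(keys, transition_log) == old  # both select the "x" and "y" entries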
44 changes: 44 additions & 0 deletions distributed/stories.py
@@ -0,0 +1,44 @@
from typing import Iterable


def scheduler_story(keys: set, transition_log: Iterable) -> list:
"""Creates a story from the scheduler transition log given a set of keys
describing tasks or stimuli.

Parameters
----------
keys : set
A set of task `keys` or `stimulus_id`'s
transition_log : iterable
The scheduler transition log

Returns
-------
story : list
"""
return [t for t in transition_log if t[0] in keys or keys.intersection(t[3])]


def worker_story(keys: set, log: Iterable) -> list:
"""Creates a story from the worker log given a set of keys
describing tasks or stimuli.

Parameters
----------
keys : set
A set of task `keys` or `stimulus_id`'s
log : iterable
The worker log

Returns
-------
story : list
"""
return [
msg
for msg in log
if any(key in msg for key in keys)
or any(
key in c for key in keys for c in msg if isinstance(c, (tuple, list, set))
)
]
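
A toy sketch of ``worker_story``'s matching rules, with made-up log entries (not from the PR): a message is kept when a key appears directly in it, or inside any tuple, list or set element of it:

from distributed.stories import worker_story

log = [
    ("x", "compute-task", "stim-1"),       # key appears directly in the message
    ("gather-dep", ("x", "w"), "stim-2"),  # key nested inside a tuple element
    ("y", "task-finished", "stim-3"),      # unrelated key, filtered out
]

assert len(worker_story({"x"}, log)) == 2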
20 changes: 4 additions & 16 deletions distributed/tests/test_client.py
@@ -63,6 +63,7 @@
tokenize,
wait,
)
from distributed.cluster_dump import load_cluster_dump
from distributed.comm import CommClosedError
from distributed.compatibility import LINUX, WINDOWS
from distributed.core import Status
@@ -7261,22 +7262,9 @@ def test_print_simple(capsys):


def _verify_cluster_dump(url, format: str, addresses: set[str]) -> dict:
fsspec = pytest.importorskip("fsspec")

url = str(url)
if format == "msgpack":
import msgpack

url += ".msgpack.gz"
loader = msgpack.unpack
else:
import yaml

url += ".yaml"
loader = yaml.safe_load

with fsspec.open(url, mode="rb", compression="infer") as f:
state = loader(f)
fsspec = pytest.importorskip("fsspec") # for load_cluster_dump
url = str(url) + (".msgpack.gz" if format == "msgpack" else ".yaml")
state = load_cluster_dump(url)

assert isinstance(state, dict)
assert "scheduler" in state
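
A minimal round-trip sketch (hypothetical paths, toy state dict) of what the simplified test relies on: write the state in either format, then read it back through the single ``load_cluster_dump`` entry point:

import fsspec
import msgpack
import yaml

from distributed.cluster_dump import load_cluster_dump

state = {"scheduler": {"tasks": {}}, "workers": {}, "versions": {}}

# msgpack + gzip variant; load_cluster_dump infers the compression from the suffix
with fsspec.open("dump.msgpack.gz", mode="wb", compression="gzip") as f:
    msgpack.pack(state, f)

# plain yaml variant
with fsspec.open("dump.yaml", mode="w") as f:
    yaml.safe_dump(state, f)

assert load_cluster_dump("dump.msgpack.gz") == state
assert load_cluster_dump("dump.yaml") == state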