Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster dump utilities #5920

Merged
merged 29 commits into from
Mar 23, 2022
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
07539ec
Initial stab
sjperkins Mar 9, 2022
5c727a8
Better error message
sjperkins Mar 9, 2022
1d957c8
Task states should be in memory
sjperkins Mar 9, 2022
e2375f3
Clean up test case
sjperkins Mar 9, 2022
6063d68
WIP
sjperkins Mar 10, 2022
62c8810
Finalise DumpInspector class
sjperkins Mar 10, 2022
dea9d7e
In case of unresponsive workers, the worker dump might be a string de…
sjperkins Mar 11, 2022
75a4f6a
Provide story docstrings
sjperkins Mar 11, 2022
3c8821b
Merge branch 'main' into cluster_dump_utilities
sjperkins Mar 14, 2022
dee99de
Document load_cluster_dump
sjperkins Mar 14, 2022
4c69599
Annotate story return types
sjperkins Mar 14, 2022
cbd5609
Test with {submit,map} instead of dask.array
sjperkins Mar 14, 2022
182b3ea
Fix typo
sjperkins Mar 14, 2022
b233000
Test output of tasks_in_state
sjperkins Mar 14, 2022
0381454
Use defaultdict to group tasks with a duplicate key
sjperkins Mar 14, 2022
17e4136
Merge branch 'main' into cluster_dump_utilities
sjperkins Mar 15, 2022
15a0719
Split scheduler and worker task/story retrieval into separate functions
sjperkins Mar 15, 2022
51bb587
WIP Cluster dump tests
sjperkins Mar 15, 2022
7777724
Finalise task and story tests
sjperkins Mar 15, 2022
d428690
Rename DumpInspector to DumpArtefact
sjperkins Mar 15, 2022
426bef0
Remove unused earlier logic
sjperkins Mar 15, 2022
d8264ae
Add support for splitting the dump artefact into a tree of yaml files
sjperkins Mar 15, 2022
3a989bc
Support compaction of trivial state keys into a general.yaml file
sjperkins Mar 15, 2022
799325e
Address review comments
sjperkins Mar 16, 2022
1cb1d98
Remove Path conversion
sjperkins Mar 16, 2022
e9a7c3d
Convert story output from lists to dicts
sjperkins Mar 16, 2022
1ecb088
Some other test case improvements
sjperkins Mar 16, 2022
4b65b81
to_yaml -> to_yamls
sjperkins Mar 17, 2022
9f81cfc
Merge branch 'main' into cluster_dump_utilities
sjperkins Mar 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 220 additions & 1 deletion distributed/cluster_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

from __future__ import annotations

from typing import IO, Any, Awaitable, Callable, Literal
from pathlib import Path
from typing import IO, Any, Awaitable, Callable, Collection, Literal

import fsspec
import msgpack

from distributed.compatibility import to_thread
from distributed.stories import scheduler_story as _scheduler_story
from distributed.stories import worker_story as _worker_story


def _tuple_to_list(node):
Expand Down Expand Up @@ -57,3 +60,219 @@ def writer(state: dict, f: IO):
# Write from a thread so we don't block the event loop quite as badly
# (the writer will still hold the GIL a lot though).
await to_thread(writer, state, f)


def load_cluster_dump(url: str) -> dict:
    """Loads a cluster dump from a disk artefact

    Parameters
    ----------
    url : str
        Name of the disk artefact. This should have either a
        ``.msgpack.gz`` or ``.yaml`` suffix, depending on the dump format.

    Returns
    -------
    state : dict
        The cluster state at the time of the dump.

    Raises
    ------
    ValueError
        If ``url`` does not end in a recognised suffix.
    """
    if url.endswith(".msgpack.gz"):
        mode = "rb"
        reader = msgpack.unpack
    elif url.endswith(".yaml"):
        # yaml is an optional dependency; import lazily so msgpack dumps
        # can be loaded without it installed.
        import yaml

        mode = "r"
        reader = yaml.safe_load
    else:
        raise ValueError(f"url ({url}) must have a .msgpack.gz or .yaml suffix")

    # compression="infer" lets fsspec transparently gunzip .msgpack.gz
    with fsspec.open(url, mode, compression="infer") as f:
        return reader(f)


class DumpArtefact:
    """
    Utility class for inspecting the state of a cluster dump

    .. code-block:: python

        dump = DumpArtefact("dump.msgpack.gz")
        memory_tasks = dump.scheduler_tasks("memory")
        executing_tasks = dump.worker_tasks("executing")
    """

    def __init__(self, url_or_state: str | dict):
        """
        Parameters
        ----------
        url_or_state : str or dict
            Either the url of a dump artefact on disk
            (loaded with :func:`load_cluster_dump`) or an
            already-loaded cluster state dictionary.
        """
        if isinstance(url_or_state, str):
            self.dump = load_cluster_dump(url_or_state)
        elif isinstance(url_or_state, dict):
            self.dump = url_or_state
        else:
            raise TypeError("'url_or_state' must be a str or dict")

    def _extract_tasks(self, state: str, context: dict) -> list:
        # An empty/falsy state selects every task regardless of its state
        if state:
            return [v for v in context.values() if v["state"] == state]
        else:
            return list(context.values())

    def scheduler_tasks(self, state: str = "") -> list:
        """
        Parameters
        ----------
        state : str
            Task state to filter on. The default empty string
            selects tasks in any state.

        Returns
        -------
        tasks : list
            The list of scheduler tasks in `state`.
        """
        return self._extract_tasks(state, self.dump["scheduler"]["tasks"])

    def worker_tasks(self, state: str = "") -> list:
        """
        Parameters
        ----------
        state : str
            Task state to filter on. The default empty string
            selects tasks in any state.

        Returns
        -------
        tasks : list
            The list of worker tasks in `state`
        """
        tasks = []

        for worker_dump in self.dump["workers"].values():
            if self._valid_worker_dump(worker_dump):
                tasks.extend(self._extract_tasks(state, worker_dump["tasks"]))

        return tasks

    def _valid_worker_dump(self, worker_dump) -> bool:
        # Worker dumps should be dictionaries but can also be
        # strings describing comm Failures
        return isinstance(worker_dump, dict)

    def scheduler_story(self, *key_or_stimulus_id: str) -> list:
        """
        Returns
        -------
        stories : list
            A list of stories for the keys/stimulus ID's in `*key_or_stimulus_id`.
        """
        keys = set(key_or_stimulus_id)
        story = _scheduler_story(keys, self.dump["scheduler"]["transition_log"])
        # Stories are tuples, not lists
        return list(map(tuple, story))

    def worker_story(self, *key_or_stimulus_id: str) -> list:
        """
        Returns
        -------
        stories : list
            A list of stories for the keys/stimulus ID's in `*key_or_stimulus_id`.
        """
        keys = set(key_or_stimulus_id)
        stories: list = []

        for worker_dump in self.dump["workers"].values():
            if self._valid_worker_dump(worker_dump):
                # Stories are tuples, not lists
                story = _worker_story(keys, worker_dump["log"])
                stories.extend(map(tuple, story))

        return stories

    def missing_workers(self) -> list:
        """
        Returns
        -------
        missing : list
            A list of workers connected to the scheduler, but which
            did not respond to requests for a state dump.
        """
        scheduler_workers = self.dump["scheduler"]["workers"]
        responsive_workers = self.dump["workers"]
        return [
            w
            for w in scheduler_workers
            if w not in responsive_workers
            or not self._valid_worker_dump(responsive_workers[w])
        ]

    @staticmethod
    def _compact_state(state: dict, expand_keys: set) -> dict:
        """Return a copy of ``state`` with keys not in ``expand_keys``
        compacted into a single ``"general"`` entry.

        Does not mutate ``state``, so the in-memory dump survives a
        :meth:`split` call intact and ``split`` can be called repeatedly.
        """
        assert "general" not in state
        result = {k: v for k, v in state.items() if k in expand_keys}
        result["general"] = {k: v for k, v in state.items() if k not in expand_keys}
        return result

    @staticmethod
    def _write_state_yamls(state: dict, log_dir: Path, dumper) -> None:
        """Write one ``<key>.yaml`` file per entry of ``state`` into ``log_dir``."""
        import yaml

        log_dir.mkdir(parents=True, exist_ok=True)

        for name, content in state.items():
            filename = str(log_dir / f"{name}.yaml")
            with open(filename, "w") as fd:
                yaml.dump(content, fd, Dumper=dumper)

    def split(
        self,
        root_dir: str | Path | None = None,
        worker_expand_keys: Collection[str] = ("config", "log", "logs", "tasks"),
        scheduler_expand_keys: Collection[str] = (
            "events",
            "extensions",
            "log",
            "task_groups",
            "tasks",
            "transition_log",
            "workers",
        ),
    ):
        """
        Splits the Dump Artefact into a tree of yaml files with
        `root_dir` as its base.

        The root level of the tree contains a directory for the scheduler
        and directories for each individual worker.
        Each directory contains yaml files describing the state of the scheduler
        or worker when the artefact was created.

        In general, keys associated with the state are compacted into a `general.yaml`
        file, unless they are in `scheduler_expand_keys` and `worker_expand_keys`.

        Parameters
        ----------
        root_dir : str or Path
            The root directory into which the tree is written.
            Defaults to the current working directory if `None`.
        worker_expand_keys : iterable of str
            An iterable of artefact worker keys that will be expanded
            into separate yaml files.
            Keys that are not in this iterable are compacted into a
            `general.yaml` file.
        scheduler_expand_keys : iterable of str
            An iterable of artefact scheduler keys that will be expanded
            into separate yaml files.
            Keys that are not in this iterable are compacted into a
            `general.yaml` file.
        """
        import yaml

        root_dir = Path(root_dir) if root_dir else Path.cwd()
        dumper = yaml.CSafeDumper
        scheduler_expand_keys = set(scheduler_expand_keys)
        worker_expand_keys = set(worker_expand_keys)

        for worker_dump in self.dump["workers"].values():
            # Unresponsive workers dump as plain strings; there is
            # nothing to split for them (see missing_workers()).
            if not self._valid_worker_dump(worker_dump):
                continue

            info = self._compact_state(worker_dump, worker_expand_keys)
            self._write_state_yamls(info, root_dir / worker_dump["id"], dumper)

        scheduler_info = self._compact_state(
            self.dump["scheduler"], scheduler_expand_keys
        )
        self._write_state_yamls(scheduler_info, root_dir / "scheduler", dumper)
5 changes: 2 additions & 3 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
from distributed.security import Security
from distributed.semaphore import SemaphoreExtension
from distributed.stealing import WorkStealing
from distributed.stories import scheduler_story
from distributed.utils import (
All,
TimeoutError,
Expand Down Expand Up @@ -7537,9 +7538,7 @@ def transitions(self, recommendations: dict):
def story(self, *keys):
    """Get all transitions that touch one of the input keys"""
    # Accept either raw keys or TaskState objects; normalise to a key set.
    key_set = set()
    for key in keys:
        key_set.add(key.key if isinstance(key, TaskState) else key)
    return scheduler_story(key_set, self.transition_log)

transition_story = story

Expand Down
44 changes: 44 additions & 0 deletions distributed/stories.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import Iterable


def scheduler_story(keys: set, transition_log: Iterable) -> list:
    """Creates a story from the scheduler transition log given a set of keys
    describing tasks or stimuli.

    Parameters
    ----------
    keys : set
        A set of task `keys` or `stimulus_id`'s
    transition_log : iterable
        The scheduler transition log

    Returns
    -------
    story : list
        Transition log entries whose first element is in ``keys``, or
        whose fourth element mentions one of ``keys``.
    """
    return [t for t in transition_log if t[0] in keys or keys.intersection(t[3])]


def worker_story(keys: set, log: Iterable) -> list:
    """Creates a story from the worker log given a set of keys
    describing tasks or stimuli.

    Parameters
    ----------
    keys : set
        A set of task `keys` or `stimulus_id`'s
    log : iterable
        The worker log

    Returns
    -------
    story : list
    """

    def _mentions_key(msg) -> bool:
        # A message is relevant if any key appears directly in it, or
        # inside one of its nested tuple/list/set entries.
        if any(key in msg for key in keys):
            return True
        nested = (entry for entry in msg if isinstance(entry, (tuple, list, set)))
        return any(key in entry for entry in nested for key in keys)

    return [msg for msg in log if _mentions_key(msg)]
Loading