Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster dump utilities #5920

Merged
merged 29 commits into from
Mar 23, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
07539ec
Initial stab
sjperkins Mar 9, 2022
5c727a8
Better error message
sjperkins Mar 9, 2022
1d957c8
Task states should be in memory
sjperkins Mar 9, 2022
e2375f3
Clean up test case
sjperkins Mar 9, 2022
6063d68
WIP
sjperkins Mar 10, 2022
62c8810
Finalise DumpInspector class
sjperkins Mar 10, 2022
dea9d7e
In case of unresponsive workers, the worker dump might be a string describing a comm failure
sjperkins Mar 11, 2022
75a4f6a
Provide story docstrings
sjperkins Mar 11, 2022
3c8821b
Merge branch 'main' into cluster_dump_utilities
sjperkins Mar 14, 2022
dee99de
Document load_cluster_dump
sjperkins Mar 14, 2022
4c69599
Annotate story return types
sjperkins Mar 14, 2022
cbd5609
Test with {submit,map} instead of dask.array
sjperkins Mar 14, 2022
182b3ea
Fix typo
sjperkins Mar 14, 2022
b233000
Test output of tasks_in_state
sjperkins Mar 14, 2022
0381454
Use defaultdict to group tasks with a duplicate key
sjperkins Mar 14, 2022
17e4136
Merge branch 'main' into cluster_dump_utilities
sjperkins Mar 15, 2022
15a0719
Split scheduler and worker task/story retrieval into separate functions
sjperkins Mar 15, 2022
51bb587
WIP Cluster dump tests
sjperkins Mar 15, 2022
7777724
Finalise task and story tests
sjperkins Mar 15, 2022
d428690
Rename DumpInspector to DumpArtefact
sjperkins Mar 15, 2022
426bef0
Remove unused earlier logic
sjperkins Mar 15, 2022
d8264ae
Add support for splitting the dump artefact into a tree of yaml files
sjperkins Mar 15, 2022
3a989bc
Support compaction of trivial state keys into a general.yaml file
sjperkins Mar 15, 2022
799325e
Address review comments
sjperkins Mar 16, 2022
1cb1d98
Remove Path conversion
sjperkins Mar 16, 2022
e9a7c3d
Convert story output from lists to dicts
sjperkins Mar 16, 2022
1ecb088
Some other test case improvements
sjperkins Mar 16, 2022
4b65b81
to_yaml -> to_yamls
sjperkins Mar 17, 2022
9f81cfc
Merge branch 'main' into cluster_dump_utilities
sjperkins Mar 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions distributed/cluster_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import msgpack

from distributed.compatibility import to_thread
from distributed.stories import scheduler_story, worker_story


def _tuple_to_list(node):
Expand Down Expand Up @@ -57,3 +58,112 @@ def writer(state: dict, f: IO):
# Write from a thread so we don't block the event loop quite as badly
# (the writer will still hold the GIL a lot though).
await to_thread(writer, state, f)


def load_cluster_dump(url: str):
    """Load a cluster dump artefact previously written to disk.

    Parameters
    ----------
    url : str
        Path or URL of the dump artefact. Must end in ``.msgpack.gz``
        (gzipped msgpack) or ``.yaml`` — the suffix selects the reader.

    Returns
    -------
    state : dict
        The cluster state stored in the dump.

    Raises
    ------
    ValueError
        If ``url`` does not end in ``.msgpack.gz`` or ``.yaml``.
    """
    if url.endswith(".msgpack.gz"):
        mode = "rb"
        reader = msgpack.unpack
    elif url.endswith(".yaml"):
        # yaml is an optional dependency; import lazily so msgpack dumps
        # can be loaded without it installed.
        import yaml

        mode = "r"
        reader = yaml.safe_load
    else:
        raise ValueError(f"url ({url}) must have a .msgpack.gz or .yaml suffix")

    # compression="infer" lets fsspec transparently gunzip the msgpack
    # artefact based on the file extension.
    with fsspec.open(url, mode, compression="infer") as f:
        return reader(f)


class DumpInspector:
"""
Utility class for inspecting the state of a cluster dump

.. code-block:: python

inspector = DumpInspect("dump.msgpack.gz")
memory_tasks = inspector.tasks_in_state("memory")
released_tasks = inspector.tasks_in_state("released)
sjperkins marked this conversation as resolved.
Show resolved Hide resolved
"""

def __init__(self, url_or_state: str | dict):
if isinstance(url_or_state, str):
self.dump = load_cluster_dump(url_or_state)
sjperkins marked this conversation as resolved.
Show resolved Hide resolved
elif isinstance(url_or_state, dict):
self.dump = url_or_state
else:
raise TypeError("'url_or_state' must be a str or dict")

def tasks_in_state(self, state: str = "", workers: bool = False) -> dict:
"""
Returns
-------
tasks : dict
A dictionary of scheduler tasks with state `state`.
worker tasks are included if `workers=True`
"""
stasks = self.dump["scheduler"]["tasks"]

if state:
tasks = {k: v for k, v in stasks.items() if v["state"] == state}
else:
tasks = stasks.copy()

if not workers:
return tasks

for worker_dump in self.dump["workers"].values():
if self._valid_worker_dump(worker_dump):
if state:
tasks.update(
(k, v)
for k, v in worker_dump["tasks"].items()
if v["state"] == state
)
else:
tasks.update(worker_dump["tasks"])
sjperkins marked this conversation as resolved.
Show resolved Hide resolved

return tasks

def _valid_worker_dump(self, worker_dump):
sjperkins marked this conversation as resolved.
Show resolved Hide resolved
# Worker dumps should be a dictionaries but can also be
# strings describing comm Failures
return isinstance(worker_dump, dict)

def story(self, *key_or_stimulus_id: str, workers: bool = False) -> list:
"""
Returns
-------
stories : list
A list of stories for the keys/stimulus ID's in `*key_or_stimulus_id`.
worker stories are included if `workers=True`
"""
keys = set(key_or_stimulus_id)
story = scheduler_story(keys, self.dump["scheduler"]["transition_log"])

if not workers:
return story

for wdump in self.dump["workers"].values():
if self._valid_worker_dump(wdump):
story.extend(worker_story(keys, wdump["log"]))

return story

def missing_workers(self) -> list:
"""
Returns
-------
missing : list
A list of workers connected to the scheduler, but which
did not respond to requests for a state dump.
"""
scheduler_workers = self.dump["scheduler"]["workers"]
responsive_workers = self.dump["workers"]
return [
w
for w in scheduler_workers.keys()
sjperkins marked this conversation as resolved.
Show resolved Hide resolved
if w not in responsive_workers
or not self._valid_worker_dump(responsive_workers[w])
]
5 changes: 2 additions & 3 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
from .security import Security
from .semaphore import SemaphoreExtension
from .stealing import WorkStealing
from .stories import scheduler_story
from .utils import (
All,
TimeoutError,
Expand Down Expand Up @@ -7533,9 +7534,7 @@ def transitions(self, recommendations: dict):
def story(self, *keys):
    """Get all transitions that touch one of the input keys

    Accepts task keys or TaskState objects; TaskStates are reduced to
    their keys before the transition log is searched. Delegates to the
    shared :func:`scheduler_story` helper.
    """
    keys = {key.key if isinstance(key, TaskState) else key for key in keys}
    return scheduler_story(keys, self.transition_log)

transition_story = story

Expand Down
44 changes: 44 additions & 0 deletions distributed/stories.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import Iterable


def scheduler_story(keys: set, transition_log: Iterable) -> list:
    """Creates a story from the scheduler transition log given a set of keys
    describing tasks or stimuli.

    Parameters
    ----------
    keys : set
        A set of task `keys` or `stimulus_id`'s
    transition_log : iterable
        The scheduler transition log

    Returns
    -------
    story : list
        Transitions whose key (``t[0]``) is in *keys*, or whose
        recommendations (``t[3]``) mention one of *keys*.
    """
    return [t for t in transition_log if t[0] in keys or keys.intersection(t[3])]


def worker_story(keys: set, log: Iterable):
"""Creates a story from the worker log given a set of keys
describing tasks or stimuli.

Parameters
----------
keys : set
A set of task `keys` or `stimulus_id`'s
log : iterable
The worker log

Returns
-------
story : list
"""
return [
msg
for msg in log
if any(key in msg for key in keys)
or any(
key in c for key in keys for c in msg if isinstance(c, (tuple, list, set))
)
]
46 changes: 30 additions & 16 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from tlz import concat, first, identity, isdistinct, merge, pluck, valmap

import dask
import dask.array as da
import dask.bag as db
from dask import delayed
from dask.optimization import SubgraphCallable
Expand Down Expand Up @@ -63,6 +64,7 @@
tokenize,
wait,
)
from distributed.cluster_dump import DumpInspector, load_cluster_dump
from distributed.comm import CommClosedError
from distributed.compatibility import LINUX, WINDOWS
from distributed.core import Status
Expand Down Expand Up @@ -7261,22 +7263,9 @@ def test_print_simple(capsys):


def _verify_cluster_dump(url, format: str, addresses: set[str]) -> dict:
fsspec = pytest.importorskip("fsspec")

url = str(url)
if format == "msgpack":
import msgpack

url += ".msgpack.gz"
loader = msgpack.unpack
else:
import yaml

url += ".yaml"
loader = yaml.safe_load

with fsspec.open(url, mode="rb", compression="infer") as f:
state = loader(f)
fsspec = pytest.importorskip("fsspec") # for load_cluster_dump
url = str(url) + (".msgpack.gz" if format == "msgpack" else ".yaml")
state = load_cluster_dump(url)

assert isinstance(state, dict)
assert "scheduler" in state
Expand Down Expand Up @@ -7345,6 +7334,31 @@ async def test_dump_cluster_state_json(c, s, a, b, tmp_path, local):
await c.dump_cluster_state(filename, format="json")


@pytest.mark.parametrize("local", [True, False])
@pytest.mark.parametrize("_format", ["msgpack", "yaml"])
@pytest.mark.parametrize("workers", [True, False])
@gen_cluster(client=True)
async def test_inspect_cluster_dump(c, s, a, b, tmp_path, _format, local, workers):
    """Dump cluster state, then inspect it with DumpInspector."""
    filename = tmp_path / "foo"
    if not local:
        pytest.importorskip("fsspec")
        # Make it look like an fsspec path
        filename = f"file://{filename}"

    A = da.ones(100, chunks=25)
    await c.persist(A)
    await c.dump_cluster_state(filename, format=_format)

    # msgpack dumps are gzip-compressed on disk; yaml dumps are not.
    suffix = ".gz" if _format == "msgpack" else ""
    inspector = DumpInspector(f"{filename}.{_format}{suffix}")
    tasks = inspector.tasks_in_state("memory", workers=workers)
    assert set(tasks.keys()) == set(map(str, A.__dask_keys__()))
    it = iter(tasks.keys())
    # Smoke-test story retrieval with a single key and with multiple keys,
    # and the missing-worker report.
    stories = inspector.story(next(it), workers=workers)
    stories = inspector.story(next(it), next(it), workers=workers)
    missing = inspector.missing_workers()


@gen_cluster(client=True)
async def test_dump_cluster_state_exclude_default(c, s, a, b, tmp_path):
futs = c.map(inc, range(10))
Expand Down
15 changes: 3 additions & 12 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
from .security import Security
from .shuffle import ShuffleWorkerExtension
from .sizeof import safe_sizeof as sizeof
from .stories import worker_story
from .threadpoolexecutor import ThreadPoolExecutor
from .threadpoolexecutor import secede as tpe_secede
from .utils import (
Expand Down Expand Up @@ -2894,18 +2895,8 @@ def stateof(self, key: str) -> dict[str, Any]:
}

def story(self, *keys_or_tasks: str | TaskState) -> list[tuple]:
    """Return worker log entries that mention any of the given keys.

    Accepts task keys or TaskState objects; TaskStates are reduced to
    their keys before the log is searched. Delegates to the shared
    :func:`worker_story` helper.
    """
    keys = {e.key if isinstance(e, TaskState) else e for e in keys_or_tasks}
    return worker_story(keys, self.log)

def ensure_communicating(self) -> None:
stimulus_id = f"ensure-communicating-{time()}"
Expand Down