From 07539ec8ef22a213dcb2533f6f970cdfd7c4afbe Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 9 Mar 2022 17:04:06 +0200 Subject: [PATCH 01/26] Initial stab --- distributed/cluster_dump.py | 40 ++++++++++++++++++++++++++++++++ distributed/tests/test_client.py | 24 +++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index bf00708f3b..0eff7d5ee4 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -57,3 +57,43 @@ def writer(state: dict, f: IO): # Write from a thread so we don't block the event loop quite as badly # (the writer will still hold the GIL a lot though). await to_thread(writer, state, f) + + +def get_tasks_in_state( + url: str, + state: str, + worker: bool = False, +) -> dict: + if url.endswith(".msgpack.gz"): + mode = "rb" + reader = msgpack.unpack + elif url.endswith(".yaml"): + import yaml + + mode = "r" + reader = yaml.safe_load + else: + raise ValueError(f"url ({url}) must have a .msgpack.gz or .yaml suffix") + + with fsspec.open(url, mode, compression="infer") as f: + dump = reader(f) + + context_str = "workers" if worker else "scheduler" + + try: + context = dump[context_str] + except KeyError: + raise ValueError(f"The '{context_str}' was not present in the dumped state") + + try: + tasks = context["tasks"] + except KeyError: + raise ValueError( + f"'tasks' was not present within the '{context_str}' " + f"context of the dumped state" + ) + + if state: + return {k: v for k, v in tasks.items() if v["state"] == state} + + return tasks diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index abd496e3eb..6bed50e972 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -31,6 +31,7 @@ from tlz import concat, first, identity, isdistinct, merge, pluck, valmap import dask +import dask.array as da import dask.bag as db from dask import delayed from dask.optimization import SubgraphCallable @@ -63,6 +64,7 @@ tokenize, wait, ) +from distributed.cluster_dump import get_tasks_in_state from distributed.comm import CommClosedError from distributed.compatibility import LINUX, WINDOWS from distributed.core import Status @@ -7345,6 +7347,28 @@ async def test_dump_cluster_state_json(c, s, a, b, tmp_path, local): await c.dump_cluster_state(filename, format="json") +@pytest.mark.parametrize("local", [True, False]) +@pytest.mark.parametrize("_format", ["msgpack", "yaml"]) +@gen_cluster(client=True) +async def test_get_cluster_state(c, s, a, b, tmp_path, _format, local): + filename = tmp_path / "foo" + if not local: + pytest.importorskip("fsspec") + # Make it look like an fsspec path + filename = f"file://{filename}" + + A = da.ones(100, chunks=25) + B = A.persist() + + await c.persist(A) + await c.dump_cluster_state(filename, format=_format) + + suffix = ".gz" if _format == "msgpack" else "" + outfile = f"{filename}.{_format}{suffix}" + tasks = get_tasks_in_state(outfile, "") + assert set(tasks.keys()) == set(map(str, A.__dask_keys__())) + + @gen_cluster(client=True) async def test_dump_cluster_state_exclude_default(c, s, a, b, tmp_path): futs = c.map(inc, range(10)) From 5c727a859b032bf85be83395afb679adfd6434e4 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 9 Mar 2022 17:24:25 +0200 Subject: [PATCH 02/26] Better error message --- distributed/cluster_dump.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 0eff7d5ee4..f4d6240507 100644 --- 
a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -83,7 +83,9 @@ def get_tasks_in_state( try: context = dump[context_str] except KeyError: - raise ValueError(f"The '{context_str}' was not present in the dumped state") + raise ValueError( + f"The '{context_str}' context was not present in the dumped state" + ) try: tasks = context["tasks"] From 1d957c829b93c6e5f5c20755b6458b32be34e0bd Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 9 Mar 2022 17:24:45 +0200 Subject: [PATCH 03/26] Task states should be in memory --- distributed/tests/test_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 6bed50e972..f7a5ca5ade 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -7365,7 +7365,7 @@ async def test_get_cluster_state(c, s, a, b, tmp_path, _format, local): suffix = ".gz" if _format == "msgpack" else "" outfile = f"{filename}.{_format}{suffix}" - tasks = get_tasks_in_state(outfile, "") + tasks = get_tasks_in_state(outfile, "memory") assert set(tasks.keys()) == set(map(str, A.__dask_keys__())) From e2375f3b5fe72ccc37b28e4a56498b359b149f3a Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 9 Mar 2022 17:38:01 +0200 Subject: [PATCH 04/26] Clean up test case --- distributed/tests/test_client.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index f7a5ca5ade..7a7967d2d4 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -7358,8 +7358,6 @@ async def test_get_cluster_state(c, s, a, b, tmp_path, _format, local): filename = f"file://{filename}" A = da.ones(100, chunks=25) - B = A.persist() - await c.persist(A) await c.dump_cluster_state(filename, format=_format) From 6063d6841cb7e99a1c99b291838235fafdcb57d6 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Thu, 10 Mar 2022 12:37:18 +0200 Subject: [PATCH 05/26] WIP --- distributed/cluster_dump.py | 29 +++++++++++++++++++++++------ distributed/scheduler.py | 7 ++++--- distributed/stories.py | 13 +++++++++++++ distributed/worker.py | 16 ++++------------ 4 files changed, 44 insertions(+), 21 deletions(-) create mode 100644 distributed/stories.py diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index f4d6240507..fbc4346fb3 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -59,11 +59,7 @@ def writer(state: dict, f: IO): await to_thread(writer, state, f) -def get_tasks_in_state( - url: str, - state: str, - worker: bool = False, -) -> dict: +def load_cluster_dump(url: str): if url.endswith(".msgpack.gz"): mode = "rb" reader = msgpack.unpack @@ -76,8 +72,29 @@ def get_tasks_in_state( raise ValueError(f"url ({url}) must have a .msgpack.gz or .yaml suffix") with fsspec.open(url, mode, compression="infer") as f: - dump = reader(f) + return reader(f) + + +class ClusterInspector: + def __init__( + self, url_or_state: str | dict, context: Literal["scheduler" | "workers"] + ): + if isinstance(url_or_state, str): + self.dump = load_cluster_dump(url_or_state) + elif isinstance(url_or_state, dict): + self.dump = url_or_state + else: + raise TypeError(f"'url_or_state' must be a str or dict") + self.context = context + + +def get_tasks_in_state( + url: str, + state: str, + worker: bool = False, +) -> dict: + dump = load_cluster_dump(url) context_str = "workers" if worker else "scheduler" try: diff --git a/distributed/scheduler.py b/distributed/scheduler.py 
index ce0622c0c2..5564001e41 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7533,9 +7533,10 @@ def transitions(self, recommendations: dict): def story(self, *keys): """Get all transitions that touch one of the input keys""" keys = {key.key if isinstance(key, TaskState) else key for key in keys} - return [ - t for t in self.transition_log if t[0] in keys or keys.intersection(t[3]) - ] + + from .stories import scheduler_story + + return scheduler_story(keys, self.transition_log) transition_story = story diff --git a/distributed/stories.py b/distributed/stories.py new file mode 100644 index 0000000000..18bb9028c6 --- /dev/null +++ b/distributed/stories.py @@ -0,0 +1,13 @@ +def scheduler_story(keys: set, transition_log: list): + return [t for t in transition_log if t[0] in keys or keys.intersection(t[3])] + + +def worker_story(keys: set, log: list): + return [ + msg + for msg in log + if any(key in msg for key in keys) + or any( + key in c for key in keys for c in msg if isinstance(c, (tuple, list, set)) + ) + ] diff --git a/distributed/worker.py b/distributed/worker.py index 589ab3aee5..dfa486516f 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2894,18 +2894,10 @@ def stateof(self, key: str) -> dict[str, Any]: } def story(self, *keys_or_tasks: str | TaskState) -> list[tuple]: - keys = [e.key if isinstance(e, TaskState) else e for e in keys_or_tasks] - return [ - msg - for msg in self.log - if any(key in msg for key in keys) - or any( - key in c - for key in keys - for c in msg - if isinstance(c, (tuple, list, set)) - ) - ] + keys = {e.key if isinstance(e, TaskState) else e for e in keys_or_tasks} + from .stories import worker_story + + return worker_story(keys, self.log) def ensure_communicating(self) -> None: stimulus_id = f"ensure-communicating-{time()}" From 62c88102e28f311888c2cb6088b62c3156f5301c Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Thu, 10 Mar 2022 16:05:18 +0200 Subject: [PATCH 06/26] Finalise DumpInspector class --- distributed/cluster_dump.py | 111 +++++++++++++++++++++---------- distributed/scheduler.py | 4 +- distributed/stories.py | 7 +- distributed/tests/test_client.py | 32 ++++----- distributed/worker.py | 3 +- 5 files changed, 94 insertions(+), 63 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index fbc4346fb3..095eb4a1c5 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -8,6 +8,7 @@ import msgpack from distributed.compatibility import to_thread +from distributed.stories import scheduler_story, worker_story def _tuple_to_list(node): @@ -75,44 +76,82 @@ def load_cluster_dump(url: str): return reader(f) -class ClusterInspector: - def __init__( - self, url_or_state: str | dict, context: Literal["scheduler" | "workers"] - ): +class DumpInspector: + """ + Utility class for inspecting the state of a cluster dump + + .. 
code-block:: python + + inspector = DumpInspect("dump.msgpack.gz") + memory_tasks = inspector.tasks_in_state("memory") + released_tasks = inspector.tasks_in_state("released) + """ + + def __init__(self, url_or_state: str | dict): if isinstance(url_or_state, str): self.dump = load_cluster_dump(url_or_state) elif isinstance(url_or_state, dict): self.dump = url_or_state else: - raise TypeError(f"'url_or_state' must be a str or dict") - - self.context = context - - -def get_tasks_in_state( - url: str, - state: str, - worker: bool = False, -) -> dict: - dump = load_cluster_dump(url) - context_str = "workers" if worker else "scheduler" - - try: - context = dump[context_str] - except KeyError: - raise ValueError( - f"The '{context_str}' context was not present in the dumped state" - ) - - try: - tasks = context["tasks"] - except KeyError: - raise ValueError( - f"'tasks' was not present within the '{context_str}' " - f"context of the dumped state" - ) - - if state: - return {k: v for k, v in tasks.items() if v["state"] == state} - - return tasks + raise TypeError("'url_or_state' must be a str or dict") + + def tasks_in_state(self, state: str = "", workers: bool = False) -> dict: + """ + Returns + ------- + tasks : dict + A dictionary of scheduler tasks with state `state`. + worker tasks are included if `workers=True` + """ + stasks = self.dump["scheduler"]["tasks"] + + if state: + tasks = {k: v for k, v in stasks.items() if v["state"] == state} + else: + tasks = stasks.copy() + + if not workers: + return tasks + + for worker_dump in self.dump["workers"].values(): + if state: + tasks.update( + (k, v) + for k, v in worker_dump["tasks"].items() + if v["state"] == state + ) + else: + tasks.update(worker_dump["tasks"]) + + return tasks + + def story(self, *key_or_stimulus_id: str, workers: bool = False) -> list: + """ + Returns + ------- + stories : list + A list of stories for the keys/stimulus ID's in `*key_or_stimulus_id`. + worker stories are included if `workers=True` + """ + keys = set(key_or_stimulus_id) + story = scheduler_story(keys, self.dump["scheduler"]["transition_log"]) + + if not workers: + return story + + for wdump in self.dump["workers"].values(): + story.extend(worker_story(keys, wdump["log"])) + + return story + + def missing_workers(self) -> list: + """ + Returns + ------- + missing : list + A list of workers connected to the scheduler, but which + did not respond to requests for a state dump. 
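+
+        A minimal usage sketch (assuming a dump artefact on disk):
+
+        .. code-block:: python
+
+            inspector = DumpInspector("dump.msgpack.gz")
+            unresponsive = inspector.missing_workers()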
+ """ + scheduler_workers = self.dump["scheduler"]["workers"] + responsive_workers = set(self.dump["workers"].keys()) + return [w for w in scheduler_workers.keys() if w not in responsive_workers] diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5564001e41..7a2bba9936 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -84,6 +84,7 @@ from .security import Security from .semaphore import SemaphoreExtension from .stealing import WorkStealing +from .stories import scheduler_story from .utils import ( All, TimeoutError, @@ -7533,9 +7534,6 @@ def transitions(self, recommendations: dict): def story(self, *keys): """Get all transitions that touch one of the input keys""" keys = {key.key if isinstance(key, TaskState) else key for key in keys} - - from .stories import scheduler_story - return scheduler_story(keys, self.transition_log) transition_story = story diff --git a/distributed/stories.py b/distributed/stories.py index 18bb9028c6..1a824e3016 100644 --- a/distributed/stories.py +++ b/distributed/stories.py @@ -1,8 +1,11 @@ -def scheduler_story(keys: set, transition_log: list): +from typing import Iterable + + +def scheduler_story(keys: set, transition_log: Iterable): return [t for t in transition_log if t[0] in keys or keys.intersection(t[3])] -def worker_story(keys: set, log: list): +def worker_story(keys: set, log: Iterable): return [ msg for msg in log diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 7a7967d2d4..4387b2c0b1 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -64,7 +64,7 @@ tokenize, wait, ) -from distributed.cluster_dump import get_tasks_in_state +from distributed.cluster_dump import DumpInspector, load_cluster_dump from distributed.comm import CommClosedError from distributed.compatibility import LINUX, WINDOWS from distributed.core import Status @@ -7263,22 +7263,9 @@ def test_print_simple(capsys): def _verify_cluster_dump(url, format: str, addresses: set[str]) -> dict: - fsspec = pytest.importorskip("fsspec") - - url = str(url) - if format == "msgpack": - import msgpack - - url += ".msgpack.gz" - loader = msgpack.unpack - else: - import yaml - - url += ".yaml" - loader = yaml.safe_load - - with fsspec.open(url, mode="rb", compression="infer") as f: - state = loader(f) + fsspec = pytest.importorskip("fsspec") # for load_cluster_dump + url = str(url) + (".msgpack.gz" if format == "msgpack" else ".yaml") + state = load_cluster_dump(url) assert isinstance(state, dict) assert "scheduler" in state @@ -7349,8 +7336,9 @@ async def test_dump_cluster_state_json(c, s, a, b, tmp_path, local): @pytest.mark.parametrize("local", [True, False]) @pytest.mark.parametrize("_format", ["msgpack", "yaml"]) +@pytest.mark.parametrize("workers", [True, False]) @gen_cluster(client=True) -async def test_get_cluster_state(c, s, a, b, tmp_path, _format, local): +async def test_inspect_cluster_dump(c, s, a, b, tmp_path, _format, local, workers): filename = tmp_path / "foo" if not local: pytest.importorskip("fsspec") @@ -7362,9 +7350,13 @@ async def test_get_cluster_state(c, s, a, b, tmp_path, _format, local): await c.dump_cluster_state(filename, format=_format) suffix = ".gz" if _format == "msgpack" else "" - outfile = f"{filename}.{_format}{suffix}" - tasks = get_tasks_in_state(outfile, "memory") + inspector = DumpInspector(f"{filename}.{_format}{suffix}") + tasks = inspector.tasks_in_state("memory", workers=workers) assert set(tasks.keys()) == set(map(str, A.__dask_keys__())) + it = 
iter(tasks.keys()) + stories = inspector.story(next(it), workers=workers) + stories = inspector.story(next(it), next(it), workers=workers) + missing = inspector.missing_workers() @gen_cluster(client=True) diff --git a/distributed/worker.py b/distributed/worker.py index dfa486516f..3e482e6378 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -71,6 +71,7 @@ from .security import Security from .shuffle import ShuffleWorkerExtension from .sizeof import safe_sizeof as sizeof +from .stories import worker_story from .threadpoolexecutor import ThreadPoolExecutor from .threadpoolexecutor import secede as tpe_secede from .utils import ( @@ -2895,8 +2896,6 @@ def stateof(self, key: str) -> dict[str, Any]: def story(self, *keys_or_tasks: str | TaskState) -> list[tuple]: keys = {e.key if isinstance(e, TaskState) else e for e in keys_or_tasks} - from .stories import worker_story - return worker_story(keys, self.log) def ensure_communicating(self) -> None: From dea9d7e1837429c2dd7f320fb4089d88606acd54 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Fri, 11 Mar 2022 11:49:19 +0200 Subject: [PATCH 07/26] In case of unresponsive workers, the worker dump might be a string describing the comm error --- distributed/cluster_dump.py | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 095eb4a1c5..13f8bfb444 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -114,17 +114,23 @@ def tasks_in_state(self, state: str = "", workers: bool = False) -> dict: return tasks for worker_dump in self.dump["workers"].values(): - if state: - tasks.update( - (k, v) - for k, v in worker_dump["tasks"].items() - if v["state"] == state - ) - else: - tasks.update(worker_dump["tasks"]) + if self._valid_worker_dump(worker_dump): + if state: + tasks.update( + (k, v) + for k, v in worker_dump["tasks"].items() + if v["state"] == state + ) + else: + tasks.update(worker_dump["tasks"]) return tasks + def _valid_worker_dump(self, worker_dump): + # Worker dumps should be a dictionaries but can also be + # strings describing comm Failures + return isinstance(worker_dump, dict) + def story(self, *key_or_stimulus_id: str, workers: bool = False) -> list: """ Returns @@ -140,7 +146,8 @@ def story(self, *key_or_stimulus_id: str, workers: bool = False) -> list: return story for wdump in self.dump["workers"].values(): - story.extend(worker_story(keys, wdump["log"])) + if self._valid_worker_dump(wdump): + story.extend(worker_story(keys, wdump["log"])) return story @@ -153,5 +160,10 @@ def missing_workers(self) -> list: did not respond to requests for a state dump. 
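+
+        A hedged sketch of the unresponsive-worker case (the address
+        below is made up for illustration): a worker entry that is a
+        string, e.g. a comm error message, counts as missing.
+
+        .. code-block:: python
+
+            state = {
+                "scheduler": {"workers": {"tcp://10.0.0.1:1234": {}}},
+                "workers": {"tcp://10.0.0.1:1234": "comm error"},
+            }
+            assert DumpInspector(state).missing_workers() == ["tcp://10.0.0.1:1234"]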
""" scheduler_workers = self.dump["scheduler"]["workers"] - responsive_workers = set(self.dump["workers"].keys()) - return [w for w in scheduler_workers.keys() if w not in responsive_workers] + responsive_workers = self.dump["workers"] + return [ + w + for w in scheduler_workers.keys() + if w not in responsive_workers + or not self._valid_worker_dump(responsive_workers[w]) + ] From 75a4f6a6fc07d40d6b88c15154a0743f94e2de9f Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Fri, 11 Mar 2022 14:25:55 +0200 Subject: [PATCH 08/26] Provide story docstrings --- distributed/stories.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/distributed/stories.py b/distributed/stories.py index 1a824e3016..50a7b5cdd5 100644 --- a/distributed/stories.py +++ b/distributed/stories.py @@ -2,10 +2,38 @@ def scheduler_story(keys: set, transition_log: Iterable): + """Creates a story from the scheduler transition log given a set of keys + describing tasks or stimuli. + + Parameters + ---------- + keys : set + A set of task `keys` or `stimulus_id`'s + log : iterable + The scheduler transition log + + Returns + ------- + story : list + """ return [t for t in transition_log if t[0] in keys or keys.intersection(t[3])] def worker_story(keys: set, log: Iterable): + """Creates a story from the worker log given a set of keys + describing tasks or stimuli. + + Parameters + ---------- + keys : set + A set of task `keys` or `stimulus_id`'s + log : iterable + The worker log + + Returns + ------- + story : list + """ return [ msg for msg in log From dee99de0232f9e8e0f438fd44105225f42d5b8c8 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 14 Mar 2022 12:23:49 +0200 Subject: [PATCH 09/26] Document load_cluster_dump --- distributed/cluster_dump.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 13f8bfb444..21e62c20c8 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -60,7 +60,20 @@ def writer(state: dict, f: IO): await to_thread(writer, state, f) -def load_cluster_dump(url: str): +def load_cluster_dump(url: str) -> dict: + """Loads a cluster dump from a disk artefact + + Parameters + ---------- + url : str + Name of the dump artefact. This should have either a + `.msgpack.gz` or `yaml` suffix, depending on the dump format. + + Returns + ------- + state : dict + The cluster state at the time of the dump. + """ if url.endswith(".msgpack.gz"): mode = "rb" reader = msgpack.unpack @@ -84,7 +97,7 @@ class DumpInspector: inspector = DumpInspect("dump.msgpack.gz") memory_tasks = inspector.tasks_in_state("memory") - released_tasks = inspector.tasks_in_state("released) + released_tasks = inspector.tasks_in_state("released") """ def __init__(self, url_or_state: str | dict): From 4c69599d089187e3b733960a2db6a8f232b89f87 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 14 Mar 2022 12:57:01 +0200 Subject: [PATCH 10/26] Annotate story return types --- distributed/stories.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/stories.py b/distributed/stories.py index 50a7b5cdd5..d17e54df53 100644 --- a/distributed/stories.py +++ b/distributed/stories.py @@ -1,7 +1,7 @@ from typing import Iterable -def scheduler_story(keys: set, transition_log: Iterable): +def scheduler_story(keys: set, transition_log: Iterable) -> list: """Creates a story from the scheduler transition log given a set of keys describing tasks or stimuli. 
@@ -19,7 +19,7 @@ def scheduler_story(keys: set, transition_log: Iterable): return [t for t in transition_log if t[0] in keys or keys.intersection(t[3])] -def worker_story(keys: set, log: Iterable): +def worker_story(keys: set, log: Iterable) -> list: """Creates a story from the worker log given a set of keys describing tasks or stimuli. From cbd56094dcb1fb06bd9180a4e3489fee35ea5e83 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 14 Mar 2022 14:13:19 +0200 Subject: [PATCH 11/26] Test with {submit,map} instead of dask.array --- distributed/tests/test_client.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 4387b2c0b1..bd122fcef6 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -31,12 +31,12 @@ from tlz import concat, first, identity, isdistinct, merge, pluck, valmap import dask -import dask.array as da import dask.bag as db from dask import delayed from dask.optimization import SubgraphCallable from dask.utils import parse_timedelta, stringify, tmpfile +import distributed from distributed import ( CancelledError, Executor, @@ -7334,6 +7334,11 @@ async def test_dump_cluster_state_json(c, s, a, b, tmp_path, local): await c.dump_cluster_state(filename, format="json") +def blocked_inc(x, event): + event.wait() + return x + 1 + + @pytest.mark.parametrize("local", [True, False]) @pytest.mark.parametrize("_format", ["msgpack", "yaml"]) @pytest.mark.parametrize("workers", [True, False]) @@ -7345,14 +7350,23 @@ async def test_inspect_cluster_dump(c, s, a, b, tmp_path, _format, local, worker # Make it look like an fsspec path filename = f"file://{filename}" - A = da.ones(100, chunks=25) - await c.persist(A) + futs = c.map(inc, range(10)) + fut_keys = {f.key for f in futs} + await c.gather(futs) + + event = distributed.Event() + inc_blocked = c.submit(blocked_inc, 1, event=event) + await c.dump_cluster_state(filename, format=_format) + event.set() + suffix = ".gz" if _format == "msgpack" else "" inspector = DumpInspector(f"{filename}.{_format}{suffix}") tasks = inspector.tasks_in_state("memory", workers=workers) - assert set(tasks.keys()) == set(map(str, A.__dask_keys__())) + assert fut_keys.issubset(tasks.keys()) + assert inc_blocked.key in inspector.tasks_in_state("executing", workers=True) + it = iter(tasks.keys()) stories = inspector.story(next(it), workers=workers) stories = inspector.story(next(it), next(it), workers=workers) From 182b3eae99a5caa8d4f6e64fb5a3a44a02772338 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 14 Mar 2022 14:29:38 +0200 Subject: [PATCH 12/26] Fix typo --- distributed/cluster_dump.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 21e62c20c8..9d3e276d92 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -95,7 +95,7 @@ class DumpInspector: .. 
code-block:: python - inspector = DumpInspect("dump.msgpack.gz") + inspector = DumpInspector("dump.msgpack.gz") memory_tasks = inspector.tasks_in_state("memory") released_tasks = inspector.tasks_in_state("released") """ From b233000f9bf5ab2426709683e07320358313b6c3 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 14 Mar 2022 15:00:20 +0200 Subject: [PATCH 13/26] Test output of tasks_in_state --- distributed/tests/test_client.py | 37 ++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index bd122fcef6..9e41b757c6 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -7363,11 +7363,40 @@ async def test_inspect_cluster_dump(c, s, a, b, tmp_path, _format, local, worker suffix = ".gz" if _format == "msgpack" else "" inspector = DumpInspector(f"{filename}.{_format}{suffix}") - tasks = inspector.tasks_in_state("memory", workers=workers) - assert fut_keys.issubset(tasks.keys()) - assert inc_blocked.key in inspector.tasks_in_state("executing", workers=True) + memory_tasks = inspector.tasks_in_state("memory", workers=workers) + ex_tasks = inspector.tasks_in_state("executing", workers=True) + assert fut_keys.issubset(memory_tasks.keys()) + assert ex_tasks == { + inc_blocked.key: { + "annotations": None, + "coming_from": None, + "dependencies": [], + "dependents": [], + "done": False, + "duration": 0.5, + "exception": None, + "exception_text": "", + "key": inc_blocked.key, + "metadata": {}, + "nbytes": None, + "resource_restrictions": {}, + "startstops": [], + "state": "executing", + "stop_time": None, + "suspicious_count": 0, + "traceback": None, + "traceback_text": "", + "type": None, + "waiters": [], + "waiting_for_data": [], + "who_has": [], + # Set non-deterministic values + "priority": ex_tasks[inc_blocked.key]["priority"], + "start_time": ex_tasks[inc_blocked.key]["start_time"], + } + } - it = iter(tasks.keys()) + it = iter(memory_tasks.keys()) stories = inspector.story(next(it), workers=workers) stories = inspector.story(next(it), next(it), workers=workers) missing = inspector.missing_workers() From 0381454b8de53127261982a91a4c4b00d99d0796 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 14 Mar 2022 15:49:51 +0200 Subject: [PATCH 14/26] Use defaultdict to group tasks with a duplicate key --- distributed/cluster_dump.py | 48 +++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 9d3e276d92..9dcc5ec02a 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -2,6 +2,7 @@ from __future__ import annotations +from collections import defaultdict from typing import IO, Any, Awaitable, Callable, Literal import fsspec @@ -110,6 +111,16 @@ def __init__(self, url_or_state: str | dict): def tasks_in_state(self, state: str = "", workers: bool = False) -> dict: """ + .. 
code-block:: python + + inc = lambda x: x + 1 + client = LocalCluster() + f = client.submit(inc, x, 1) + + inspector = DumpInspector("dump.msgpack.gz") + + + Returns ------- tasks : dict @@ -117,27 +128,28 @@ def tasks_in_state(self, state: str = "", workers: bool = False) -> dict: worker tasks are included if `workers=True` """ stasks = self.dump["scheduler"]["tasks"] + tasks = defaultdict(list) if state: - tasks = {k: v for k, v in stasks.items() if v["state"] == state} + for k, v in stasks.items(): + if state == v["state"]: + tasks[k].append(v) else: - tasks = stasks.copy() - - if not workers: - return tasks - - for worker_dump in self.dump["workers"].values(): - if self._valid_worker_dump(worker_dump): - if state: - tasks.update( - (k, v) - for k, v in worker_dump["tasks"].items() - if v["state"] == state - ) - else: - tasks.update(worker_dump["tasks"]) - - return tasks + for k, v in stasks.items(): + tasks[k].append(v) + + if workers: + for worker_dump in self.dump["workers"].values(): + if self._valid_worker_dump(worker_dump): + if state: + for k, v in worker_dump["tasks"].items(): + if v["state"] == state: + tasks[k].append(v) + else: + for k, v in worker_dump["tasks"].items(): + tasks[k].append(v) + + return dict(tasks) def _valid_worker_dump(self, worker_dump): # Worker dumps should be a dictionaries but can also be From 15a0719a213f465a79191d2e5565718a24704c01 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 15 Mar 2022 10:56:37 +0200 Subject: [PATCH 15/26] Split scheduler and worker task/story retrieval into separate functions --- distributed/cluster_dump.py | 92 +++++++++++++++++-------------------- 1 file changed, 43 insertions(+), 49 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 9dcc5ec02a..8da9380ce2 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -2,14 +2,14 @@ from __future__ import annotations -from collections import defaultdict from typing import IO, Any, Awaitable, Callable, Literal import fsspec import msgpack from distributed.compatibility import to_thread -from distributed.stories import scheduler_story, worker_story +from distributed.stories import scheduler_story as _scheduler_story +from distributed.stories import worker_story as _worker_story def _tuple_to_list(node): @@ -67,7 +67,7 @@ def load_cluster_dump(url: str) -> dict: Parameters ---------- url : str - Name of the dump artefact. This should have either a + Name of the disk artefact. This should have either a `.msgpack.gz` or `yaml` suffix, depending on the dump format. Returns @@ -97,8 +97,8 @@ class DumpInspector: .. code-block:: python inspector = DumpInspector("dump.msgpack.gz") - memory_tasks = inspector.tasks_in_state("memory") - released_tasks = inspector.tasks_in_state("released") + memory_tasks = inspector.scheduler_tasks("memory") + executing_tasks = inspector.worker_tasks("executing") """ def __init__(self, url_or_state: str | dict): @@ -109,72 +109,66 @@ def __init__(self, url_or_state: str | dict): else: raise TypeError("'url_or_state' must be a str or dict") - def tasks_in_state(self, state: str = "", workers: bool = False) -> dict: - """ - .. 
code-block:: python - - inc = lambda x: x + 1 - client = LocalCluster() - f = client.submit(inc, x, 1) - - inspector = DumpInspector("dump.msgpack.gz") - + def _extract_tasks(self, state: str, context: dict): + if state: + return [v for v in context.values() if v["state"] == state] + else: + return list(context.values()) + def scheduler_tasks(self, state: str = "") -> list: + """ + Returns + ------- + tasks : list + The list of scheduler tasks in `state`. + """ + return self._extract_tasks(state, self.dump["scheduler"]["tasks"]) + def worker_tasks(self, state: str = "") -> list: + """ Returns ------- - tasks : dict - A dictionary of scheduler tasks with state `state`. - worker tasks are included if `workers=True` + tasks : list + The list of worker tasks in `state` """ - stasks = self.dump["scheduler"]["tasks"] - tasks = defaultdict(list) + tasks = [] - if state: - for k, v in stasks.items(): - if state == v["state"]: - tasks[k].append(v) - else: - for k, v in stasks.items(): - tasks[k].append(v) - - if workers: - for worker_dump in self.dump["workers"].values(): - if self._valid_worker_dump(worker_dump): - if state: - for k, v in worker_dump["tasks"].items(): - if v["state"] == state: - tasks[k].append(v) - else: - for k, v in worker_dump["tasks"].items(): - tasks[k].append(v) - - return dict(tasks) + for worker_dump in self.dump["workers"].values(): + if self._valid_worker_dump(worker_dump): + tasks.extend(self._extract_tasks(state, worker_dump["tasks"])) + + return tasks def _valid_worker_dump(self, worker_dump): # Worker dumps should be a dictionaries but can also be # strings describing comm Failures return isinstance(worker_dump, dict) - def story(self, *key_or_stimulus_id: str, workers: bool = False) -> list: + def scheduler_story(self, *key_or_stimulus_id: str) -> list: """ Returns ------- stories : list A list of stories for the keys/stimulus ID's in `*key_or_stimulus_id`. - worker stories are included if `workers=True` """ keys = set(key_or_stimulus_id) - story = scheduler_story(keys, self.dump["scheduler"]["transition_log"]) + return _scheduler_story(keys, self.dump["scheduler"]["transition_log"]) - if not workers: - return story + def worker_story(self, *key_or_stimulus_id: str) -> list: + """ + Returns + ------- + stories : list + A list of stories for the keys/stimulus ID's in `*key_or_stimulus_id`. 
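+
+        A minimal sketch (the key ``"inc-1"`` is a made-up example):
+
+        .. code-block:: python
+
+            inspector = DumpInspector("dump.msgpack.gz")
+            story = inspector.worker_story("inc-1")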
+ """ + keys = set(key_or_stimulus_id) + stories = [] - for wdump in self.dump["workers"].values(): - if self._valid_worker_dump(wdump): - story.extend(worker_story(keys, wdump["log"])) + for worker_dump in self.dump["workers"].values(): + if self._valid_worker_dump(worker_dump): + stories.extend(_worker_story(keys, worker_dump["log"])) - return story + return stories def missing_workers(self) -> list: """ From 51bb58732946ffe18455c119f355ac515325dba6 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 15 Mar 2022 11:40:53 +0200 Subject: [PATCH 16/26] WIP Cluster dump tests --- distributed/tests/test_client.py | 74 ++++++++++++++------------------ 1 file changed, 33 insertions(+), 41 deletions(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 9e41b757c6..e49246e013 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -50,6 +50,7 @@ performance_report, profile, secede, + worker, ) from distributed.client import ( Client, @@ -7339,66 +7340,57 @@ def blocked_inc(x, event): return x + 1 -@pytest.mark.parametrize("local", [True, False]) -@pytest.mark.parametrize("_format", ["msgpack", "yaml"]) -@pytest.mark.parametrize("workers", [True, False]) +# @pytest.mark.parametrize("local", [True, False]) +# @pytest.mark.parametrize("_format", ["msgpack", "yaml"]) +@pytest.mark.parametrize("local", [True]) +@pytest.mark.parametrize("_format", ["msgpack"]) @gen_cluster(client=True) -async def test_inspect_cluster_dump(c, s, a, b, tmp_path, _format, local, workers): +async def test_inspect_cluster_dump(c, s, a, b, tmp_path, _format, local): filename = tmp_path / "foo" if not local: pytest.importorskip("fsspec") # Make it look like an fsspec path filename = f"file://{filename}" - futs = c.map(inc, range(10)) + futs = c.map(inc, range(2)) fut_keys = {f.key for f in futs} await c.gather(futs) event = distributed.Event() - inc_blocked = c.submit(blocked_inc, 1, event=event) + with dask.annotate(values=1): + inc_blocked = c.submit(blocked_inc, 1, event=event) await c.dump_cluster_state(filename, format=_format) + exclude = {"run_spec", "group"} + + scheduler_tasks = [t._to_dict_no_nest(exclude=exclude) for t in s.tasks.values()] + worker_tasks = [ + t._to_dict_no_nest(exclude=exclude) for w in (a, b) for t in w.tasks.values() + ] + event.set() suffix = ".gz" if _format == "msgpack" else "" inspector = DumpInspector(f"{filename}.{_format}{suffix}") - memory_tasks = inspector.tasks_in_state("memory", workers=workers) - ex_tasks = inspector.tasks_in_state("executing", workers=True) - assert fut_keys.issubset(memory_tasks.keys()) - assert ex_tasks == { - inc_blocked.key: { - "annotations": None, - "coming_from": None, - "dependencies": [], - "dependents": [], - "done": False, - "duration": 0.5, - "exception": None, - "exception_text": "", - "key": inc_blocked.key, - "metadata": {}, - "nbytes": None, - "resource_restrictions": {}, - "startstops": [], - "state": "executing", - "stop_time": None, - "suspicious_count": 0, - "traceback": None, - "traceback_text": "", - "type": None, - "waiters": [], - "waiting_for_data": [], - "who_has": [], - # Set non-deterministic values - "priority": ex_tasks[inc_blocked.key]["priority"], - "start_time": ex_tasks[inc_blocked.key]["start_time"], - } - } - it = iter(memory_tasks.keys()) - stories = inspector.story(next(it), workers=workers) - stories = inspector.story(next(it), next(it), workers=workers) + smem_tasks = inspector.scheduler_tasks("memory") + wmem_tasks = inspector.worker_tasks("memory") + + 
expected = list(sorted(smem_tasks, key=lambda t: t["key"])) + expected2 = list( + sorted( + [t for t in scheduler_tasks if t["state"] == "memory"], + key=lambda t: t["key"], + ) + ) + assert expected == expected2 + + task_key = next(iter(fut_keys)) + + sstory = inspector.scheduler_story(task_key) + wstory = inspector.worker_story(task_key) + missing = inspector.missing_workers() From 77777240e1d90aad7c2fe163c468aa8648c7fe3f Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 15 Mar 2022 12:54:36 +0200 Subject: [PATCH 17/26] Finalise task and story tests --- distributed/cluster_dump.py | 9 +++-- distributed/tests/test_client.py | 63 ++++++++++++++++++++------------ 2 files changed, 45 insertions(+), 27 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 8da9380ce2..8781cb7b4f 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -152,7 +152,8 @@ def scheduler_story(self, *key_or_stimulus_id: str) -> list: A list of stories for the keys/stimulus ID's in `*key_or_stimulus_id`. """ keys = set(key_or_stimulus_id) - return _scheduler_story(keys, self.dump["scheduler"]["transition_log"]) + story = _scheduler_story(keys, self.dump["scheduler"]["transition_log"]) + return list(map(tuple, story)) def worker_story(self, *key_or_stimulus_id: str) -> list: """ @@ -162,11 +163,13 @@ def worker_story(self, *key_or_stimulus_id: str) -> list: A list of stories for the keys/stimulus ID's in `*key_or_stimulus_id`. """ keys = set(key_or_stimulus_id) - stories = [] + stories: list = [] for worker_dump in self.dump["workers"].values(): if self._valid_worker_dump(worker_dump): - stories.extend(_worker_story(keys, worker_dump["log"])) + # Stories are tuples, not lists + story = _worker_story(keys, worker_dump["log"]) + stories.extend(map(tuple, story)) return stories diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index e49246e013..6704bbe974 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -50,7 +50,6 @@ performance_report, profile, secede, - worker, ) from distributed.client import ( Client, @@ -82,6 +81,7 @@ from distributed.utils_test import ( TaskStateMetadataPlugin, _UnhashableCallable, + assert_worker_story, async_wait_for, asyncinc, captured_logger, @@ -7340,10 +7340,8 @@ def blocked_inc(x, event): return x + 1 -# @pytest.mark.parametrize("local", [True, False]) -# @pytest.mark.parametrize("_format", ["msgpack", "yaml"]) -@pytest.mark.parametrize("local", [True]) -@pytest.mark.parametrize("_format", ["msgpack"]) +@pytest.mark.parametrize("local", [True, False]) +@pytest.mark.parametrize("_format", ["msgpack", "yaml"]) @gen_cluster(client=True) async def test_inspect_cluster_dump(c, s, a, b, tmp_path, _format, local): filename = tmp_path / "foo" @@ -7362,36 +7360,53 @@ async def test_inspect_cluster_dump(c, s, a, b, tmp_path, _format, local): await c.dump_cluster_state(filename, format=_format) - exclude = {"run_spec", "group"} - - scheduler_tasks = [t._to_dict_no_nest(exclude=exclude) for t in s.tasks.values()] - worker_tasks = [ - t._to_dict_no_nest(exclude=exclude) for w in (a, b) for t in w.tasks.values() - ] + scheduler_tasks = list(s.tasks.values()) + worker_tasks = [t for w in (a, b) for t in w.tasks.values()] event.set() suffix = ".gz" if _format == "msgpack" else "" inspector = DumpInspector(f"{filename}.{_format}{suffix}") - smem_tasks = inspector.scheduler_tasks("memory") - wmem_tasks = inspector.worker_tasks("memory") + smem_keys = {t["key"] for t in 
inspector.scheduler_tasks("memory")} + wmem_keys = {t["key"] for t in inspector.worker_tasks("memory")} - expected = list(sorted(smem_tasks, key=lambda t: t["key"])) - expected2 = list( - sorted( - [t for t in scheduler_tasks if t["state"] == "memory"], - key=lambda t: t["key"], - ) - ) - assert expected == expected2 + assert smem_keys == fut_keys + assert smem_keys == {t.key for t in scheduler_tasks if t.state == "memory"} + assert wmem_keys == fut_keys + assert wmem_keys == {t.key for t in worker_tasks if t.state == "memory"} task_key = next(iter(fut_keys)) - sstory = inspector.scheduler_story(task_key) - wstory = inspector.worker_story(task_key) + expected = [ + (task_key, "released", "waiting", {task_key: "processing"}), + (task_key, "waiting", "processing", {}), + (task_key, "processing", "memory", {}), + ] + + story = inspector.scheduler_story(task_key) + assert len(story) == len(expected) + + for event, expected_event in zip(story, expected): + for e1, e2 in zip(event, expected_event): + assert e1 == e2 + + assert len(inspector.scheduler_story(task_key)) == 3 + assert_worker_story( + inspector.worker_story(task_key), + [ + (task_key, "compute-task"), + (task_key, "released", "waiting", "waiting", {task_key: "ready"}), + (task_key, "waiting", "ready", "ready", {}), + (task_key, "ready", "executing", "executing", {}), + (task_key, "put-in-memory"), + (task_key, "executing", "memory", "memory", {}), + ], + ) - missing = inspector.missing_workers() + # TODO(sjperkins): Make a worker fail badly to make + # this test more interesting + assert len(inspector.missing_workers()) == 0 @gen_cluster(client=True) From d428690c4f861473e032c65706cdb0b549998ecb Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 15 Mar 2022 14:20:40 +0200 Subject: [PATCH 18/26] Rename DumpInspector to DumpArtefact --- distributed/cluster_dump.py | 8 ++++---- distributed/tests/test_client.py | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 8781cb7b4f..9ca7b6a712 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -90,15 +90,15 @@ def load_cluster_dump(url: str) -> dict: return reader(f) -class DumpInspector: +class DumpArtefact: """ Utility class for inspecting the state of a cluster dump .. 
code-block:: python - inspector = DumpInspector("dump.msgpack.gz") - memory_tasks = inspector.scheduler_tasks("memory") - executing_tasks = inspector.worker_tasks("executing") + dump = DumpArtefact("dump.msgpack.gz") + memory_tasks = dump.scheduler_tasks("memory") + executing_tasks = dump.worker_tasks("executing") """ def __init__(self, url_or_state: str | dict): diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 6704bbe974..e39e2b38ba 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -64,7 +64,7 @@ tokenize, wait, ) -from distributed.cluster_dump import DumpInspector, load_cluster_dump +from distributed.cluster_dump import DumpArtefact, load_cluster_dump from distributed.comm import CommClosedError from distributed.compatibility import LINUX, WINDOWS from distributed.core import Status @@ -7366,10 +7366,10 @@ async def test_inspect_cluster_dump(c, s, a, b, tmp_path, _format, local): event.set() suffix = ".gz" if _format == "msgpack" else "" - inspector = DumpInspector(f"{filename}.{_format}{suffix}") + dump = DumpArtefact(f"{filename}.{_format}{suffix}") - smem_keys = {t["key"] for t in inspector.scheduler_tasks("memory")} - wmem_keys = {t["key"] for t in inspector.worker_tasks("memory")} + smem_keys = {t["key"] for t in dump.scheduler_tasks("memory")} + wmem_keys = {t["key"] for t in dump.worker_tasks("memory")} assert smem_keys == fut_keys assert smem_keys == {t.key for t in scheduler_tasks if t.state == "memory"} @@ -7384,16 +7384,16 @@ async def test_inspect_cluster_dump(c, s, a, b, tmp_path, _format, local): (task_key, "processing", "memory", {}), ] - story = inspector.scheduler_story(task_key) + story = dump.scheduler_story(task_key) assert len(story) == len(expected) for event, expected_event in zip(story, expected): for e1, e2 in zip(event, expected_event): assert e1 == e2 - assert len(inspector.scheduler_story(task_key)) == 3 + assert len(dump.scheduler_story(task_key)) == 3 assert_worker_story( - inspector.worker_story(task_key), + dump.worker_story(task_key), [ (task_key, "compute-task"), (task_key, "released", "waiting", "waiting", {task_key: "ready"}), @@ -7406,7 +7406,7 @@ async def test_inspect_cluster_dump(c, s, a, b, tmp_path, _format, local): # TODO(sjperkins): Make a worker fail badly to make # this test more interesting - assert len(inspector.missing_workers()) == 0 + assert len(dump.missing_workers()) == 0 @gen_cluster(client=True) From 426bef05e1111f812ba63a6bba5e9c57fcbd636d Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 15 Mar 2022 14:26:32 +0200 Subject: [PATCH 19/26] Remove unused earlier logic --- distributed/tests/test_client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index e39e2b38ba..9a391f77f1 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -7355,8 +7355,7 @@ async def test_inspect_cluster_dump(c, s, a, b, tmp_path, _format, local): await c.gather(futs) event = distributed.Event() - with dask.annotate(values=1): - inc_blocked = c.submit(blocked_inc, 1, event=event) + blocked_fut = c.submit(blocked_inc, 1, event) await c.dump_cluster_state(filename, format=_format) From d8264ae41efb2da80d3406578cf1808c5b068ba1 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 15 Mar 2022 16:01:35 +0200 Subject: [PATCH 20/26] Add support for splitting the dump artefact into a tree of yaml files --- distributed/cluster_dump.py | 36 +++++++++++++++++++ 
distributed/tests/test_client.py | 59 ++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 9ca7b6a712..1e6032b63f 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -2,6 +2,7 @@ from __future__ import annotations +from pathlib import Path from typing import IO, Any, Awaitable, Callable, Literal import fsspec @@ -189,3 +190,38 @@ def missing_workers(self) -> list: if w not in responsive_workers or not self._valid_worker_dump(responsive_workers[w]) ] + + def split(self, root_dir: str | Path | None = None): + """ + Splits the Dump Artefact into a tree of yaml files with + `root_dir` as it's base. + + Parameters + ---------- + root_dir : str or Path + """ + import yaml + + root_dir = Path(root_dir) if root_dir else Path.cwd() + dumper = yaml.CSafeDumper + + workers = self.dump["workers"] + for info in workers.values(): + log_dir = root_dir / info["id"] + log_dir.mkdir(parents=True, exist_ok=True) + + for name, _logs in info.items(): + filename = str(log_dir / f"{name}.yaml") + with open(filename, "w") as fd: + yaml.dump(_logs, fd, Dumper=dumper) + + context = "scheduler" + info = self.dump[context] + log_dir = root_dir / context + log_dir.mkdir(parents=True, exist_ok=True) + + for name, _logs in info.items(): + filename = str(log_dir / f"{name}.yaml") + + with open(filename, "w") as fd: + yaml.dump(_logs, fd, Dumper=dumper) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 9a391f77f1..1ade1ef089 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -21,6 +21,7 @@ from contextlib import contextmanager, suppress from functools import partial from operator import add +from pathlib import Path from threading import Semaphore from time import sleep from typing import Any @@ -7403,6 +7404,64 @@ async def test_inspect_cluster_dump(c, s, a, b, tmp_path, _format, local): ], ) + dump_path = Path(tmp_path / "dump") + dump.split(dump_path) + scheduler_files = { + "address.yaml", + "clients.yaml", + "events.yaml", + "extensions.yaml", + "id.yaml", + "log.yaml", + "memory.yaml", + "services.yaml", + "started.yaml", + "status.yaml", + "task_groups.yaml", + "tasks.yaml", + "thread_id.yaml", + "transition_log.yaml", + "type.yaml", + "workers.yaml", + } + + scheduler_dump_path = dump_path / "scheduler" + for file in scheduler_files: + assert (scheduler_dump_path / file).exists() + + worker_files = { + "address.yaml", + "config.yaml", + "constrained.yaml", + "data_needed.yaml", + "executing_count.yaml", + "id.yaml", + "in_flight_tasks.yaml", + "in_flight_workers.yaml", + "incoming_transfer_log.yaml", + "log.yaml", + "logs.yaml", + "long_running.yaml", + "memory_limit.yaml", + "memory_pause_fraction.yaml", + "memory_spill_fraction.yaml", + "memory_target_fraction.yaml", + "nthreads.yaml", + "outgoing_transfer_log.yaml", + "pending_data_per_worker.yaml", + "ready.yaml", + "scheduler.yaml", + "status.yaml", + "tasks.yaml", + "thread_id.yaml", + "type.yaml", + } + + for worker in (a, b): + worker_dump_path = dump_path / worker.id + for file in worker_files: + assert (worker_dump_path / file).exists() + # TODO(sjperkins): Make a worker fail badly to make # this test more interesting assert len(dump.missing_workers()) == 0 From 3a989bc257ca9abe2b30aa318a6f1ff3e3e22ccd Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 15 Mar 2022 17:21:42 +0200 Subject: [PATCH 21/26] Support compaction of trivial state keys into a general.yaml 
file --- distributed/cluster_dump.py | 57 ++++++++++++++++++++++++++++++-- distributed/tests/test_client.py | 40 ++++------------------ 2 files changed, 60 insertions(+), 37 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 1e6032b63f..0977117c71 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -3,7 +3,7 @@ from __future__ import annotations from pathlib import Path -from typing import IO, Any, Awaitable, Callable, Literal +from typing import IO, Any, Awaitable, Callable, Collection, Literal import fsspec import msgpack @@ -191,23 +191,67 @@ def missing_workers(self) -> list: or not self._valid_worker_dump(responsive_workers[w]) ] - def split(self, root_dir: str | Path | None = None): + def split( + self, + root_dir: str | Path | None = None, + worker_expand_keys: Collection[str] = ("config", "log", "logs", "tasks"), + scheduler_expand_keys: Collection[str] = ( + "events", + "extensions", + "log", + "task_groups", + "tasks", + "transition_log", + "workers", + ), + ): """ Splits the Dump Artefact into a tree of yaml files with `root_dir` as it's base. + The root level of the tree contains a directory for the scheduler + and directories for each individual worker. + Each directory contains yaml files describing the state of the scheduler + or worker when the artefact was created. + + In general, keys associated with the state are compacted into a `general.yaml` + file, unless they are in `scheduler_expand_keys` and `worker_expand_keys`. + Parameters ---------- root_dir : str or Path + The root directory into which the tree is written. + Defaults to the current working directory if `None`. + worker_expand_keys : iterable of str + An iterable of artefact worker keys that will be expanded + into separate yaml files. + Keys that are not in this iterable are compacted into a + `general.yaml` file. + scheduler_expand_keys : iterable of str + An iterable of artefact scheduler keys that will be expanded + into separate yaml files. + Keys that are not in this iterable are compacted into a + `general.yaml` file. 
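+
+        A minimal sketch ("dump-tree" is an assumed output directory):
+
+        .. code-block:: python
+
+            dump = DumpArtefact("dump.msgpack.gz")
+            # writes dump-tree/scheduler/*.yaml and one directory per worker
+            dump.split("dump-tree")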
""" import yaml root_dir = Path(root_dir) if root_dir else Path.cwd() dumper = yaml.CSafeDumper + scheduler_expand_keys = set(scheduler_expand_keys) + worker_expand_keys = set(worker_expand_keys) workers = self.dump["workers"] for info in workers.values(): - log_dir = root_dir / info["id"] + worker_id = info["id"] + assert "general" not in info + + # Compact smaller keys into a general dict + general = { + k: info.pop(k) for k in list(info.keys()) if k not in worker_expand_keys + } + info["general"] = general + + log_dir = root_dir / worker_id log_dir.mkdir(parents=True, exist_ok=True) for name, _logs in info.items(): @@ -217,8 +261,15 @@ def split(self, root_dir: str | Path | None = None): context = "scheduler" info = self.dump[context] + assert "general" not in info + general = { + k: info.pop(k) for k in list(info.keys()) if k not in scheduler_expand_keys + } + info["general"] = general + log_dir = root_dir / context log_dir.mkdir(parents=True, exist_ok=True) + # Compact smaller keys into a general dict for name, _logs in info.items(): filename = str(log_dir / f"{name}.yaml") diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 1ade1ef089..d7a243a42c 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -7407,60 +7407,32 @@ async def test_inspect_cluster_dump(c, s, a, b, tmp_path, _format, local): dump_path = Path(tmp_path / "dump") dump.split(dump_path) scheduler_files = { - "address.yaml", - "clients.yaml", "events.yaml", "extensions.yaml", - "id.yaml", + "general.yaml", "log.yaml", - "memory.yaml", - "services.yaml", - "started.yaml", - "status.yaml", "task_groups.yaml", "tasks.yaml", - "thread_id.yaml", "transition_log.yaml", - "type.yaml", "workers.yaml", } scheduler_dump_path = dump_path / "scheduler" - for file in scheduler_files: - assert (scheduler_dump_path / file).exists() + expected = {scheduler_dump_path / f for f in scheduler_files} + assert expected == set(scheduler_dump_path.iterdir()) worker_files = { - "address.yaml", "config.yaml", - "constrained.yaml", - "data_needed.yaml", - "executing_count.yaml", - "id.yaml", - "in_flight_tasks.yaml", - "in_flight_workers.yaml", - "incoming_transfer_log.yaml", + "general.yaml", "log.yaml", "logs.yaml", - "long_running.yaml", - "memory_limit.yaml", - "memory_pause_fraction.yaml", - "memory_spill_fraction.yaml", - "memory_target_fraction.yaml", - "nthreads.yaml", - "outgoing_transfer_log.yaml", - "pending_data_per_worker.yaml", - "ready.yaml", - "scheduler.yaml", - "status.yaml", "tasks.yaml", - "thread_id.yaml", - "type.yaml", } for worker in (a, b): worker_dump_path = dump_path / worker.id - for file in worker_files: - assert (worker_dump_path / file).exists() + expected = {worker_dump_path / f for f in worker_files} + assert expected == set(worker_dump_path.iterdir()) # TODO(sjperkins): Make a worker fail badly to make # this test more interesting From 799325ebfad236e2da7d0a9f2b1453a6f6056c3f Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 16 Mar 2022 12:42:05 +0200 Subject: [PATCH 22/26] Address review comments --- distributed/cluster_dump.py | 149 ++++++++++++++++--------- distributed/tests/test_client.py | 108 +----------------- distributed/tests/test_cluster_dump.py | 148 +++++++++++++++++++++++- 3 files changed, 245 insertions(+), 160 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 0977117c71..8640c1119c 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -2,6 +2,7 @@ from __future__ 
import annotations +from collections.abc import Mapping from pathlib import Path from typing import IO, Any, Awaitable, Callable, Collection, Literal @@ -62,24 +63,29 @@ def writer(state: dict, f: IO): await to_thread(writer, state, f) -def load_cluster_dump(url: str) -> dict: +def load_cluster_dump(url: str | Path, **kwargs) -> dict: """Loads a cluster dump from a disk artefact Parameters ---------- url : str Name of the disk artefact. This should have either a - `.msgpack.gz` or `yaml` suffix, depending on the dump format. + ``.msgpack.gz`` or ``yaml`` suffix, depending on the dump format. + **kwargs : + Extra arguments passed to :func:`fsspec.open`. Returns ------- state : dict The cluster state at the time of the dump. """ - if url.endswith(".msgpack.gz"): + if isinstance(url, str): + url = Path(url) + + if url.name.endswith(".msgpack.gz"): mode = "rb" reader = msgpack.unpack - elif url.endswith(".yaml"): + elif url.name.endswith(".yaml"): import yaml mode = "r" @@ -87,70 +93,102 @@ def load_cluster_dump(url: str) -> dict: else: raise ValueError(f"url ({url}) must have a .msgpack.gz or .yaml suffix") - with fsspec.open(url, mode, compression="infer") as f: + kwargs.setdefault("compression", "infer") + + with fsspec.open(url, mode, **kwargs) as f: return reader(f) -class DumpArtefact: +class DumpArtefact(Mapping): """ Utility class for inspecting the state of a cluster dump .. code-block:: python - dump = DumpArtefact("dump.msgpack.gz") + dump = DumpArtefact.from_url("dump.msgpack.gz") memory_tasks = dump.scheduler_tasks("memory") executing_tasks = dump.worker_tasks("executing") """ - def __init__(self, url_or_state: str | dict): - if isinstance(url_or_state, str): - self.dump = load_cluster_dump(url_or_state) - elif isinstance(url_or_state, dict): - self.dump = url_or_state - else: - raise TypeError("'url_or_state' must be a str or dict") + def __init__(self, state: dict): + self.dump = state + + @classmethod + def from_url(cls, url: str, **kwargs) -> DumpArtefact: + """Loads a cluster dump from a disk artefact + + Parameters + ---------- + url : str + Name of the disk artefact. This should have either a + ``.msgpack.gz`` or ``yaml`` suffix, depending on the dump format. + **kwargs : + Extra arguments passed to :func:`fsspec.open`. + + Returns + ------- + state : dict + The cluster state at the time of the dump. + """ + return DumpArtefact(load_cluster_dump(url, **kwargs)) + + def __getitem__(self, key): + return self.dump[key] + + def __iter__(self): + return iter(self.dump) - def _extract_tasks(self, state: str, context: dict): + def __len__(self): + return len(self.dump) + + def _extract_tasks(self, state: str | None, context: dict): if state: return [v for v in context.values() if v["state"] == state] else: return list(context.values()) - def scheduler_tasks(self, state: str = "") -> list: + def scheduler_tasks_in_state(self, state: str | None = None) -> list: """ + Parameters + ---------- + state : optional, str + If provided, only tasks in the given state are returned. + Otherwise, all tasks are returned. + Returns ------- tasks : list - The list of scheduler tasks in `state`. + The list of scheduler tasks in ``state``. """ return self._extract_tasks(state, self.dump["scheduler"]["tasks"]) - def worker_tasks(self, state: str = "") -> list: + def worker_tasks_in_state(self, state: str | None = None) -> list: """ + Parameters + ---------- + state : optional, str + If provided, only tasks in the given state are returned. + Otherwise, all tasks are returned. 

-    def _extract_tasks(self, state: str, context: dict):
+    def __len__(self):
+        return len(self.dump)
+
+    def _extract_tasks(self, state: str | None, context: dict):
         if state:
             return [v for v in context.values() if v["state"] == state]
         else:
             return list(context.values())

-    def scheduler_tasks(self, state: str = "") -> list:
+    def scheduler_tasks_in_state(self, state: str | None = None) -> list:
         """
+        Parameters
+        ----------
+        state : str, optional
+            If provided, only tasks in the given state are returned.
+            Otherwise, all tasks are returned.
+
         Returns
         -------
         tasks : list
-            The list of scheduler tasks in `state`.
+            The list of scheduler tasks in ``state``.
         """
         return self._extract_tasks(state, self.dump["scheduler"]["tasks"])

-    def worker_tasks(self, state: str = "") -> list:
+    def worker_tasks_in_state(self, state: str | None = None) -> list:
         """
+        Parameters
+        ----------
+        state : str, optional
+            If provided, only tasks in the given state are returned.
+            Otherwise, all tasks are returned.
+
         Returns
         -------
         tasks : list
-            The list of worker tasks in `state`
+            The list of worker tasks in ``state``
         """
         tasks = []

         for worker_dump in self.dump["workers"].values():
-            if self._valid_worker_dump(worker_dump):
+            if isinstance(worker_dump, dict) and "tasks" in worker_dump:
                 tasks.extend(self._extract_tasks(state, worker_dump["tasks"]))

         return tasks

-    def _valid_worker_dump(self, worker_dump):
-        # Worker dumps should be dictionaries but can also be
-        # strings describing comm Failures
-        return isinstance(worker_dump, dict)
-
     def scheduler_story(self, *key_or_stimulus_id: str) -> list:
         """
         Returns
         -------
         stories : list
-            A list of stories for the keys/stimulus ID's in `*key_or_stimulus_id`.
+            A list of stories for the keys/stimulus ID's in ``*key_or_stimulus_id``.
         """
         keys = set(key_or_stimulus_id)
         story = _scheduler_story(keys, self.dump["scheduler"]["transition_log"])
         return list(map(tuple, story))
@@ -161,13 +199,13 @@ def worker_story(self, *key_or_stimulus_id: str) -> list:
         Returns
         -------
         stories : list
-            A list of stories for the keys/stimulus ID's in `*key_or_stimulus_id`.
+            A list of stories for the keys/stimulus ID's in ``*key_or_stimulus_id``.
         """
         keys = set(key_or_stimulus_id)
         stories: list = []

         for worker_dump in self.dump["workers"].values():
-            if self._valid_worker_dump(worker_dump):
+            if isinstance(worker_dump, dict) and "log" in worker_dump:
                 # Stories are tuples, not lists
                 story = _worker_story(keys, worker_dump["log"])
                 stories.extend(map(tuple, story))
@@ -186,12 +224,28 @@ def missing_workers(self) -> list:
         responsive_workers = self.dump["workers"]
         return [
             w
-            for w in scheduler_workers.keys()
+            for w in scheduler_workers
             if w not in responsive_workers
-            or not self._valid_worker_dump(responsive_workers[w])
+            or not isinstance(responsive_workers[w], dict)
         ]

+    def _compact_state(self, state: dict, expand_keys: set[str]):
+        """Compacts ``state`` keys into a general key,
+        unless the key is in ``expand_keys``"""
+        assert "general" not in state
+        result = {}
+        general = {}
+
+        for k, v in state.items():
+            if k in expand_keys:
+                result[k] = v
+            else:
+                general[k] = v
+
+        result["general"] = general
+        return result
+
-    def split(
+    def to_yaml(
         self,
         root_dir: str | Path | None = None,
         worker_expand_keys: Collection[str] = ("config", "log", "logs", "tasks"),
@@ -207,21 +261,21 @@ def split(
     ):
         """
         Splits the Dump Artefact into a tree of yaml files with
-        `root_dir` as it's base.
+        ``root_dir`` as its base.

         The root level of the tree contains a directory for the scheduler
         and directories for each individual worker.
         Each directory contains yaml files describing the state
         of the scheduler or worker when the artefact was created.

-        In general, keys associated with the state are compacted into a `general.yaml`
-        file, unless they are in `scheduler_expand_keys` and `worker_expand_keys`.
+        In general, keys associated with the state are compacted into a ``general.yaml``
+        file, unless they are in ``scheduler_expand_keys`` and ``worker_expand_keys``.

         Parameters
         ----------
         root_dir : str or Path
             The root directory into which the tree is written.
-            Defaults to the current working directory if `None`.
+            Defaults to the current working directory if ``None``.
         worker_expand_keys : iterable of str
             An iterable of artefact worker keys that will be
             expanded into separate yaml files.
@@ -231,7 +285,7 @@ def split(
         scheduler_expand_keys : iterable of str
             An iterable of artefact scheduler keys that will be expanded
             into separate yaml files.
             Keys that are not in this iterable are compacted into a
-            `general.yaml` file.
+            ``general.yaml`` file.
""" import yaml @@ -242,36 +296,29 @@ def split( workers = self.dump["workers"] for info in workers.values(): - worker_id = info["id"] - assert "general" not in info + try: + worker_id = info["id"] + except KeyError: + continue - # Compact smaller keys into a general dict - general = { - k: info.pop(k) for k in list(info.keys()) if k not in worker_expand_keys - } - info["general"] = general + worker_state = self._compact_state(info, worker_expand_keys) log_dir = root_dir / worker_id log_dir.mkdir(parents=True, exist_ok=True) - for name, _logs in info.items(): + for name, _logs in worker_state.items(): filename = str(log_dir / f"{name}.yaml") with open(filename, "w") as fd: yaml.dump(_logs, fd, Dumper=dumper) context = "scheduler" - info = self.dump[context] - assert "general" not in info - general = { - k: info.pop(k) for k in list(info.keys()) if k not in scheduler_expand_keys - } - info["general"] = general + scheduler_state = self._compact_state(self.dump[context], scheduler_expand_keys) log_dir = root_dir / context log_dir.mkdir(parents=True, exist_ok=True) # Compact smaller keys into a general dict - for name, _logs in info.items(): + for name, _logs in scheduler_state.items(): filename = str(log_dir / f"{name}.yaml") with open(filename, "w") as fd: diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index d7a243a42c..383558ca54 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -21,7 +21,6 @@ from contextlib import contextmanager, suppress from functools import partial from operator import add -from pathlib import Path from threading import Semaphore from time import sleep from typing import Any @@ -37,7 +36,6 @@ from dask.optimization import SubgraphCallable from dask.utils import parse_timedelta, stringify, tmpfile -import distributed from distributed import ( CancelledError, Executor, @@ -65,7 +63,7 @@ tokenize, wait, ) -from distributed.cluster_dump import DumpArtefact, load_cluster_dump +from distributed.cluster_dump import load_cluster_dump from distributed.comm import CommClosedError from distributed.compatibility import LINUX, WINDOWS from distributed.core import Status @@ -82,7 +80,6 @@ from distributed.utils_test import ( TaskStateMetadataPlugin, _UnhashableCallable, - assert_worker_story, async_wait_for, asyncinc, captured_logger, @@ -7336,109 +7333,6 @@ async def test_dump_cluster_state_json(c, s, a, b, tmp_path, local): await c.dump_cluster_state(filename, format="json") -def blocked_inc(x, event): - event.wait() - return x + 1 - - -@pytest.mark.parametrize("local", [True, False]) -@pytest.mark.parametrize("_format", ["msgpack", "yaml"]) -@gen_cluster(client=True) -async def test_inspect_cluster_dump(c, s, a, b, tmp_path, _format, local): - filename = tmp_path / "foo" - if not local: - pytest.importorskip("fsspec") - # Make it look like an fsspec path - filename = f"file://{filename}" - - futs = c.map(inc, range(2)) - fut_keys = {f.key for f in futs} - await c.gather(futs) - - event = distributed.Event() - blocked_fut = c.submit(blocked_inc, 1, event) - - await c.dump_cluster_state(filename, format=_format) - - scheduler_tasks = list(s.tasks.values()) - worker_tasks = [t for w in (a, b) for t in w.tasks.values()] - - event.set() - - suffix = ".gz" if _format == "msgpack" else "" - dump = DumpArtefact(f"{filename}.{_format}{suffix}") - - smem_keys = {t["key"] for t in dump.scheduler_tasks("memory")} - wmem_keys = {t["key"] for t in dump.worker_tasks("memory")} - - assert smem_keys == fut_keys - assert 
smem_keys == {t.key for t in scheduler_tasks if t.state == "memory"} - assert wmem_keys == fut_keys - assert wmem_keys == {t.key for t in worker_tasks if t.state == "memory"} - - task_key = next(iter(fut_keys)) - - expected = [ - (task_key, "released", "waiting", {task_key: "processing"}), - (task_key, "waiting", "processing", {}), - (task_key, "processing", "memory", {}), - ] - - story = dump.scheduler_story(task_key) - assert len(story) == len(expected) - - for event, expected_event in zip(story, expected): - for e1, e2 in zip(event, expected_event): - assert e1 == e2 - - assert len(dump.scheduler_story(task_key)) == 3 - assert_worker_story( - dump.worker_story(task_key), - [ - (task_key, "compute-task"), - (task_key, "released", "waiting", "waiting", {task_key: "ready"}), - (task_key, "waiting", "ready", "ready", {}), - (task_key, "ready", "executing", "executing", {}), - (task_key, "put-in-memory"), - (task_key, "executing", "memory", "memory", {}), - ], - ) - - dump_path = Path(tmp_path / "dump") - dump.split(dump_path) - scheduler_files = { - "events.yaml", - "extensions.yaml", - "general.yaml", - "log.yaml", - "task_groups.yaml", - "tasks.yaml", - "transition_log.yaml", - "workers.yaml", - } - - scheduler_dump_path = dump_path / "scheduler" - expected = {scheduler_dump_path / f for f in scheduler_files} - assert expected == set(scheduler_dump_path.iterdir()) - - worker_files = { - "config.yaml", - "general.yaml", - "log.yaml", - "logs.yaml", - "tasks.yaml", - } - - for worker in (a, b): - worker_dump_path = dump_path / worker.id - expected = {worker_dump_path / f for f in worker_files} - assert expected == set(worker_dump_path.iterdir()) - - # TODO(sjperkins): Make a worker fail badly to make - # this test more interesting - assert len(dump.missing_workers()) == 0 - - @gen_cluster(client=True) async def test_dump_cluster_state_exclude_default(c, s, a, b, tmp_path): futs = c.map(inc, range(10)) diff --git a/distributed/tests/test_cluster_dump.py b/distributed/tests/test_cluster_dump.py index 5653b06732..fa873c6137 100644 --- a/distributed/tests/test_cluster_dump.py +++ b/distributed/tests/test_cluster_dump.py @@ -1,10 +1,14 @@ +import asyncio +from pathlib import Path + import fsspec import msgpack import pytest import yaml -from distributed.cluster_dump import _tuple_to_list, write_state -from distributed.utils_test import gen_test +import distributed +from distributed.cluster_dump import DumpArtefact, _tuple_to_list, write_state +from distributed.utils_test import assert_worker_story, gen_cluster, gen_test, inc @pytest.mark.parametrize( @@ -44,3 +48,143 @@ async def test_write_state_yaml(tmp_path): assert readback == _tuple_to_list(await get_state()) f.seek(0) assert "!!python/tuple" not in f.read() + + +def blocked_inc(x, event): + event.wait() + return x + 1 + + +@gen_cluster(client=True) +async def test_cluster_dump_state(c, s, a, b, tmp_path): + filename = tmp_path / "dump" + futs = c.map(inc, range(2)) + fut_keys = {f.key for f in futs} + await c.gather(futs) + + event = distributed.Event() + blocked_fut = c.submit(blocked_inc, 1, event) + await asyncio.sleep(0.05) + await c.dump_cluster_state(filename, format="msgpack") + + scheduler_tasks = list(s.tasks.values()) + worker_tasks = [t for w in (a, b) for t in w.tasks.values()] + + smem_tasks = [t for t in scheduler_tasks if t.state == "memory"] + wmem_tasks = [t for t in worker_tasks if t.state == "memory"] + + assert len(smem_tasks) == 2 + assert len(wmem_tasks) == 2 + + sproc_tasks = [t for t in scheduler_tasks if t.state == 
"processing"] + wproc_tasks = [t for t in worker_tasks if t.state == "executing"] + + assert len(sproc_tasks) == 1 + assert len(wproc_tasks) == 1 + + await c.gather(event.set(), blocked_fut) + + dump = DumpArtefact.from_url(f"{filename}.msgpack.gz") + + smem_keys = {t["key"] for t in dump.scheduler_tasks_in_state("memory")} + wmem_keys = {t["key"] for t in dump.worker_tasks_in_state("memory")} + + assert smem_keys == fut_keys + assert smem_keys == {t.key for t in smem_tasks} + assert wmem_keys == fut_keys + assert wmem_keys == {t.key for t in wmem_tasks} + + sproc_keys = {t["key"] for t in dump.scheduler_tasks_in_state("processing")} + wproc_keys = {t["key"] for t in dump.worker_tasks_in_state("executing")} + + assert sproc_keys == {t.key for t in sproc_tasks} + assert wproc_keys == {t.key for t in wproc_tasks} + + # Mapping API works + assert "transition_log" in dump["scheduler"] + assert "log" in dump["workers"][a.address] + assert len(dump) == 3 + + +@gen_cluster(client=True) +async def test_cluster_dump_story(c, s, a, b, tmp_path): + filename = tmp_path / "dump" + futs = c.map(inc, range(2)) + fut_keys = {f.key for f in futs} + await c.gather(futs) + await c.dump_cluster_state(filename, format="msgpack") + + dump = DumpArtefact.from_url(f"{filename}.msgpack.gz") + task_key = next(iter(fut_keys)) + + expected = [ + (task_key, "released", "waiting", {task_key: "processing"}), + (task_key, "waiting", "processing", {}), + (task_key, "processing", "memory", {}), + ] + + story = dump.scheduler_story(task_key) + assert len(story) == len(expected) + + for event, expected_event in zip(story, expected): + for e1, e2 in zip(event, expected_event): + assert e1 == e2 + + assert len(dump.scheduler_story(task_key)) == 3 + assert_worker_story( + dump.worker_story(task_key), + [ + (task_key, "compute-task"), + (task_key, "released", "waiting", "waiting", {task_key: "ready"}), + (task_key, "waiting", "ready", "ready", {}), + (task_key, "ready", "executing", "executing", {}), + (task_key, "put-in-memory"), + (task_key, "executing", "memory", "memory", {}), + ], + ) + + +@gen_cluster(client=True) +async def test_cluster_dump_to_yaml(c, s, a, b, tmp_path): + futs = c.map(inc, range(2)) + await c.gather(futs) + + event = distributed.Event() + blocked_fut = c.submit(blocked_inc, 1, event) + filename = tmp_path / "dump" + await asyncio.sleep(0.05) + await c.dump_cluster_state(filename, format="msgpack") + await event.set() + await blocked_fut + + dump = DumpArtefact.from_url(f"{filename}.msgpack.gz") + yaml_path = Path(tmp_path / "dump") + dump.to_yaml(yaml_path) + + scheduler_files = { + "events.yaml", + "extensions.yaml", + "general.yaml", + "log.yaml", + "task_groups.yaml", + "tasks.yaml", + "transition_log.yaml", + "workers.yaml", + } + + scheduler_yaml_path = yaml_path / "scheduler" + expected = {scheduler_yaml_path / f for f in scheduler_files} + assert expected == set(scheduler_yaml_path.iterdir()) + + worker_files = { + "config.yaml", + "general.yaml", + "log.yaml", + "logs.yaml", + "tasks.yaml", + } + + for worker in (a, b): + worker_yaml_path = yaml_path / worker.id + expected = {worker_yaml_path / f for f in worker_files} + assert expected == set(worker_yaml_path.iterdir()) From 1cb1d98a20aeb4aa2a6b71cfd3f4638cb66e1898 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 16 Mar 2022 14:36:38 +0200 Subject: [PATCH 23/26] Remove Path conversion --- distributed/cluster_dump.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/distributed/cluster_dump.py 
b/distributed/cluster_dump.py
index 8640c1119c..2ce7e38c88 100644
--- a/distributed/cluster_dump.py
+++ b/distributed/cluster_dump.py
@@ -63,7 +63,7 @@ def writer(state: dict, f: IO):
     await to_thread(writer, state, f)


-def load_cluster_dump(url: str | Path, **kwargs) -> dict:
+def load_cluster_dump(url: str, **kwargs) -> dict:
     """Loads a cluster dump from a disk artefact

     Parameters
@@ -79,13 +79,10 @@ def load_cluster_dump(url: str | Path, **kwargs) -> dict:
         state : dict
             The cluster state at the time of the dump.
     """
-    if isinstance(url, str):
-        url = Path(url)
-
-    if url.name.endswith(".msgpack.gz"):
+    if url.endswith(".msgpack.gz"):
         mode = "rb"
         reader = msgpack.unpack
-    elif url.name.endswith(".yaml"):
+    elif url.endswith(".yaml"):
         import yaml

         mode = "r"
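Dropping the ``Path`` conversion matters for remote artefacts: a ``PurePath`` collapses the double slash in ``s3://`` style URLs, while ``fsspec`` handles the string form directly. A sketch of both call styles (the bucket name and credentials are hypothetical, and the remote case assumes ``s3fs`` is installed):

    from distributed.cluster_dump import load_cluster_dump

    # Local artefact; compression is inferred from the .gz suffix.
    state = load_cluster_dump("dump.msgpack.gz")

    # Remote artefact; extra keyword arguments are forwarded to fsspec.open.
    state = load_cluster_dump("s3://my-bucket/dump.msgpack.gz", anon=True)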
""" + stories = defaultdict(list) + + log = self.dump["scheduler"]["transition_log"] keys = set(key_or_stimulus_id) - story = _scheduler_story(keys, self.dump["scheduler"]["transition_log"]) - return list(map(tuple, story)) - def worker_story(self, *key_or_stimulus_id: str) -> list: + for story in _scheduler_story(keys, log): + stories[story[0]].append(tuple(story)) + + return dict(stories) + + def worker_story(self, *key_or_stimulus_id: str) -> dict: """ Returns ------- - stories : list - A list of stories for the keys/stimulus ID's in ``*key_or_stimulus_id`.` + stories : dict + A dict of stories for the keys/stimulus ID's in ``*key_or_stimulus_id`.` """ keys = set(key_or_stimulus_id) - stories: list = [] + stories = defaultdict(list) for worker_dump in self.dump["workers"].values(): if isinstance(worker_dump, dict) and "log" in worker_dump: - # Stories are tuples, not lists - story = _worker_story(keys, worker_dump["log"]) - stories.extend(map(tuple, story)) + for story in _worker_story(keys, worker_dump["log"]): + stories[story[0]].append(tuple(story)) - return stories + return dict(stories) def missing_workers(self) -> list: """ diff --git a/distributed/tests/test_cluster_dump.py b/distributed/tests/test_cluster_dump.py index fa873c6137..794e63d78f 100644 --- a/distributed/tests/test_cluster_dump.py +++ b/distributed/tests/test_cluster_dump.py @@ -117,31 +117,38 @@ async def test_cluster_dump_story(c, s, a, b, tmp_path): dump = DumpArtefact.from_url(f"{filename}.msgpack.gz") task_key = next(iter(fut_keys)) - expected = [ - (task_key, "released", "waiting", {task_key: "processing"}), - (task_key, "waiting", "processing", {}), - (task_key, "processing", "memory", {}), - ] - - story = dump.scheduler_story(task_key) - assert len(story) == len(expected) - - for event, expected_event in zip(story, expected): - for e1, e2 in zip(event, expected_event): - assert e1 == e2 - - assert len(dump.scheduler_story(task_key)) == 3 - assert_worker_story( - dump.worker_story(task_key), - [ - (task_key, "compute-task"), - (task_key, "released", "waiting", "waiting", {task_key: "ready"}), - (task_key, "waiting", "ready", "ready", {}), - (task_key, "ready", "executing", "executing", {}), - (task_key, "put-in-memory"), - (task_key, "executing", "memory", "memory", {}), - ], - ) + def _expected_story(task_key): + return + + story = dump.scheduler_story(*fut_keys) + assert len(story) == len(fut_keys) + + for k, task_story in story.items(): + expected = [ + (k, "released", "waiting", {k: "processing"}), + (k, "waiting", "processing", {}), + (k, "processing", "memory", {}), + ] + + for event, expected_event in zip(task_story, expected): + for e1, e2 in zip(event, expected_event): + assert e1 == e2 + + story = dump.worker_story(*fut_keys) + assert len(story) == len(fut_keys) + + for k, task_story in story.items(): + assert_worker_story( + task_story, + [ + (k, "compute-task"), + (k, "released", "waiting", "waiting", {k: "ready"}), + (k, "waiting", "ready", "ready", {}), + (k, "ready", "executing", "executing", {}), + (k, "put-in-memory"), + (k, "executing", "memory", "memory", {}), + ], + ) @gen_cluster(client=True) From 1ecb088306071ef2dcf807463e1f691c253a4911 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 16 Mar 2022 17:58:10 +0200 Subject: [PATCH 25/26] Some other test case improvements --- distributed/tests/test_cluster_dump.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/distributed/tests/test_cluster_dump.py b/distributed/tests/test_cluster_dump.py index 794e63d78f..dca99d51cb 
From 1ecb088306071ef2dcf807463e1f691c253a4911 Mon Sep 17 00:00:00 2001
From: Simon Perkins
Date: Wed, 16 Mar 2022 17:58:10 +0200
Subject: [PATCH 25/26] Some other test case improvements

---
 distributed/tests/test_cluster_dump.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/distributed/tests/test_cluster_dump.py b/distributed/tests/test_cluster_dump.py
index 794e63d78f..dca99d51cb 100644
--- a/distributed/tests/test_cluster_dump.py
+++ b/distributed/tests/test_cluster_dump.py
@@ -100,6 +100,12 @@ async def test_cluster_dump_state(c, s, a, b, tmp_path):
     assert sproc_keys == {t.key for t in sproc_tasks}
     assert wproc_keys == {t.key for t in wproc_tasks}

+    sall_keys = {t["key"] for t in dump.scheduler_tasks_in_state()}
+    wall_keys = {t["key"] for t in dump.worker_tasks_in_state()}
+
+    assert fut_keys | {blocked_fut.key} == sall_keys
+    assert fut_keys | {blocked_fut.key} == wall_keys
+
     # Mapping API works
     assert "transition_log" in dump["scheduler"]
     assert "log" in dump["workers"][a.address]
@@ -195,3 +201,8 @@ async def test_cluster_dump_to_yaml(c, s, a, b, tmp_path):
         worker_yaml_path = yaml_path / worker.id
         expected = {worker_yaml_path / f for f in worker_files}
         assert expected == set(worker_yaml_path.iterdir())
+
+    # Check that compacting state into "general" keys
+    # did not mutate the original dump dictionary
+    assert "id" in dump["scheduler"]
+    assert "address" in dump["scheduler"]

From 4b65b81e559ff7c5811fe4a0062d35edb7f05853 Mon Sep 17 00:00:00 2001
From: Simon Perkins
Date: Thu, 17 Mar 2022 18:51:30 +0200
Subject: [PATCH 26/26] to_yaml -> to_yamls

---
 distributed/cluster_dump.py            | 2 +-
 distributed/tests/test_cluster_dump.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py
index a103993378..2d3a400b25 100644
--- a/distributed/cluster_dump.py
+++ b/distributed/cluster_dump.py
@@ -248,7 +248,7 @@ def _compact_state(self, state: dict, expand_keys: set[str]):
         result["general"] = general
         return result

-    def to_yaml(
+    def to_yamls(
         self,
         root_dir: str | Path | None = None,
         worker_expand_keys: Collection[str] = ("config", "log", "logs", "tasks"),
diff --git a/distributed/tests/test_cluster_dump.py b/distributed/tests/test_cluster_dump.py
index dca99d51cb..963d756372 100644
--- a/distributed/tests/test_cluster_dump.py
+++ b/distributed/tests/test_cluster_dump.py
@@ -158,7 +158,7 @@ async def test_cluster_dump_story(c, s, a, b, tmp_path):


 @gen_cluster(client=True)
-async def test_cluster_dump_to_yaml(c, s, a, b, tmp_path):
+async def test_cluster_dump_to_yamls(c, s, a, b, tmp_path):
     futs = c.map(inc, range(2))
     await c.gather(futs)

@@ -172,7 +172,7 @@ async def test_cluster_dump_to_yaml(c, s, a, b, tmp_path):
     dump = DumpArtefact.from_url(f"{filename}.msgpack.gz")
     yaml_path = Path(tmp_path / "dump")
-    dump.to_yaml(yaml_path)
+    dump.to_yamls(yaml_path)

     scheduler_files = {
         "events.yaml",
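With the rename in place, the series settles on the surface exercised by the tests: ``from_url`` to load an artefact, the ``*_in_state`` and story helpers to query it, and ``to_yamls`` to expand it on disk. A hedged end-to-end sketch (file and directory names are illustrative):

    from pathlib import Path

    from distributed.cluster_dump import DumpArtefact

    dump = DumpArtefact.from_url("dump.msgpack.gz")

    # Query task state and the transition stories behind it.
    memory_tasks = dump.scheduler_tasks_in_state("memory")
    stories = dump.scheduler_story(*(t["key"] for t in memory_tasks))

    # Expand into a scheduler/ directory plus one directory per worker;
    # keys outside the *_expand_keys arguments are folded into general.yaml.
    dump.to_yamls(Path("cluster-dump"))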