From dea9ef282a3a08d953bb118610994ea3ff7def9b Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 25 May 2022 16:56:32 +0200 Subject: [PATCH] Add a lock to `distributed.profile` for better concurrency control (#6421) Adds a Lock to distributed.profile to enable better concurrency control. In particular, it allows running garbage collection without a profiling thread holding references to objects, which is necessary for #6250. --- distributed/profile.py | 39 ++++++++-------------- distributed/protocol/tests/test_pickle.py | 10 +++--- distributed/tests/test_client.py | 18 +++++----- distributed/tests/test_diskutils.py | 18 +++++----- distributed/tests/test_failed_workers.py | 8 +++-- distributed/tests/test_nanny.py | 7 ++-- distributed/tests/test_profile.py | 40 +++++++++++++++++++++++ distributed/tests/test_spill.py | 7 ++-- distributed/tests/test_steal.py | 7 ++-- distributed/tests/test_worker.py | 6 ++-- 10 files changed, 94 insertions(+), 66 deletions(-) diff --git a/distributed/profile.py b/distributed/profile.py index a3a7bef94b..5432637e5d 100644 --- a/distributed/profile.py +++ b/distributed/profile.py @@ -44,6 +44,9 @@ from distributed.metrics import time from distributed.utils import color_of +#: This lock can be acquired to ensure that no instance of watch() is concurrently holding references to frames +lock = threading.Lock() + def identifier(frame: FrameType | None) -> str: """A string identifier from a frame @@ -314,18 +317,6 @@ def traverse(state, start, stop, height): } -_watch_running: set[int] = set() - - -def wait_profiler() -> None: - """Wait until a moment when no instances of watch() are sampling the frames. - You must call this function whenever you would otherwise expect an object to be - immediately released after it's descoped. - """ - while _watch_running: - sleep(0.0001) - - def _watch( thread_id: int, log: deque[tuple[float, dict[str, Any]]], # [(timestamp, output of create()), ...] @@ -337,24 +328,20 @@ def _watch( recent = create() last = time() - watch_id = threading.get_ident() while not stop(): - _watch_running.add(watch_id) - try: - if time() > last + cycle: + if time() > last + cycle: + recent = create() + with lock: log.append((time(), recent)) - recent = create() last = time() - try: - frame = sys._current_frames()[thread_id] - except KeyError: - return - - process(frame, None, recent, omit=omit) - del frame - finally: - _watch_running.remove(watch_id) + try: + frame = sys._current_frames()[thread_id] + except KeyError: + return + + process(frame, None, recent, omit=omit) + del frame sleep(interval) diff --git a/distributed/protocol/tests/test_pickle.py b/distributed/protocol/tests/test_pickle.py index 7fb486857c..02310db6ae 100644 --- a/distributed/protocol/tests/test_pickle.py +++ b/distributed/protocol/tests/test_pickle.py @@ -5,7 +5,7 @@ import pytest -from distributed.profile import wait_profiler +from distributed import profile from distributed.protocol import deserialize, serialize from distributed.protocol.pickle import HIGHEST_PROTOCOL, dumps, loads @@ -181,7 +181,7 @@ def funcs(): assert func3(1) == func(1) del func, func2, func3 - wait_profiler() - assert wr() is None - assert wr2() is None - assert wr3() is None + with profile.lock: + assert wr() is None + assert wr2() is None + assert wr3() is None diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index fe2fed711a..b82f36e27f 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -69,7 +69,6 @@ from distributed.compatibility import LINUX, WINDOWS from distributed.core import Server, Status from distributed.metrics import time -from distributed.profile import wait_profiler from distributed.scheduler import CollectTaskMetaDataPlugin, KilledWorker, Scheduler from distributed.sizeof import sizeof from distributed.utils import is_valid_xml, mp_context, sync, tmp_text @@ -678,8 +677,8 @@ def test_no_future_references(c): futures = c.map(inc, range(10)) ws.update(futures) del futures - wait_profiler() - assert not list(ws) + with profile.lock: + assert not list(ws) def test_get_sync_optimize_graph_passes_through(c): @@ -811,9 +810,9 @@ async def test_recompute_released_key(c, s, a, b): result1 = await x xkey = x.key del x - wait_profiler() - await asyncio.sleep(0) - assert c.refcount[xkey] == 0 + with profile.lock: + await asyncio.sleep(0) + assert c.refcount[xkey] == 0 # 1 second batching needs a second action to trigger while xkey in s.tasks and s.tasks[xkey].who_has or xkey in a.data or xkey in b.data: @@ -3483,10 +3482,9 @@ async def test_Client_clears_references_after_restart(c, s, a, b): key = x.key del x - wait_profiler() - await asyncio.sleep(0) - - assert key not in c.refcount + with profile.lock: + await asyncio.sleep(0) + assert key not in c.refcount @gen_cluster(Worker=Nanny, client=True) diff --git a/distributed/tests/test_diskutils.py b/distributed/tests/test_diskutils.py index c9f1a91a63..9127651cfd 100644 --- a/distributed/tests/test_diskutils.py +++ b/distributed/tests/test_diskutils.py @@ -12,10 +12,10 @@ import dask +from distributed import profile from distributed.compatibility import WINDOWS from distributed.diskutils import WorkSpace from distributed.metrics import time -from distributed.profile import wait_profiler from distributed.utils import mp_context from distributed.utils_test import captured_logger @@ -53,8 +53,8 @@ def test_workdir_simple(tmpdir): a.release() assert_contents(["bb", "bb.dirlock"]) del b - wait_profiler() - gc.collect() + with profile.lock: + gc.collect() assert_contents([]) # Generated temporary name with a prefix @@ -89,12 +89,12 @@ def test_two_workspaces_in_same_directory(tmpdir): del ws del b - wait_profiler() - gc.collect() + with profile.lock: + gc.collect() assert_contents(["aa", "aa.dirlock"], trials=5) del a - wait_profiler() - gc.collect() + with profile.lock: + gc.collect() assert_contents([], trials=5) @@ -188,8 +188,8 @@ def test_locking_disabled(tmpdir): a.release() assert_contents(["bb"]) del b - wait_profiler() - gc.collect() + with profile.lock: + gc.collect() assert_contents([]) lock_file.assert_not_called() diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py index 371eb8ae54..dfdfa2c003 100644 --- a/distributed/tests/test_failed_workers.py +++ b/distributed/tests/test_failed_workers.py @@ -10,11 +10,10 @@ from dask import delayed -from distributed import Client, Nanny, wait +from distributed import Client, Nanny, profile, wait from distributed.comm import CommClosedError from distributed.compatibility import MACOS from distributed.metrics import time -from distributed.profile import wait_profiler from distributed.utils import CancelledError, sync from distributed.utils_test import ( captured_logger, @@ -262,7 +261,10 @@ async def test_forgotten_futures_dont_clean_up_new_futures(c, s, a, b): await c.restart() y = c.submit(inc, 1) del x - wait_profiler() + + # Ensure that the profiler has stopped and released all references to x so that it can be garbage-collected + with profile.lock: + pass await asyncio.sleep(0.1) await y diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 6e52b38f9a..619ab299fd 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -19,12 +19,11 @@ import dask from dask.utils import tmpfile -from distributed import Nanny, Scheduler, Worker, rpc, wait, worker +from distributed import Nanny, Scheduler, Worker, profile, rpc, wait, worker from distributed.compatibility import LINUX, WINDOWS from distributed.core import CommClosedError, Status from distributed.diagnostics import SchedulerPlugin from distributed.metrics import time -from distributed.profile import wait_profiler from distributed.protocol.pickle import dumps from distributed.utils import TimeoutError, parse_ports from distributed.utils_test import ( @@ -170,8 +169,8 @@ async def test_num_fds(s): # Warm up async with Nanny(s.address): pass - wait_profiler() - gc.collect() + with profile.lock: + gc.collect() before = proc.num_fds() diff --git a/distributed/tests/test_profile.py b/distributed/tests/test_profile.py index 92fb6c1cfe..e0563b7c7f 100644 --- a/distributed/tests/test_profile.py +++ b/distributed/tests/test_profile.py @@ -18,6 +18,7 @@ info_frame, ll_get_stack, llprocess, + lock, merge, plot_data, process, @@ -207,6 +208,45 @@ def stop(): sleep(0.01) +def test_watch_requires_lock_to_run(): + start = time() + + def stop_lock(): + return time() > start + 0.600 + + def stop_profile(): + return time() > start + 0.500 + + def hold_lock(stop): + with lock: + while not stop(): + sleep(0.1) + + start_threads = threading.active_count() + + # Hog the lock over the entire duration of watch + thread = threading.Thread( + target=hold_lock, name="Hold Lock", kwargs={"stop": stop_lock} + ) + thread.daemon = True + thread.start() + + log = watch(interval="10ms", cycle="50ms", stop=stop_profile) + + start = time() # wait until thread starts up + while threading.active_count() < start_threads + 2: + assert time() < start + 2 + sleep(0.01) + + sleep(0.5) + assert len(log) == 0 + + start = time() + while threading.active_count() > start_threads: + assert time() < start + 2 + sleep(0.01) + + @dataclasses.dataclass(frozen=True) class FakeCode: co_filename: str diff --git a/distributed/tests/test_spill.py b/distributed/tests/test_spill.py index 4bd0887530..7c14013c37 100644 --- a/distributed/tests/test_spill.py +++ b/distributed/tests/test_spill.py @@ -8,8 +8,8 @@ from dask.sizeof import sizeof +from distributed import profile from distributed.compatibility import WINDOWS -from distributed.profile import wait_profiler from distributed.protocol import serialize_bytelist from distributed.spill import SpillBuffer, has_zict_210, has_zict_220 from distributed.utils_test import captured_logger @@ -338,7 +338,10 @@ def test_weakref_cache(tmpdir, cls, expect_cached, size): # the same id as a deleted one id_x = x.id del x - wait_profiler() + + # Ensure that the profiler has stopped and released all references to x so that it can be garbage-collected + with profile.lock: + pass if size < 100: buf["y"] diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 9fc2420d6b..4b68f92759 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -12,12 +12,11 @@ import dask -from distributed import Event, Lock, Nanny, Worker, wait, worker_client +from distributed import Event, Lock, Nanny, Worker, profile, wait, worker_client from distributed.compatibility import LINUX from distributed.config import config from distributed.core import Status from distributed.metrics import time -from distributed.profile import wait_profiler from distributed.scheduler import key_split from distributed.system import MEMORY_LIMIT from distributed.utils_test import ( @@ -948,8 +947,8 @@ class Foo: assert not s.tasks - wait_profiler() - assert not list(ws) + with profile.lock: + assert not list(ws) @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 264676fbad..128df44069 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -34,6 +34,7 @@ default_client, get_client, get_worker, + profile, wait, ) from distributed.comm.registry import backends @@ -42,7 +43,6 @@ from distributed.diagnostics import nvml from distributed.diagnostics.plugin import PipInstall from distributed.metrics import time -from distributed.profile import wait_profiler from distributed.protocol import pickle from distributed.scheduler import Scheduler from distributed.utils_test import ( @@ -1851,8 +1851,8 @@ class C: del f while "f" in a.data: await asyncio.sleep(0.01) - wait_profiler() - assert ref() is None + with profile.lock: + assert ref() is None story = a.stimulus_story("f", "f2") assert {ev.key for ev in story} == {"f", "f2"}