From 1fcc80dc977f535f2a3f294894c13721ef2a2730 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Mon, 27 Jun 2022 10:54:37 +0100 Subject: [PATCH 1/7] remove Server._ongoing_comm_handlers tasks were never started here --- distributed/core.py | 4 ---- distributed/tests/test_core.py | 9 +++++---- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index 85d0707da09..e2c4922377e 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -359,7 +359,6 @@ def __init__( self.counters = None self.digests = None self._ongoing_background_tasks = AsyncTaskGroup() - self._ongoing_comm_handlers = AsyncTaskGroup() self._event_finished = asyncio.Event() self.listeners = [] @@ -879,9 +878,6 @@ async def close(self, timeout=None): # TODO: Deal with exceptions await self._ongoing_background_tasks.stop(timeout=1) - # TODO: Deal with exceptions - await self._ongoing_comm_handlers.stop(timeout=1) - await self.rpc.close() await asyncio.gather(*[comm.close() for comm in list(self._comms)]) finally: diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py index 0942fe67f1e..57467e60c9d 100644 --- a/distributed/tests/test_core.py +++ b/distributed/tests/test_core.py @@ -40,7 +40,6 @@ assert_can_connect_locally_4, assert_can_connect_locally_6, assert_cannot_connect, - async_wait_for, captured_logger, gen_cluster, gen_test, @@ -1065,9 +1064,12 @@ async def test_close_properly(): GH4704 """ + sleep_started = asyncio.Event() + async def sleep(comm=None): # We want to ensure this is actually canceled therefore don't give it a # chance to actually complete + sleep_started.set() await asyncio.sleep(2000000) server = await Server({"sleep": sleep}) @@ -1087,8 +1089,7 @@ async def sleep(comm=None): comm = await remote.live_comm() await comm.write({"op": "sleep"}) - - await async_wait_for(lambda: not server._ongoing_comm_handlers, 10) + await sleep_started.wait() listeners = server.listeners assert len(listeners) == len(ports) @@ -1102,7 +1103,7 @@ async def sleep(comm=None): await assert_cannot_connect(f"tcp://{ip}:{port}") # weakref set/dict should be cleaned up - assert not len(server._ongoing_comm_handlers) + assert not len(server._ongoing_background_tasks) @gen_test() From d4281815109bb00002fe47480da98b8c6efbebd7 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Mon, 27 Jun 2022 11:10:38 +0100 Subject: [PATCH 2/7] remove server close grace period --- distributed/core.py | 36 ++++++++++++---------------------- distributed/tests/test_core.py | 22 ++++++++++----------- 2 files changed, 24 insertions(+), 34 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index e2c4922377e..96764cfb886 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -237,36 +237,26 @@ def close(self) -> None: """ self.closed = True - async def stop(self, timeout: float = 1) -> None: + async def stop(self) -> None: """Close the group and stop all currently running tasks. - Closes the task group and waits `timeout` seconds for all tasks to gracefully finish. - After the timeout, all remaining tasks are cancelled. + Closes the task group and cancels all tasks. All tasks are cancelled + an additional time for each time this task is cancelled. """ self.close() current_task = asyncio.current_task(self._get_loop()) - tasks_to_stop = [t for t in self._ongoing_tasks if t is not current_task] - - if tasks_to_stop: - # Wrap gather in task to avoid Python3.8 issue, - # see https://github.com/dask/distributed/pull/6478#discussion_r885696827 - async def gather(): - return await asyncio.gather(*tasks_to_stop, return_exceptions=True) - + err = None + while tasks_to_stop := (self._ongoing_tasks - {current_task}): + for task in tasks_to_stop: + task.cancel() try: - await asyncio.wait_for( - gather(), - timeout, - ) - except asyncio.TimeoutError: - # The timeout on gather has cancelled the tasks, so this will not hang indefinitely - await asyncio.gather(*tasks_to_stop, return_exceptions=True) + await asyncio.wait(tasks_to_stop) + except asyncio.CancelledError as e: + err = e - if [t for t in self._ongoing_tasks if t is not current_task]: - raise RuntimeError( - f"Expected all ongoing tasks to be cancelled and removed, found {self._ongoing_tasks}." - ) + if err is not None: + raise err def __len__(self): return len(self._ongoing_tasks) @@ -876,7 +866,7 @@ async def close(self, timeout=None): await asyncio.gather(*_stops) # TODO: Deal with exceptions - await self._ongoing_background_tasks.stop(timeout=1) + await self._ongoing_background_tasks.stop() await self.rpc.close() await asyncio.gather(*[comm.close() for comm in list(self._comms)]) diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py index 57467e60c9d..24306c60c5e 100644 --- a/distributed/tests/test_core.py +++ b/distributed/tests/test_core.py @@ -189,25 +189,21 @@ async def set_flag(): @gen_test() -async def test_async_task_group_stop_allows_shutdown(): +async def test_async_task_group_stop_disallows_shutdown(): group = AsyncTaskGroup() task = None async def set_flag(): nonlocal task - while not group.closed: - await asyncio.sleep(0.01) task = asyncio.current_task() - return None assert group.call_soon(set_flag) is None assert len(group) == 1 - # when given a grace period of 1 second tasks are allowed to poll group.stop - # before awaiting other async functions - await group.stop(timeout=1) - assert task.done() - assert not task.cancelled() + # tasks are not given a grace period, and are not even allowed to start + # if the group is closed immediately + await group.stop() + assert task is None @gen_test() @@ -216,10 +212,12 @@ async def test_async_task_group_stop_cancels_long_running(): task = None flag = False + started = asyncio.Event() async def set_flag(): nonlocal task task = asyncio.current_task() + started.set() await asyncio.sleep(10) nonlocal flag flag = True @@ -227,9 +225,11 @@ async def set_flag(): assert group.call_soon(set_flag) is None assert len(group) == 1 - await group.stop(timeout=1) - assert not flag + await started.wait() + await group.stop() + assert task assert task.cancelled() + assert not flag @gen_test() From 6bacdbf8eb2f2adb377456b0e5486f174c26198e Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Mon, 27 Jun 2022 12:49:35 +0100 Subject: [PATCH 3/7] begin synchronous stops with no grace period sync stop methods on listeners will never be started. --- distributed/core.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index 96764cfb886..f23031b36fd 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -512,17 +512,22 @@ def start_periodic_callbacks(self): pc.start() def stop(self): - if not self.__stopped: - self.__stopped = True + if self.__stopped: + return - for listener in self.listeners: + self.__stopped = True + _stops = set() + for listener in self.listeners: + future = listener.stop() + if inspect.isawaitable(future): + _stops.add(future) - async def stop_listener(listener): - v = listener.stop() - if inspect.isawaitable(v): - await v + if _stops: - self._ongoing_background_tasks.call_soon(stop_listener, listener) + async def background_stops(): + await asyncio.gather(*_stops) + + self._ongoing_background_tasks.call_soon(background_stops) @property def listener(self): @@ -863,7 +868,8 @@ async def close(self, timeout=None): future = listener.stop() if inspect.isawaitable(future): _stops.add(future) - await asyncio.gather(*_stops) + if _stops: + await asyncio.gather(*_stops) # TODO: Deal with exceptions await self._ongoing_background_tasks.stop() From b9767caea78048de6564d8f02fdd59cfaff4ba91 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 28 Jun 2022 09:55:08 +0100 Subject: [PATCH 4/7] aggressively poll for 'resumed' state in cancelled_state tests --- distributed/tests/test_cancelled_state.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index f4c806e6af4..657062808e2 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -157,7 +157,7 @@ def blockable_compute(x, lock): # Close in scheduler to ensure we transition and reschedule task properly await s.close_worker(worker=a.address, stimulus_id="test") - await wait_for_state(fut1.key, "resumed", b) + await wait_for_state(fut1.key, "resumed", b, interval=0) block_get_data.release() await block_compute.release() @@ -417,7 +417,7 @@ async def get_data(self, comm, *args, **kwargs): ) await s.remove_worker(w1.address, stimulus_id="stim-id") - await wait_for_state(f3.key, "resumed", w2) + await wait_for_state(f3.key, "resumed", w2, interval=0) assert_story( w2.state.log, [ From 21ddc9bfd203d3768bcc9e2d0f25af20ad604a39 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 28 Jun 2022 11:27:03 +0100 Subject: [PATCH 5/7] start waiting for resumed before calling close --- distributed/tests/test_cancelled_state.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 657062808e2..516d34974e1 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -156,8 +156,10 @@ def blockable_compute(x, lock): await block_compute.acquire() # Close in scheduler to ensure we transition and reschedule task properly - await s.close_worker(worker=a.address, stimulus_id="test") - await wait_for_state(fut1.key, "resumed", b, interval=0) + await asyncio.gather( + wait_for_state(fut1.key, "resumed", b, interval=0), + s.close_worker(worker=a.address, stimulus_id="test"), + ) block_get_data.release() await block_compute.release() @@ -415,9 +417,10 @@ async def get_data(self, comm, *args, **kwargs): f3.key: {w2.address}, } ) - await s.remove_worker(w1.address, stimulus_id="stim-id") - - await wait_for_state(f3.key, "resumed", w2, interval=0) + await asyncio.gather( + wait_for_state(f3.key, "resumed", w2, interval=0), + s.remove_worker(w1.address, stimulus_id="stim-id"), + ) assert_story( w2.state.log, [ From 008d66a7c917c7b635386d5b1af422157cb84db2 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 30 Jun 2022 13:27:57 +0200 Subject: [PATCH 6/7] Add PendingDeprecationWarning --- distributed/core.py | 5 +++++ distributed/tests/test_core.py | 21 +++++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/distributed/core.py b/distributed/core.py index f23031b36fd..0dcd3cbdee8 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -867,6 +867,11 @@ async def close(self, timeout=None): for listener in self.listeners: future = listener.stop() if inspect.isawaitable(future): + warnings.warn( + f"{type(listener)} is using an asynchronous `stop` method. " + "Support for asynchronous `Listener.stop` will be removed in a future version", + PendingDeprecationWarning, + ) _stops.add(future) if _stops: await asyncio.gather(*_stops) diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py index 24306c60c5e..32cf6fb5f07 100644 --- a/distributed/tests/test_core.py +++ b/distributed/tests/test_core.py @@ -14,6 +14,8 @@ import dask from distributed.comm.core import CommClosedError +from distributed.comm.registry import backends +from distributed.comm.tcp import TCPBackend, TCPListener from distributed.core import ( AsyncTaskGroup, AsyncTaskGroupClosedError, @@ -1234,3 +1236,22 @@ def stream_not_leading_position(self, other, stream): assert not _expects_comm(instance.comm_not_leading_position) assert not _expects_comm(instance.stream_not_leading_position) + + +class AsyncStopTCPListener(TCPListener): + async def stop(self): + await asyncio.sleep(0) + super().stop() + + +class TCPAsyncListenerBackend(TCPBackend): + _listener_class = AsyncStopTCPListener + + +@gen_test() +async def test_async_listener_stop(monkeypatch): + monkeypatch.setitem(backends, "tcp", TCPAsyncListenerBackend()) + with pytest.warns(PendingDeprecationWarning): + async with Server({}) as s: + await s.listen(0) + assert s.listeners From 4a053685aa5ef3bf44775f2a6c704558eb32b642 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 30 Jun 2022 19:19:13 +0200 Subject: [PATCH 7/7] More robust test --- distributed/tests/test_cancelled_state.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index a2497ca9337..b75b3b2f275 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -158,14 +158,15 @@ def blockable_compute(x, lock): allow_other_workers=True, key="fut1", ) + await wait(fut1) + await block_compute.acquire() fut2 = c.submit(inc, fut1, workers=[b.address], key="fut2") await enter_get_data.wait() - await block_compute.acquire() # Close in scheduler to ensure we transition and reschedule task properly - s.close_worker(worker=a.address) - await wait_for_state(fut1.key, "resumed", b, interval=0) + await s.remove_worker(a.address, stimulus_id="test", close=False) + await wait_for_stimulus(ComputeTaskEvent, b, key=fut1.key) block_get_data.release() await block_compute.release()