diff --git a/distributed/core.py b/distributed/core.py index 6c86db301b6..b17177e2805 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -662,11 +662,6 @@ def stop(self): if self.__stopped: return - if self._workdir is not None: - self._workdir.release() - - self.monitor.close() - self.__stopped = True _stops = set() for listener in self.listeners: @@ -687,6 +682,11 @@ async def background_stops(): self._ongoing_background_tasks.call_soon(background_stops) + self.monitor.close() + + if self._workdir is not None: + self._workdir.release() + @property def listener(self): if self.listeners: diff --git a/distributed/worker.py b/distributed/worker.py index 737938f52f6..27dbc5166f2 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -6,6 +6,7 @@ import contextlib import contextvars import errno +import inspect import logging import math import os @@ -1571,8 +1572,6 @@ async def close( # type: ignore for pc in self.periodic_callbacks.values(): pc.stop() - self.stop() - if self._client: # If this worker is the last one alive, clean up the worker # initialized clients @@ -1595,7 +1594,27 @@ async def close( # type: ignore # otherwise c.close() - await self.scheduler.close_rpc() + # FIXME: Copy-paste from `Server.stop`. See dask/distributed#8077 + _stops = set() + for listener in self.listeners: + future = listener.stop() + if inspect.isawaitable(future): + _stops.add(future) + try: + abort_handshaking_comms = listener.abort_handshaking_comms + except AttributeError: + pass + else: + abort_handshaking_comms() + + if _stops: + + async def background_stops(): + await asyncio.gather(*_stops) + + # end copy-paste + + await self.rpc.close() # Give some time for a UCX scheduler to complete closing endpoints # before closing self.batched_stream, otherwise the local endpoint @@ -1644,8 +1663,7 @@ def _close(executor, wait): executor=executor, wait=executor_wait ) # Just run it directly - await self.rpc.close() - + self.stop() self.status = Status.closed setproctitle("dask worker [closed]")