Skip to content

Commit

Permalink
begin synchronous stops
Browse files Browse the repository at this point in the history
with no grace period sync stop methods on listeners will never be
started.
  • Loading branch information
graingert committed Jun 27, 2022
1 parent 0c8e80a commit d93c7b2
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,17 +514,22 @@ def start_periodic_callbacks(self):
pc.start()

def stop(self):
if not self.__stopped:
self.__stopped = True
if self.__stopped:
return

for listener in self.listeners:
self.__stopped = True
_stops = set()
for listener in self.listeners:
future = listener.stop()
if inspect.isawaitable(future):
_stops.add(future)

async def stop_listener(listener):
v = listener.stop()
if inspect.isawaitable(v):
await v
if _stops:

self._ongoing_background_tasks.call_soon(stop_listener, listener)
async def background_stops():
await asyncio.gather(*_stops)

self._ongoing_background_tasks.call_soon(background_stops)

@property
def listener(self):
Expand Down Expand Up @@ -865,7 +870,8 @@ async def close(self, timeout=None):
future = listener.stop()
if inspect.isawaitable(future):
_stops.add(future)
await asyncio.gather(*_stops)
if _stops:
await asyncio.gather(*_stops)

# TODO: Deal with exceptions
await self._ongoing_background_tasks.stop()
Expand Down

0 comments on commit d93c7b2

Please sign in to comment.