Skip to content

Commit

Permalink
Clean up nanny WorkerProcess.kill (#6972)
Browse files Browse the repository at this point in the history
  • Loading branch information
gjoseph92 authored Aug 31, 2022
1 parent 817ead3 commit f07f384
Showing 1 changed file with 20 additions and 12 deletions.
32 changes: 20 additions & 12 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,29 +780,37 @@ async def kill(self, timeout: float = 2, executor_wait: bool = True) -> None:
         logger.info("Nanny asking worker to close")
 
         process = self.process
-        assert self.process
+        assert process
+        queue = self.child_stop_q
+        assert queue
         wait_timeout = timeout * 0.8
-        self.child_stop_q.put(
+        queue.put(
             {
                 "op": "stop",
                 "timeout": wait_timeout,
                 "executor_wait": executor_wait,
             }
         )
         await asyncio.sleep(0) # otherwise we get broken pipe errors
-        self.child_stop_q.close()
+        queue.close()
+        del queue
 
         try:
-            await process.join(wait_timeout)
-            return
-        except asyncio.TimeoutError:
-            pass
+            try:
+                await process.join(wait_timeout)
+                return
+            except asyncio.TimeoutError:
+                pass
 
-        logger.warning(
-            f"Worker process still alive after {wait_timeout} seconds, killing"
-        )
-        await process.kill()
-        await process.join(max(0, deadline - time()))
+            logger.warning(
+                f"Worker process still alive after {wait_timeout} seconds, killing"
+            )
+            await process.kill()
+            await process.join(max(0, deadline - time()))
+        except ValueError as e:
+            if "invalid operation on closed AsyncProcess" in str(e):
+                return
+            raise
 
     async def _wait_until_connected(self, uid):
         while True:
Expand Down

0 comments on commit f07f384

Please sign in to comment.