From a30e25355795d96de03a39a7ee098c1fcb22676d Mon Sep 17 00:00:00 2001 From: codingl2k1 <138426806+codingl2k1@users.noreply.github.com> Date: Thu, 12 Dec 2024 15:14:19 +0100 Subject: [PATCH] BUG: Terminate sub processes at exit (#119) --- python/xoscar/backends/indigen/pool.py | 16 ++++++++++++++++ python/xoscar/backends/pool.py | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/python/xoscar/backends/indigen/pool.py b/python/xoscar/backends/indigen/pool.py index 779dfc3f..f23213dc 100644 --- a/python/xoscar/backends/indigen/pool.py +++ b/python/xoscar/backends/indigen/pool.py @@ -16,6 +16,7 @@ from __future__ import annotations import asyncio +import atexit import concurrent.futures as futures import configparser import contextlib @@ -29,6 +30,7 @@ import threading import uuid from dataclasses import dataclass +from multiprocessing import util from types import TracebackType from typing import List, Optional @@ -79,6 +81,19 @@ def _mp_kill(self): _init_main_suspended_local = threading.local() +def _terminate_children(): + for c in multiprocessing.active_children(): + try: + c.terminate() + except Exception: + pass + + +if util: + # Import multiprocessing.util to register _exit_function at exit. + atexit.register(_terminate_children) + + def _patch_spawn_get_preparation_data(): try: from multiprocessing import spawn as mp_spawn @@ -309,6 +324,7 @@ async def _create_sub_pool( raise finally: status_queue.put(process_status) + status_queue.cancel_join_thread() await pool.join() async def append_sub_pool( diff --git a/python/xoscar/backends/pool.py b/python/xoscar/backends/pool.py index 93fd1645..a3cf86ef 100644 --- a/python/xoscar/backends/pool.py +++ b/python/xoscar/backends/pool.py @@ -377,7 +377,7 @@ async def _send_channel( try: await channel.send(result) except (ChannelClosed, ConnectionResetError): - if not self._stopped.is_set(): + if not self._stopped.is_set() and not channel.closed: raise except Exception as ex: logger.exception(