diff --git a/mars/oscar/backends/mars/pool.py b/mars/oscar/backends/mars/pool.py index 2ab4d200ca..ea383d8772 100644 --- a/mars/oscar/backends/mars/pool.py +++ b/mars/oscar/backends/mars/pool.py @@ -19,10 +19,11 @@ import os import signal import sys -from enum import Enum +from dataclasses import dataclass +from types import TracebackType from typing import List -from ....utils import get_next_port +from ....utils import get_next_port, dataslots from ..config import ActorPoolConfig from ..message import CreateActorMessage from ..pool import MainActorPoolBase, SubActorPoolBase, _register_message_handler @@ -59,9 +60,13 @@ def _mp_kill(self): logger = logging.getLogger(__name__) -class SubpoolStatus(Enum): - succeeded = 0 - failed = 1 +@dataslots +@dataclass +class SubpoolStatus: + # for status, 0 is succeeded, 1 is failed + status: int = None + error: BaseException = None + traceback: TracebackType = None @_register_message_handler @@ -136,8 +141,9 @@ async def wait_sub_pools_ready(cls, processes = [] for task in create_pool_tasks: process, status = await task - if status == SubpoolStatus.failed: - raise RuntimeError('Start sub pool failed.') + if status.status == 1: + # start sub pool failed + raise status.error.with_traceback(status.traceback) processes.append(process) return processes @@ -185,7 +191,7 @@ async def _create_sub_pool( actor_config: ActorPoolConfig, process_index: int, status: multiprocessing.Queue): - process_status = SubpoolStatus.succeeded + process_status = None try: env = actor_config.get_pool_config(process_index)['env'] if env: @@ -194,9 +200,11 @@ async def _create_sub_pool( 'actor_pool_config': actor_config, 'process_index': process_index }) + process_status = SubpoolStatus(status=0) await pool.start() except: # noqa: E722 # nosec # pylint: disable=bare-except - process_status = SubpoolStatus.failed + _, error, tb = sys.exc_info() + process_status = SubpoolStatus(status=1, error=error, traceback=tb) raise finally: status.put(process_status) diff --git a/mars/oscar/backends/mars/tests/test_pool.py b/mars/oscar/backends/mars/tests/test_pool.py index 4c3123c15e..fc5425ee54 100644 --- a/mars/oscar/backends/mars/tests/test_pool.py +++ b/mars/oscar/backends/mars/tests/test_pool.py @@ -238,7 +238,7 @@ async def test_fail_when_create_subpool(): env={'my_env': '1'}) _add_pool_conf(config, 2, my_label, 'unixsocket:///2', f'127.0.0.1:{port}') - with pytest.raises(RuntimeError): + with pytest.raises(OSError): await MainActorPool.create({'actor_pool_config': config}) diff --git a/mars/oscar/backends/test/pool.py b/mars/oscar/backends/test/pool.py index 8e9e49b5bb..7c0fae2fdf 100644 --- a/mars/oscar/backends/test/pool.py +++ b/mars/oscar/backends/test/pool.py @@ -61,7 +61,7 @@ async def _create_sub_pool( 'process_index': process_index }) await pool.start() - status.put(SubpoolStatus.succeeded) + status.put(SubpoolStatus(0)) await pool.join() async def kill_sub_pool(self, process: multiprocessing.Process,