Skip to content

Commit

Permalink
Refine error message
Browse files Browse the repository at this point in the history
  • Loading branch information
hekaisheng committed Sep 18, 2021
1 parent 55c4d3b commit 8dea408
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 11 deletions.
26 changes: 17 additions & 9 deletions mars/oscar/backends/mars/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
import os
import signal
import sys
from enum import Enum
from dataclasses import dataclass
from types import TracebackType
from typing import List

from ....utils import get_next_port
from ....utils import get_next_port, dataslots
from ..config import ActorPoolConfig
from ..message import CreateActorMessage
from ..pool import MainActorPoolBase, SubActorPoolBase, _register_message_handler
Expand Down Expand Up @@ -59,9 +60,13 @@ def _mp_kill(self):
logger = logging.getLogger(__name__)


class SubpoolStatus(Enum):
succeeded = 0
failed = 1
@dataslots
@dataclass
class SubpoolStatus:
# for status, 0 is succeeded, 1 is failed
status: int = None
error: BaseException = None
traceback: TracebackType = None


@_register_message_handler
Expand Down Expand Up @@ -136,8 +141,9 @@ async def wait_sub_pools_ready(cls,
processes = []
for task in create_pool_tasks:
process, status = await task
if status == SubpoolStatus.failed:
raise RuntimeError('Start sub pool failed.')
if status.status == 1:
# start sub pool failed
raise status.error.with_traceback(status.traceback)
processes.append(process)
return processes

Expand Down Expand Up @@ -185,7 +191,7 @@ async def _create_sub_pool(
actor_config: ActorPoolConfig,
process_index: int,
status: multiprocessing.Queue):
process_status = SubpoolStatus.succeeded
process_status = None
try:
env = actor_config.get_pool_config(process_index)['env']
if env:
Expand All @@ -194,9 +200,11 @@ async def _create_sub_pool(
'actor_pool_config': actor_config,
'process_index': process_index
})
process_status = SubpoolStatus(status=0)
await pool.start()
except: # noqa: E722 # nosec # pylint: disable=bare-except
process_status = SubpoolStatus.failed
_, error, tb = sys.exc_info()
process_status = SubpoolStatus(status=1, error=error, traceback=tb)
raise
finally:
status.put(process_status)
Expand Down
2 changes: 1 addition & 1 deletion mars/oscar/backends/mars/tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ async def test_fail_when_create_subpool():
env={'my_env': '1'})
_add_pool_conf(config, 2, my_label, 'unixsocket:///2', f'127.0.0.1:{port}')

with pytest.raises(RuntimeError):
with pytest.raises(OSError):
await MainActorPool.create({'actor_pool_config': config})


Expand Down
2 changes: 1 addition & 1 deletion mars/oscar/backends/test/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def _create_sub_pool(
'process_index': process_index
})
await pool.start()
status.put(SubpoolStatus.succeeded)
status.put(SubpoolStatus(0))
await pool.join()

async def kill_sub_pool(self, process: multiprocessing.Process,
Expand Down

0 comments on commit 8dea408

Please sign in to comment.