Skip to content

Commit

Permalink
Fix hang when start sub pool fails (mars-project#2468)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekaisheng authored Sep 18, 2021
1 parent 16f045c commit 5cdb251
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 17 deletions.
52 changes: 41 additions & 11 deletions mars/oscar/backends/mars/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import os
import signal
import sys
from dataclasses import dataclass
from types import TracebackType
from typing import List

from ....utils import get_next_port
from ....utils import get_next_port, dataslots
from ..config import ActorPoolConfig
from ..message import CreateActorMessage
from ..pool import MainActorPoolBase, SubActorPoolBase, _register_message_handler
Expand Down Expand Up @@ -58,6 +60,15 @@ def _mp_kill(self):
logger = logging.getLogger(__name__)


@dataslots
@dataclass
class SubpoolStatus:
# for status, 0 is succeeded, 1 is failed
status: int = None
error: BaseException = None
traceback: TracebackType = None


@_register_message_handler
class MainActorPool(MainActorPoolBase):

Expand Down Expand Up @@ -107,29 +118,41 @@ async def start_sub_pool(

def start_pool_in_process():
ctx = multiprocessing.get_context(method=start_method)
started = ctx.Event()
status_queue = ctx.Queue()
process = ctx.Process(
target=cls._start_sub_pool,
args=(actor_pool_config, process_index, started),
args=(actor_pool_config, process_index, status_queue),
name=f'MarsActorPool{process_index}',
)
process.daemon = True
process.start()
# wait for sub actor pool to finish starting
started.wait()
return process
process_status = status_queue.get()
return process, process_status

loop = asyncio.get_running_loop()
executor = futures.ThreadPoolExecutor(1)
create_pool_task = loop.run_in_executor(executor, start_pool_in_process)
return await create_pool_task

@classmethod
async def wait_sub_pools_ready(cls,
create_pool_tasks: List[asyncio.Task]):
processes = []
for task in create_pool_tasks:
process, status = await task
if status.status == 1:
# start sub pool failed
raise status.error.with_traceback(status.traceback)
processes.append(process)
return processes

@classmethod
def _start_sub_pool(
cls,
actor_config: ActorPoolConfig,
process_index: int,
started: multiprocessing.Event):
status_queue: multiprocessing.Queue):
if not _is_windows:
try:
# register coverage hooks on SIGTERM
Expand Down Expand Up @@ -159,15 +182,16 @@ def _start_sub_pool(
else:
asyncio.set_event_loop(asyncio.new_event_loop())

coro = cls._create_sub_pool(actor_config, process_index, started)
coro = cls._create_sub_pool(actor_config, process_index, status_queue)
asyncio.run(coro)

@classmethod
async def _create_sub_pool(
cls,
actor_config: ActorPoolConfig,
process_index: int,
started: multiprocessing.Event):
status_queue: multiprocessing.Queue):
process_status = None
try:
env = actor_config.get_pool_config(process_index)['env']
if env:
Expand All @@ -176,9 +200,14 @@ async def _create_sub_pool(
'actor_pool_config': actor_config,
'process_index': process_index
})
process_status = SubpoolStatus(status=0)
await pool.start()
except: # noqa: E722 # nosec # pylint: disable=bare-except
_, error, tb = sys.exc_info()
process_status = SubpoolStatus(status=1, error=error, traceback=tb)
raise
finally:
started.set()
status_queue.put(process_status)
await pool.join()

async def kill_sub_pool(self, process: multiprocessing.Process,
Expand All @@ -203,8 +232,9 @@ async def recover_sub_pool(self, address: str):
process_index = self._config.get_process_index(address)
# process dead, restart it
# remember always use spawn to recover sub pool
self.sub_processes[address] = await self.__class__.start_sub_pool(
self._config, process_index, 'spawn')
task = asyncio.create_task(self.start_sub_pool(
self._config, process_index, 'spawn'))
self.sub_processes[address] = (await self.wait_sub_pools_ready([task]))[0]

if self._auto_recover == 'actor':
# need to recover all created actors
Expand Down
17 changes: 17 additions & 0 deletions mars/oscar/backends/mars/tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,23 @@ async def test_sub_actor_pool(notify_main_pool):
await pool.stop()


@pytest.mark.asyncio
async def test_fail_when_create_subpool():
config = ActorPoolConfig()
my_label = 'computation'
main_address = f'127.0.0.1:{get_next_port()}'
port = get_next_port()
_add_pool_conf(config, 0, 'main', 'unixsocket:///0', main_address)

# use the same port for sub pools, will raise `OSError` with "address already in use"
_add_pool_conf(config, 1, my_label, 'unixsocket:///1', f'127.0.0.1:{port}',
env={'my_env': '1'})
_add_pool_conf(config, 2, my_label, 'unixsocket:///2', f'127.0.0.1:{port}')

with pytest.raises(OSError):
await MainActorPool.create({'actor_pool_config': config})


@pytest.mark.asyncio
async def test_main_actor_pool():
config = ActorPoolConfig()
Expand Down
8 changes: 7 additions & 1 deletion mars/oscar/backends/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ async def create(cls, config: Dict) -> MainActorPoolType:
# await create_pool_task
tasks.append(create_pool_task)

processes = [await t for t in tasks]
processes = await cls.wait_sub_pools_ready(tasks)
# create main actor pool
pool: MainActorPoolType = await super().create(config)
addresses = actor_pool_config.get_external_addresses()[1:]
Expand Down Expand Up @@ -948,6 +948,12 @@ async def start_sub_pool(
start_method: str = None):
"""Start a sub actor pool"""

@classmethod
@abstractmethod
async def wait_sub_pools_ready(cls,
create_pool_tasks: List[asyncio.Task]):
"""Wait all sub pools ready """

def attach_sub_process(self,
external_address: str,
process: SubProcessHandle):
Expand Down
5 changes: 5 additions & 0 deletions mars/oscar/backends/ray/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ async def start_sub_pool(
await actor_handle.start.remote()
return actor_handle

@classmethod
async def wait_sub_pools_ready(cls,
create_pool_tasks: List[asyncio.Task]):
return [await t for t in create_pool_tasks]

async def recover_sub_pool(self, address: str):
process = self.sub_processes[address]
await process.start.remote()
Expand Down
15 changes: 10 additions & 5 deletions mars/oscar/backends/test/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from ..config import ActorPoolConfig
from ..communication import gen_local_address, get_server_type, DummyServer
from ..mars.pool import MainActorPool, SubActorPool
from ..mars.pool import MainActorPool, SubActorPool, SubpoolStatus
from ..pool import ActorPoolType


Expand All @@ -41,22 +41,27 @@ async def start_sub_pool(
actor_pool_config: ActorPoolConfig,
process_index: int,
start_method: str = None):
started = multiprocessing.Event()
status_queue = multiprocessing.Queue()
return asyncio.create_task(
cls._create_sub_pool(actor_pool_config, process_index, started))
cls._create_sub_pool(actor_pool_config, process_index, status_queue))

@classmethod
async def wait_sub_pools_ready(cls,
create_pool_tasks: List[asyncio.Task]):
return [await t for t in create_pool_tasks]

@classmethod
async def _create_sub_pool(
cls,
actor_config: ActorPoolConfig,
process_index: int,
started: multiprocessing.Event):
status_queue: multiprocessing.Queue):
pool = await TestSubActorPool.create({
'actor_pool_config': actor_config,
'process_index': process_index
})
await pool.start()
started.set()
status_queue.put(SubpoolStatus(0))
await pool.join()

async def kill_sub_pool(self, process: multiprocessing.Process,
Expand Down

0 comments on commit 5cdb251

Please sign in to comment.