Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix hang when start sub pool fails #2468

Merged
merged 5 commits into from
Sep 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 41 additions & 11 deletions mars/oscar/backends/mars/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import os
import signal
import sys
from dataclasses import dataclass
from types import TracebackType
from typing import List

from ....utils import get_next_port
from ....utils import get_next_port, dataslots
from ..config import ActorPoolConfig
from ..message import CreateActorMessage
from ..pool import MainActorPoolBase, SubActorPoolBase, _register_message_handler
Expand Down Expand Up @@ -58,6 +60,15 @@ def _mp_kill(self):
logger = logging.getLogger(__name__)


@dataslots
@dataclass
class SubpoolStatus:
# Start-up result of a sub pool: `_create_sub_pool` puts one instance on the
# status queue, and `wait_sub_pools_ready` inspects it to decide whether to
# raise.
# for status, 0 is succeeded, 1 is failed
status: int = None
# exception caught while starting the sub pool; only filled in when status is 1
error: BaseException = None
# traceback captured together with `error`, re-attached via `with_traceback`
# before the error is raised again
traceback: TracebackType = None


@_register_message_handler
class MainActorPool(MainActorPoolBase):

Expand Down Expand Up @@ -107,29 +118,41 @@ async def start_sub_pool(

def start_pool_in_process():
ctx = multiprocessing.get_context(method=start_method)
started = ctx.Event()
status_queue = ctx.Queue()
process = ctx.Process(
target=cls._start_sub_pool,
args=(actor_pool_config, process_index, started),
args=(actor_pool_config, process_index, status_queue),
name=f'MarsActorPool{process_index}',
)
process.daemon = True
process.start()
# wait for sub actor pool to finish starting
started.wait()
return process
process_status = status_queue.get()
return process, process_status

loop = asyncio.get_running_loop()
executor = futures.ThreadPoolExecutor(1)
create_pool_task = loop.run_in_executor(executor, start_pool_in_process)
return await create_pool_task

@classmethod
async def wait_sub_pools_ready(cls,
                               create_pool_tasks: List[asyncio.Task]):
    """
    Wait for every sub pool start-up task and collect the spawned processes.

    Parameters
    ----------
    create_pool_tasks : list of awaitables
        Each one resolves to a ``(process, status)`` pair. ``status`` carries
        ``status`` (1 means the sub pool failed to start), ``error`` and
        ``traceback``.

    Returns
    -------
    list
        The spawned processes, in the same order as ``create_pool_tasks``.

    Raises
    ------
    BaseException
        The error reported by the first failed sub pool, with its recorded
        traceback attached. All spawned processes are killed before raising.
    """
    processes = []
    first_error = None
    for task in create_pool_tasks:
        process, status = await task
        processes.append(process)
        if status.status == 1 and first_error is None:
            # start sub pool failed: remember the failure, but keep awaiting
            # the remaining tasks so that every spawned process is known
            first_error = status.error.with_traceback(status.traceback)
    if first_error is not None:
        # the caller receives no process handles when we raise, so
        # kill every spawned process here instead of leaving them running
        for process in processes:
            process.kill()
        raise first_error
    return processes

@classmethod
def _start_sub_pool(
cls,
actor_config: ActorPoolConfig,
process_index: int,
started: multiprocessing.Event):
status_queue: multiprocessing.Queue):
if not _is_windows:
try:
# register coverage hooks on SIGTERM
Expand Down Expand Up @@ -159,15 +182,16 @@ def _start_sub_pool(
else:
asyncio.set_event_loop(asyncio.new_event_loop())

coro = cls._create_sub_pool(actor_config, process_index, started)
coro = cls._create_sub_pool(actor_config, process_index, status_queue)
asyncio.run(coro)

@classmethod
async def _create_sub_pool(
cls,
actor_config: ActorPoolConfig,
process_index: int,
started: multiprocessing.Event):
status_queue: multiprocessing.Queue):
process_status = None
try:
env = actor_config.get_pool_config(process_index)['env']
if env:
Expand All @@ -176,9 +200,14 @@ async def _create_sub_pool(
'actor_pool_config': actor_config,
'process_index': process_index
})
process_status = SubpoolStatus(status=0)
await pool.start()
except: # noqa: E722 # nosec # pylint: disable=bare-except
_, error, tb = sys.exc_info()
process_status = SubpoolStatus(status=1, error=error, traceback=tb)
raise
finally:
started.set()
status_queue.put(process_status)
await pool.join()

async def kill_sub_pool(self, process: multiprocessing.Process,
Expand All @@ -203,8 +232,9 @@ async def recover_sub_pool(self, address: str):
process_index = self._config.get_process_index(address)
# process dead, restart it
# remember always use spawn to recover sub pool
self.sub_processes[address] = await self.__class__.start_sub_pool(
self._config, process_index, 'spawn')
task = asyncio.create_task(self.start_sub_pool(
self._config, process_index, 'spawn'))
self.sub_processes[address] = (await self.wait_sub_pools_ready([task]))[0]

if self._auto_recover == 'actor':
# need to recover all created actors
Expand Down
17 changes: 17 additions & 0 deletions mars/oscar/backends/mars/tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,23 @@ async def test_sub_actor_pool(notify_main_pool):
await pool.stop()


@pytest.mark.asyncio
async def test_fail_when_create_subpool():
# Creating the main pool is expected to raise `OSError` when a sub pool
# cannot start: two sub pools are deliberately configured with the same port.
config = ActorPoolConfig()
my_label = 'computation'
main_address = f'127.0.0.1:{get_next_port()}'
port = get_next_port()
_add_pool_conf(config, 0, 'main', 'unixsocket:///0', main_address)

# use the same port for sub pools, will raise `OSError` with "address already in use"
_add_pool_conf(config, 1, my_label, 'unixsocket:///1', f'127.0.0.1:{port}',
env={'my_env': '1'})
hekaisheng marked this conversation as resolved.
Show resolved Hide resolved
_add_pool_conf(config, 2, my_label, 'unixsocket:///2', f'127.0.0.1:{port}')

with pytest.raises(OSError):
await MainActorPool.create({'actor_pool_config': config})


@pytest.mark.asyncio
async def test_main_actor_pool():
config = ActorPoolConfig()
Expand Down
8 changes: 7 additions & 1 deletion mars/oscar/backends/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ async def create(cls, config: Dict) -> MainActorPoolType:
# await create_pool_task
tasks.append(create_pool_task)

processes = [await t for t in tasks]
processes = await cls.wait_sub_pools_ready(tasks)
# create main actor pool
pool: MainActorPoolType = await super().create(config)
addresses = actor_pool_config.get_external_addresses()[1:]
Expand Down Expand Up @@ -948,6 +948,12 @@ async def start_sub_pool(
start_method: str = None):
"""Start a sub actor pool"""

@classmethod
@abstractmethod
async def wait_sub_pools_ready(cls,
create_pool_tasks: List[asyncio.Task]):
"""
Wait until all sub pools are ready.

Receives the sub pool creation tasks gathered in `create` and returns
the value that `create` stores as its `processes`.
"""

def attach_sub_process(self,
external_address: str,
process: SubProcessHandle):
Expand Down
5 changes: 5 additions & 0 deletions mars/oscar/backends/ray/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ async def start_sub_pool(
await actor_handle.start.remote()
return actor_handle

@classmethod
async def wait_sub_pools_ready(cls,
                               create_pool_tasks: List[asyncio.Task]):
    """Await each creation task in turn and return their results as a list."""
    handles = []
    for pending in create_pool_tasks:
        handle = await pending
        handles.append(handle)
    return handles

async def recover_sub_pool(self, address: str):
process = self.sub_processes[address]
await process.start.remote()
Expand Down
15 changes: 10 additions & 5 deletions mars/oscar/backends/test/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from ..config import ActorPoolConfig
from ..communication import gen_local_address, get_server_type, DummyServer
from ..mars.pool import MainActorPool, SubActorPool
from ..mars.pool import MainActorPool, SubActorPool, SubpoolStatus
from ..pool import ActorPoolType


Expand All @@ -41,22 +41,27 @@ async def start_sub_pool(
actor_pool_config: ActorPoolConfig,
process_index: int,
start_method: str = None):
started = multiprocessing.Event()
status_queue = multiprocessing.Queue()
return asyncio.create_task(
cls._create_sub_pool(actor_pool_config, process_index, started))
cls._create_sub_pool(actor_pool_config, process_index, status_queue))

@classmethod
async def wait_sub_pools_ready(cls,
                               create_pool_tasks: List[asyncio.Task]):
    """Await the given creation tasks one by one and return what they yield."""
    results = []
    for creation in create_pool_tasks:
        outcome = await creation
        results.append(outcome)
    return results

@classmethod
async def _create_sub_pool(
cls,
actor_config: ActorPoolConfig,
process_index: int,
started: multiprocessing.Event):
status_queue: multiprocessing.Queue):
pool = await TestSubActorPool.create({
'actor_pool_config': actor_config,
'process_index': process_index
})
await pool.start()
started.set()
status_queue.put(SubpoolStatus(0))
await pool.join()

async def kill_sub_pool(self, process: multiprocessing.Process,
Expand Down