Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
hekaisheng committed Sep 18, 2021
1 parent 2348e31 commit 7c14adc
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
14 changes: 7 additions & 7 deletions mars/oscar/backends/mars/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,16 @@ async def start_sub_pool(

def start_pool_in_process():
ctx = multiprocessing.get_context(method=start_method)
status = ctx.Queue()
status_queue = ctx.Queue()
process = ctx.Process(
target=cls._start_sub_pool,
args=(actor_pool_config, process_index, status),
args=(actor_pool_config, process_index, status_queue),
name=f'MarsActorPool{process_index}',
)
process.daemon = True
process.start()
# wait for sub actor pool to finish starting
process_status = status.get()
process_status = status_queue.get()
return process, process_status

loop = asyncio.get_running_loop()
Expand All @@ -152,7 +152,7 @@ def _start_sub_pool(
cls,
actor_config: ActorPoolConfig,
process_index: int,
status: multiprocessing.Queue):
status_queue: multiprocessing.Queue):
if not _is_windows:
try:
# register coverage hooks on SIGTERM
Expand Down Expand Up @@ -182,15 +182,15 @@ def _start_sub_pool(
else:
asyncio.set_event_loop(asyncio.new_event_loop())

coro = cls._create_sub_pool(actor_config, process_index, status)
coro = cls._create_sub_pool(actor_config, process_index, status_queue)
asyncio.run(coro)

@classmethod
async def _create_sub_pool(
cls,
actor_config: ActorPoolConfig,
process_index: int,
status: multiprocessing.Queue):
status_queue: multiprocessing.Queue):
process_status = None
try:
env = actor_config.get_pool_config(process_index)['env']
Expand All @@ -207,7 +207,7 @@ async def _create_sub_pool(
process_status = SubpoolStatus(status=1, error=error, traceback=tb)
raise
finally:
status.put(process_status)
status_queue.put(process_status)
await pool.join()

async def kill_sub_pool(self, process: multiprocessing.Process,
Expand Down
2 changes: 2 additions & 0 deletions mars/oscar/backends/mars/tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ async def test_fail_when_create_subpool():
main_address = f'127.0.0.1:{get_next_port()}'
port = get_next_port()
_add_pool_conf(config, 0, 'main', 'unixsocket:///0', main_address)

# use the same port for sub pools, will raise `OSError` with "address already in use"
_add_pool_conf(config, 1, my_label, 'unixsocket:///1', f'127.0.0.1:{port}',
env={'my_env': '1'})
_add_pool_conf(config, 2, my_label, 'unixsocket:///2', f'127.0.0.1:{port}')
Expand Down

0 comments on commit 7c14adc

Please sign in to comment.