Skip to content

Commit

Permalink
fix subppol restart
Browse files Browse the repository at this point in the history
  • Loading branch information
chaokunyang committed May 27, 2022
1 parent 0651d4a commit 25acb25
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/platform-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ jobs:
coverage combine build/ && coverage report
fi
if [ -n "$WITH_RAY" ]; then
pytest $PYTEST_CONFIG --durations=0 --timeout=600 -v -s -m ray
pytest $PYTEST_CONFIG --durations=0 --timeout=200 -v -s -m ray
coverage report
fi
if [ -n "$WITH_RAY_DAG" ]; then
Expand Down
2 changes: 1 addition & 1 deletion mars/deploy/oscar/tests/test_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ async def test_auto_scale_in(ray_large_cluster):
assert await autoscaler_ref.get_dynamic_worker_nums() == 2


@pytest.mark.timeout(timeout=1000)
@pytest.mark.timeout(timeout=200)
@pytest.mark.parametrize("ray_large_cluster", [{"num_nodes": 4}], indirect=True)
@require_ray
@pytest.mark.asyncio
Expand Down
11 changes: 9 additions & 2 deletions mars/oscar/backends/ray/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,15 @@ async def start_sub_pool(
f"process_index {process_index} is not consistent with index {_process_index} "
f"in external_address {external_address}"
)
actor_handle = config["kwargs"]["sub_pool_handles"][external_address]
state = await retry_callable(
actor_handle.state.remote, ex_type=ray.exceptions.RayActorError, sync=False
)()
if state is RayPoolState.SERVICE_READY: # pragma: no cover
logger.info("Ray sub pool %s is alive, skip it.", external_address)
return actor_handle
logger.info("Start to start ray sub pool %s.", external_address)
create_sub_pool_timeout = 120
actor_handle = config["kwargs"]["sub_pool_handles"][external_address]
done, _ = await asyncio.wait(
[actor_handle.set_actor_pool_config.remote(actor_pool_config)],
timeout=create_sub_pool_timeout,
Expand All @@ -143,6 +149,7 @@ async def start_sub_pool(
)
logger.error(msg)
raise Exception(msg)
assert done is None, done
await actor_handle.start.remote()
logger.info("Start ray sub pool %s successfully.", external_address)
return actor_handle
Expand Down Expand Up @@ -305,7 +312,7 @@ async def mark_service_ready(self):
await self._actor_pool.start_monitor()

async def alive(self):
await asyncio.sleep(30)
await asyncio.sleep(300)
return self._start_timestamp


Expand Down

0 comments on commit 25acb25

Please sign in to comment.