Skip to content

Commit

Permalink
[Ray] Fix ray executor progress test (#3033)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongchun authored May 16, 2022
1 parent 6c9c4b0 commit de933b9
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 3 deletions.
2 changes: 2 additions & 0 deletions mars/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def ray_start_regular_shared(request): # pragma: no cover

@pytest.fixture(scope="module")
def ray_start_regular_shared2(request): # pragma: no cover
os.environ["RAY_kill_idle_workers_interval_ms"] = "0"
param = getattr(request, "param", {})
num_cpus = param.get("num_cpus", 64)
total_memory_mb = num_cpus * 2 * 1024**2
Expand All @@ -50,6 +51,7 @@ def ray_start_regular_shared2(request): # pragma: no cover
yield ray.init(num_cpus=num_cpus, job_config=job_config)
finally:
ray.shutdown()
os.environ.pop("RAY_kill_idle_workers_interval_ms", None)


@pytest.fixture
Expand Down
32 changes: 31 additions & 1 deletion mars/deploy/oscar/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ def test_no_default_session():


@pytest.mark.asyncio
async def test_session_progress(create_cluster):
async def test_session_set_progress(create_cluster):
session = get_default_async_session()
assert session.address is not None
assert session.session_id is not None
Expand All @@ -597,6 +597,36 @@ def f1(interval: float, count: int):
assert info.progress() == 1


@pytest.mark.asyncio
async def test_session_get_progress(create_cluster):
session = get_default_async_session()
assert session.address is not None
assert session.session_id is not None

raw = np.random.rand(100, 4)
t = mt.tensor(raw, chunk_size=5)

def f1(c):
time.sleep(0.5)
return c

t1 = t.sum()
r = t1.map_chunk(f1)
info = await session.execute(r)

for _ in range(100):
if 0 < info.progress() < 1:
break
await asyncio.sleep(0.1)
else:
raise Exception(f"progress test failed, actual value {info.progress()}.")

await info
assert info.result() is None
assert info.exception() is None
assert info.progress() == 1


@pytest.fixture
def setup_session():
session = new_session(n_cpu=2, use_uvloop=False)
Expand Down
4 changes: 2 additions & 2 deletions mars/deploy/oscar/tests/test_ray_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,5 @@ def test_sync_execute(config):

@require_ray
@pytest.mark.asyncio
async def test_session_progress(ray_start_regular_shared2, create_cluster):
test_local.test_session_progress(create_cluster)
async def test_session_get_progress(ray_start_regular_shared2, create_cluster):
await test_local.test_session_get_progress(create_cluster)

0 comments on commit de933b9

Please sign in to comment.