Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ray] Fix ray executor progress test #3033

Merged
merged 2 commits into from
May 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions mars/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def ray_start_regular_shared(request): # pragma: no cover

@pytest.fixture(scope="module")
def ray_start_regular_shared2(request): # pragma: no cover
os.environ["RAY_kill_idle_workers_interval_ms"] = "0"
param = getattr(request, "param", {})
num_cpus = param.get("num_cpus", 64)
total_memory_mb = num_cpus * 2 * 1024**2
Expand All @@ -49,6 +50,7 @@ def ray_start_regular_shared2(request): # pragma: no cover
yield ray.init(num_cpus=num_cpus, job_config=job_config)
finally:
ray.shutdown()
os.environ.pop("RAY_kill_idle_workers_interval_ms", None)


@pytest.fixture
Expand Down
32 changes: 31 additions & 1 deletion mars/deploy/oscar/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ def test_no_default_session():


@pytest.mark.asyncio
async def test_session_progress(create_cluster):
async def test_session_set_progress(create_cluster):
session = get_default_async_session()
assert session.address is not None
assert session.session_id is not None
Expand All @@ -597,6 +597,36 @@ def f1(interval: float, count: int):
assert info.progress() == 1


@pytest.mark.asyncio
async def test_session_get_progress(create_cluster):
session = get_default_async_session()
assert session.address is not None
assert session.session_id is not None

raw = np.random.rand(100, 4)
t = mt.tensor(raw, chunk_size=5)

def f1(c):
time.sleep(0.5)
return c

t1 = t.sum()
r = t1.map_chunk(f1)
info = await session.execute(r)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How long does this test case take? I noticed that there is a time.sleep(2) in the f1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It takes about 5s on mars executor, 10s on ray executor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make it faster? Ray backend is slower because:

Copy link
Contributor Author

@zhongchun zhongchun May 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for you reminder. I've fixed it. Now its cost is about 6s on ray.


for _ in range(100):
if 0 < info.progress() < 1:
break
await asyncio.sleep(0.1)
else:
raise Exception(f"progress test failed, actual value {info.progress()}.")

await info
assert info.result() is None
assert info.exception() is None
assert info.progress() == 1


@pytest.fixture
def setup_session():
session = new_session(n_cpu=2, use_uvloop=False)
Expand Down
4 changes: 2 additions & 2 deletions mars/deploy/oscar/tests/test_ray_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,5 @@ def test_sync_execute(config):

@require_ray
@pytest.mark.asyncio
async def test_session_progress(ray_start_regular_shared2, create_cluster):
test_local.test_session_progress(create_cluster)
async def test_session_get_progress(ray_start_regular_shared2, create_cluster):
await test_local.test_session_get_progress(create_cluster)