Skip to content

Commit

Permalink
[core][ci] Run dag microbenchmark seperately as unstable (#42360)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: rickyyx <rickyx@anyscale.com>
  • Loading branch information
rickyyx authored Jan 12, 2024
1 parent dadf905 commit f76081f
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 15 deletions.
5 changes: 4 additions & 1 deletion python/ray/_private/ray_microbenchmark_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
# Only run tests matching this filter pattern.

filter_pattern = os.environ.get("TESTS_TO_RUN", "")
skip_pattern = os.environ.get("TESTS_TO_SKIP", "")


def timeit(
name, fn, multiplier=1, warmup_time_sec=10
) -> List[Optional[Tuple[str, float, float]]]:
if filter_pattern not in name:
if filter_pattern and filter_pattern not in name:
return [None]
if skip_pattern and skip_pattern in name:
return [None]
# sleep for a while to avoid noisy neigbhors.
# related issue: https://github.com/ray-project/ray/issues/22045
Expand Down
34 changes: 21 additions & 13 deletions python/ray/_private/ray_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@ def async_actor_multi():
#################################################
# Perf tests for channels, used in compiled DAGs.
#################################################

ray.init()

def put_channel_small(chans, do_get=False, do_release=False):
Expand All @@ -324,11 +323,11 @@ def read(self, chans):

chans = [ray_channel.Channel(1000)]
results += timeit(
"local put, single channel calls",
"[unstable] local put, single channel calls",
lambda: put_channel_small(chans, do_release=True),
)
results += timeit(
"local put:local get, single channel calls",
"[unstable] local put:local get, single channel calls",
lambda: put_channel_small(chans, do_get=True, do_release=True),
)

Expand All @@ -337,7 +336,8 @@ def read(self, chans):
ray.get(reader.ready.remote())
reader.read.remote(chans)
results += timeit(
"local put:1 remote get, single channel calls", lambda: put_channel_small(chans)
"[unstable] local put:1 remote get, single channel calls",
lambda: put_channel_small(chans),
)
ray.kill(reader)

Expand All @@ -350,7 +350,7 @@ def read(self, chans):
for reader in readers:
reader.read.remote(chans)
results += timeit(
"local put:n remote get, single channel calls",
"[unstable] local put:n remote get, single channel calls",
lambda: put_channel_small(chans),
)
for reader in readers:
Expand All @@ -361,7 +361,8 @@ def read(self, chans):
ray.get(reader.ready.remote())
reader.read.remote(chans)
results += timeit(
"local put:1 remote get, n channels calls", lambda: put_channel_small(chans)
"[unstable] local put:1 remote get, n channels calls",
lambda: put_channel_small(chans),
)
ray.kill(reader)

Expand All @@ -371,7 +372,8 @@ def read(self, chans):
for chan, reader in zip(chans, readers):
reader.read.remote([chan])
results += timeit(
"local put:n remote get, n channels calls", lambda: put_channel_small(chans)
"[unstable] local put:n remote get, n channels calls",
lambda: put_channel_small(chans),
)
for reader in readers:
ray.kill(reader)
Expand All @@ -394,21 +396,24 @@ def _exec_multi_output(dag):
with InputNode() as inp:
dag = a.echo.bind(inp)

results += timeit("single-actor DAG calls", lambda: ray.get(dag.execute(b"x")))
results += timeit(
"[unstable] single-actor DAG calls", lambda: ray.get(dag.execute(b"x"))
)
dag = dag.experimental_compile()
results += timeit("compiled single-actor DAG calls", lambda: _exec(dag))
results += timeit("[unstable] compiled single-actor DAG calls", lambda: _exec(dag))

del a
n_cpu = multiprocessing.cpu_count() // 2
actors = [DAGActor.remote() for _ in range(n_cpu)]
with InputNode() as inp:
dag = MultiOutputNode([a.echo.bind(inp) for a in actors])
results += timeit(
"scatter-gather DAG calls, n={n_cpu} actors", lambda: ray.get(dag.execute(b"x"))
"[unstable] scatter-gather DAG calls, n={n_cpu} actors",
lambda: ray.get(dag.execute(b"x")),
)
dag = dag.experimental_compile()
results += timeit(
f"compiled scatter-gather DAG calls, n={n_cpu} actors",
f"[unstable] compiled scatter-gather DAG calls, n={n_cpu} actors",
lambda: _exec_multi_output(dag),
)

Expand All @@ -418,10 +423,13 @@ def _exec_multi_output(dag):
for a in actors:
dag = a.echo.bind(dag)
results += timeit(
f"chain DAG calls, n={n_cpu} actors", lambda: ray.get(dag.execute(b"x"))
f"[unstable] chain DAG calls, n={n_cpu} actors",
lambda: ray.get(dag.execute(b"x")),
)
dag = dag.experimental_compile()
results += timeit(f"compiled chain DAG calls, n={n_cpu} actors", lambda: _exec(dag))
results += timeit(
f"[unstable] compiled chain DAG calls, n={n_cpu} actors", lambda: _exec(dag)
)

ray.shutdown()

Expand Down
17 changes: 16 additions & 1 deletion release/release_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4630,7 +4630,7 @@

run:
timeout: 1800
script: OMP_NUM_THREADS=64 RAY_ADDRESS=local python run_microbenchmark.py
script: OMP_NUM_THREADS=64 RAY_ADDRESS=local TESTS_TO_SKIP=unstable python run_microbenchmark.py

variations:
- __suffix__: aws
Expand All @@ -4643,6 +4643,21 @@
frequency: weekly
python: "3.11"

- name: microbenchmark_unstable
group: core-daily-test
team: core
frequency: nightly
working_dir: microbenchmark

stable: false

cluster:
byod: {}
cluster_compute: tpl_64.yaml

run:
timeout: 1800
script: OMP_NUM_THREADS=64 RAY_ADDRESS=local TESTS_TO_RUN=unstable python run_microbenchmark.py --run-dag

- name: benchmark_worker_startup
group: core-daily-test
Expand Down

0 comments on commit f76081f

Please sign in to comment.