diff --git a/python/ray/_private/ray_microbenchmark_helpers.py b/python/ray/_private/ray_microbenchmark_helpers.py index 7dc0b907090a..8cc2331fbcaf 100644 --- a/python/ray/_private/ray_microbenchmark_helpers.py +++ b/python/ray/_private/ray_microbenchmark_helpers.py @@ -9,12 +9,15 @@ # Only run tests matching this filter pattern. filter_pattern = os.environ.get("TESTS_TO_RUN", "") +skip_pattern = os.environ.get("TESTS_TO_SKIP", "") def timeit( name, fn, multiplier=1, warmup_time_sec=10 ) -> List[Optional[Tuple[str, float, float]]]: - if filter_pattern not in name: + if filter_pattern and filter_pattern not in name: + return [None] + if skip_pattern and skip_pattern in name: return [None] # sleep for a while to avoid noisy neigbhors. # related issue: https://github.com/ray-project/ray/issues/22045 diff --git a/python/ray/_private/ray_perf.py b/python/ray/_private/ray_perf.py index 24d62fa45d64..0bb865c5d27a 100644 --- a/python/ray/_private/ray_perf.py +++ b/python/ray/_private/ray_perf.py @@ -300,7 +300,6 @@ def async_actor_multi(): ################################################# # Perf tests for channels, used in compiled DAGs. 
################################################# - ray.init() def put_channel_small(chans, do_get=False, do_release=False): @@ -324,11 +323,11 @@ def read(self, chans): chans = [ray_channel.Channel(1000)] results += timeit( - "local put, single channel calls", + "[unstable] local put, single channel calls", lambda: put_channel_small(chans, do_release=True), ) results += timeit( - "local put:local get, single channel calls", + "[unstable] local put:local get, single channel calls", lambda: put_channel_small(chans, do_get=True, do_release=True), ) @@ -337,7 +336,8 @@ def read(self, chans): ray.get(reader.ready.remote()) reader.read.remote(chans) results += timeit( - "local put:1 remote get, single channel calls", lambda: put_channel_small(chans) + "[unstable] local put:1 remote get, single channel calls", + lambda: put_channel_small(chans), ) ray.kill(reader) @@ -350,7 +350,7 @@ def read(self, chans): for reader in readers: reader.read.remote(chans) results += timeit( - "local put:n remote get, single channel calls", + "[unstable] local put:n remote get, single channel calls", lambda: put_channel_small(chans), ) for reader in readers: @@ -361,7 +361,8 @@ def read(self, chans): ray.get(reader.ready.remote()) reader.read.remote(chans) results += timeit( - "local put:1 remote get, n channels calls", lambda: put_channel_small(chans) + "[unstable] local put:1 remote get, n channels calls", + lambda: put_channel_small(chans), ) ray.kill(reader) @@ -371,7 +372,8 @@ def read(self, chans): for chan, reader in zip(chans, readers): reader.read.remote([chan]) results += timeit( - "local put:n remote get, n channels calls", lambda: put_channel_small(chans) + "[unstable] local put:n remote get, n channels calls", + lambda: put_channel_small(chans), ) for reader in readers: ray.kill(reader) @@ -394,9 +396,11 @@ def _exec_multi_output(dag): with InputNode() as inp: dag = a.echo.bind(inp) - results += timeit("single-actor DAG calls", lambda: ray.get(dag.execute(b"x"))) + results += 
timeit( + "[unstable] single-actor DAG calls", lambda: ray.get(dag.execute(b"x")) + ) dag = dag.experimental_compile() - results += timeit("compiled single-actor DAG calls", lambda: _exec(dag)) + results += timeit("[unstable] compiled single-actor DAG calls", lambda: _exec(dag)) del a n_cpu = multiprocessing.cpu_count() // 2 @@ -404,11 +408,12 @@ def _exec_multi_output(dag): with InputNode() as inp: dag = MultiOutputNode([a.echo.bind(inp) for a in actors]) results += timeit( - "scatter-gather DAG calls, n={n_cpu} actors", lambda: ray.get(dag.execute(b"x")) + f"[unstable] scatter-gather DAG calls, n={n_cpu} actors", + lambda: ray.get(dag.execute(b"x")), ) dag = dag.experimental_compile() results += timeit( - f"compiled scatter-gather DAG calls, n={n_cpu} actors", + f"[unstable] compiled scatter-gather DAG calls, n={n_cpu} actors", lambda: _exec_multi_output(dag), ) @@ -418,10 +423,13 @@ def _exec_multi_output(dag): for a in actors: dag = a.echo.bind(dag) results += timeit( - f"chain DAG calls, n={n_cpu} actors", lambda: ray.get(dag.execute(b"x")) + f"[unstable] chain DAG calls, n={n_cpu} actors", + lambda: ray.get(dag.execute(b"x")), ) dag = dag.experimental_compile() - results += timeit(f"compiled chain DAG calls, n={n_cpu} actors", lambda: _exec(dag)) + results += timeit( + f"[unstable] compiled chain DAG calls, n={n_cpu} actors", lambda: _exec(dag) + ) ray.shutdown() diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 60e49cfa8345..756377134b6a 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -4630,7 +4630,7 @@ run: timeout: 1800 - script: OMP_NUM_THREADS=64 RAY_ADDRESS=local python run_microbenchmark.py + script: OMP_NUM_THREADS=64 RAY_ADDRESS=local TESTS_TO_SKIP=unstable python run_microbenchmark.py variations: - __suffix__: aws @@ -4643,6 +4643,21 @@ frequency: weekly python: "3.11" +- name: microbenchmark_unstable + group: core-daily-test + team: core + frequency: nightly + working_dir: microbenchmark +
stable: false + + cluster: + byod: {} + cluster_compute: tpl_64.yaml + + run: + timeout: 1800 + script: OMP_NUM_THREADS=64 RAY_ADDRESS=local TESTS_TO_RUN=unstable python run_microbenchmark.py --run-dag - name: benchmark_worker_startup group: core-daily-test