From 837138db1deefca53541fd3b2c1efac28cb939d6 Mon Sep 17 00:00:00 2001 From: rickyyx Date: Fri, 12 Jan 2024 01:21:37 +0000 Subject: [PATCH 1/2] init Signed-off-by: rickyyx --- python/ray/_private/ray_perf.py | 224 ++++++++++--------- release/microbenchmark/run_microbenchmark.py | 9 +- release/release_tests.yaml | 15 ++ 3 files changed, 137 insertions(+), 111 deletions(-) diff --git a/python/ray/_private/ray_perf.py b/python/ray/_private/ray_perf.py index 24d62fa45d64..e6b7fc88651d 100644 --- a/python/ray/_private/ray_perf.py +++ b/python/ray/_private/ray_perf.py @@ -99,7 +99,7 @@ def check_optimized_build(): logger.warning(msg) -def main(results=None): +def main(run_dag: bool = False, results=None): results = results or [] check_optimized_build() @@ -300,134 +300,138 @@ def async_actor_multi(): ################################################# # Perf tests for channels, used in compiled DAGs. ################################################# + if run_dag: + ray.init() - ray.init() - - def put_channel_small(chans, do_get=False, do_release=False): - for chan in chans: - chan.write(b"0") - if do_get: - chan.begin_read() - if do_release: - chan.end_read() - - @ray.remote - class ChannelReader: - def ready(self): - return - - def read(self, chans): - while True: - for chan in chans: + def put_channel_small(chans, do_get=False, do_release=False): + for chan in chans: + chan.write(b"0") + if do_get: chan.begin_read() + if do_release: chan.end_read() - chans = [ray_channel.Channel(1000)] - results += timeit( - "local put, single channel calls", - lambda: put_channel_small(chans, do_release=True), - ) - results += timeit( - "local put:local get, single channel calls", - lambda: put_channel_small(chans, do_get=True, do_release=True), - ) - - chans = [ray_channel.Channel(1000)] - reader = ChannelReader.remote() - ray.get(reader.ready.remote()) - reader.read.remote(chans) - results += timeit( - "local put:1 remote get, single channel calls", lambda: put_channel_small(chans) - ) - ray.kill(reader) - - n_cpu = multiprocessing.cpu_count() // 2 - print(f"Testing multiple readers/channels, n={n_cpu}") + @ray.remote + class ChannelReader: + def ready(self): + return + + def read(self, chans): + while True: + for chan in chans: + chan.begin_read() + chan.end_read() + + chans = [ray_channel.Channel(1000)] + results += timeit( + "local put, single channel calls", + lambda: put_channel_small(chans, do_release=True), + ) + results += timeit( + "local put:local get, single channel calls", + lambda: put_channel_small(chans, do_get=True, do_release=True), + ) - chans = [ray_channel.Channel(1000, num_readers=n_cpu)] - readers = [ChannelReader.remote() for _ in range(n_cpu)] - ray.get([reader.ready.remote() for reader in readers]) - for reader in readers: + chans = [ray_channel.Channel(1000)] + reader = ChannelReader.remote() + ray.get(reader.ready.remote()) reader.read.remote(chans) - results += timeit( - "local put:n remote get, single channel calls", - lambda: put_channel_small(chans), - ) - for reader in readers: + results += timeit( + "local put:1 remote get, single channel calls", + lambda: put_channel_small(chans), + ) ray.kill(reader) - chans = [ray_channel.Channel(1000) for _ in range(n_cpu)] - reader = ChannelReader.remote() - ray.get(reader.ready.remote()) - reader.read.remote(chans) - results += timeit( - "local put:1 remote get, n channels calls", lambda: put_channel_small(chans) - ) - ray.kill(reader) + n_cpu = multiprocessing.cpu_count() // 2 + print(f"Testing multiple readers/channels, n={n_cpu}") + + chans = [ray_channel.Channel(1000, num_readers=n_cpu)] + readers = [ChannelReader.remote() for _ in range(n_cpu)] + ray.get([reader.ready.remote() for reader in readers]) + for reader in readers: + reader.read.remote(chans) + results += timeit( + "local put:n remote get, single channel calls", + lambda: put_channel_small(chans), + ) + for reader in readers: + ray.kill(reader) - chans = [ray_channel.Channel(1000) for _ in range(n_cpu)] - readers = [ChannelReader.remote() for _ in range(n_cpu)] - ray.get([reader.ready.remote() for reader in readers]) - for chan, reader in zip(chans, readers): - reader.read.remote([chan]) - results += timeit( - "local put:n remote get, n channels calls", lambda: put_channel_small(chans) - ) - for reader in readers: + chans = [ray_channel.Channel(1000) for _ in range(n_cpu)] + reader = ChannelReader.remote() + ray.get(reader.ready.remote()) + reader.read.remote(chans) + results += timeit( + "local put:1 remote get, n channels calls", lambda: put_channel_small(chans) + ) ray.kill(reader) - # Tests for compiled DAGs. + chans = [ray_channel.Channel(1000) for _ in range(n_cpu)] + readers = [ChannelReader.remote() for _ in range(n_cpu)] + ray.get([reader.ready.remote() for reader in readers]) + for chan, reader in zip(chans, readers): + reader.read.remote([chan]) + results += timeit( + "local put:n remote get, n channels calls", lambda: put_channel_small(chans) + ) + for reader in readers: + ray.kill(reader) - def _exec(dag): - output_channel = dag.execute(b"x") - output_channel.begin_read() - output_channel.end_read() + # Tests for compiled DAGs. - def _exec_multi_output(dag): - output_channels = dag.execute(b"x") - for output_channel in output_channels: + def _exec(dag): + output_channel = dag.execute(b"x") output_channel.begin_read() - for output_channel in output_channels: output_channel.end_read() - a = DAGActor.remote() - with InputNode() as inp: - dag = a.echo.bind(inp) - - results += timeit("single-actor DAG calls", lambda: ray.get(dag.execute(b"x"))) - dag = dag.experimental_compile() - results += timeit("compiled single-actor DAG calls", lambda: _exec(dag)) - - del a - n_cpu = multiprocessing.cpu_count() // 2 - actors = [DAGActor.remote() for _ in range(n_cpu)] - with InputNode() as inp: - dag = MultiOutputNode([a.echo.bind(inp) for a in actors]) - results += timeit( - "scatter-gather DAG calls, n={n_cpu} actors", lambda: ray.get(dag.execute(b"x")) - ) - dag = dag.experimental_compile() - results += timeit( - f"compiled scatter-gather DAG calls, n={n_cpu} actors", - lambda: _exec_multi_output(dag), - ) + def _exec_multi_output(dag): + output_channels = dag.execute(b"x") + for output_channel in output_channels: + output_channel.begin_read() + for output_channel in output_channels: + output_channel.end_read() + + a = DAGActor.remote() + with InputNode() as inp: + dag = a.echo.bind(inp) + + results += timeit("single-actor DAG calls", lambda: ray.get(dag.execute(b"x"))) + dag = dag.experimental_compile() + results += timeit("compiled single-actor DAG calls", lambda: _exec(dag)) + + del a + n_cpu = multiprocessing.cpu_count() // 2 + actors = [DAGActor.remote() for _ in range(n_cpu)] + with InputNode() as inp: + dag = MultiOutputNode([a.echo.bind(inp) for a in actors]) + results += timeit( + "scatter-gather DAG calls, n={n_cpu} actors", + lambda: ray.get(dag.execute(b"x")), + ) + dag = dag.experimental_compile() + results += timeit( + f"compiled scatter-gather DAG calls, n={n_cpu} actors", + lambda: _exec_multi_output(dag), + ) - actors = [DAGActor.remote() for _ in range(n_cpu)] - with InputNode() as inp: - dag = inp - for a in actors: - dag = a.echo.bind(dag) - results += timeit( - f"chain DAG calls, n={n_cpu} actors", lambda: ray.get(dag.execute(b"x")) - ) - dag = dag.experimental_compile() - results += timeit(f"compiled chain DAG calls, n={n_cpu} actors", lambda: _exec(dag)) + actors = [DAGActor.remote() for _ in range(n_cpu)] + with InputNode() as inp: + dag = inp + for a in actors: + dag = a.echo.bind(dag) + results += timeit( + f"chain DAG calls, n={n_cpu} actors", lambda: ray.get(dag.execute(b"x")) + ) + dag = dag.experimental_compile() + results += timeit( + f"compiled chain DAG calls, n={n_cpu} actors", lambda: _exec(dag) + ) - ray.shutdown() + ray.shutdown() - ############################ - # End of channel perf tests. - ############################ + ############################ + # End of channel perf tests. + ############################ NUM_PGS = 100 NUM_BUNDLES = 1 diff --git a/release/microbenchmark/run_microbenchmark.py b/release/microbenchmark/run_microbenchmark.py index afa4e1b724f8..aa017154eb6e 100644 --- a/release/microbenchmark/run_microbenchmark.py +++ b/release/microbenchmark/run_microbenchmark.py @@ -2,6 +2,11 @@ import os +import argparse + +parser = argparse.ArgumentParser() +parser.add_argument("--run-dag", action="store_true", default=False, help="Run DAG related tests") + def to_dict_key(key: str): for r in [" ", ":", "-"]: key = key.replace(r, "_") @@ -13,7 +18,9 @@ def to_dict_key(key: str): if __name__ == "__main__": from ray._private.ray_perf import main - results = main() or [] + args = parser.parse_args() + + results = main(run_dag=args.run_dag) or [] result_dict = { f"{to_dict_key(v[0])}": (v[1], v[2]) for v in results if v is not None diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 60e49cfa8345..02158e21628e 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -4643,6 +4643,21 @@ frequency: weekly python: "3.11" +- name: microbenchmark_dag + group: core-daily-test + team: core + frequency: nightly + working_dir: microbenchmark + + stable: false + + cluster: + byod: {} + cluster_compute: tpl_64.yaml + + run: + timeout: 1800 + script: OMP_NUM_THREADS=64 RAY_ADDRESS=local python run_microbenchmark.py --run-dag - name: benchmark_worker_startup group: core-daily-test From f6244e79b31c9a71868b53c7533cd9f9e8f3b5a7 Mon Sep 17 00:00:00 2001 From: rickyyx Date: Fri, 12 Jan 2024 18:12:07 +0000 Subject: [PATCH 2/2] update Signed-off-by: rickyyx --- .../_private/ray_microbenchmark_helpers.py | 5 +- python/ray/_private/ray_perf.py | 232 +++++++++--------- release/microbenchmark/run_microbenchmark.py | 9 +- release/release_tests.yaml | 6 +- 4 files changed, 126 insertions(+), 126 deletions(-) diff --git a/python/ray/_private/ray_microbenchmark_helpers.py b/python/ray/_private/ray_microbenchmark_helpers.py index 7dc0b907090a..8cc2331fbcaf 100644 --- a/python/ray/_private/ray_microbenchmark_helpers.py +++ b/python/ray/_private/ray_microbenchmark_helpers.py @@ -9,12 +9,15 @@ # Only run tests matching this filter pattern. filter_pattern = os.environ.get("TESTS_TO_RUN", "") +skip_pattern = os.environ.get("TESTS_TO_SKIP", "") def timeit( name, fn, multiplier=1, warmup_time_sec=10 ) -> List[Optional[Tuple[str, float, float]]]: - if filter_pattern not in name: + if filter_pattern and filter_pattern not in name: + return [None] + if skip_pattern and skip_pattern in name: return [None] # sleep for a while to avoid noisy neigbhors. # related issue: https://github.com/ray-project/ray/issues/22045 diff --git a/python/ray/_private/ray_perf.py b/python/ray/_private/ray_perf.py index e6b7fc88651d..0bb865c5d27a 100644 --- a/python/ray/_private/ray_perf.py +++ b/python/ray/_private/ray_perf.py @@ -99,7 +99,7 @@ def check_optimized_build(): logger.warning(msg) -def main(run_dag: bool = False, results=None): +def main(results=None): results = results or [] check_optimized_build() @@ -300,138 +300,142 @@ def async_actor_multi(): ################################################# # Perf tests for channels, used in compiled DAGs. ################################################# - if run_dag: - ray.init() + ray.init() + + def put_channel_small(chans, do_get=False, do_release=False): + for chan in chans: + chan.write(b"0") + if do_get: + chan.begin_read() + if do_release: + chan.end_read() + + @ray.remote + class ChannelReader: + def ready(self): + return - def put_channel_small(chans, do_get=False, do_release=False): - for chan in chans: - chan.write(b"0") - if do_get: + def read(self, chans): + while True: + for chan in chans: chan.begin_read() - if do_release: chan.end_read() - @ray.remote - class ChannelReader: - def ready(self): - return - - def read(self, chans): - while True: - for chan in chans: - chan.begin_read() - chan.end_read() - - chans = [ray_channel.Channel(1000)] - results += timeit( - "local put, single channel calls", - lambda: put_channel_small(chans, do_release=True), - ) - results += timeit( - "local put:local get, single channel calls", - lambda: put_channel_small(chans, do_get=True, do_release=True), - ) + chans = [ray_channel.Channel(1000)] + results += timeit( + "[unstable] local put, single channel calls", + lambda: put_channel_small(chans, do_release=True), + ) + results += timeit( + "[unstable] local put:local get, single channel calls", + lambda: put_channel_small(chans, do_get=True, do_release=True), + ) + + chans = [ray_channel.Channel(1000)] + reader = ChannelReader.remote() + ray.get(reader.ready.remote()) + reader.read.remote(chans) + results += timeit( + "[unstable] local put:1 remote get, single channel calls", + lambda: put_channel_small(chans), + ) + ray.kill(reader) + + n_cpu = multiprocessing.cpu_count() // 2 + print(f"Testing multiple readers/channels, n={n_cpu}") - chans = [ray_channel.Channel(1000)] - reader = ChannelReader.remote() - ray.get(reader.ready.remote()) + chans = [ray_channel.Channel(1000, num_readers=n_cpu)] + readers = [ChannelReader.remote() for _ in range(n_cpu)] + ray.get([reader.ready.remote() for reader in readers]) + for reader in readers: reader.read.remote(chans) - results += timeit( - "local put:1 remote get, single channel calls", - lambda: put_channel_small(chans), - ) + results += timeit( + "[unstable] local put:n remote get, single channel calls", + lambda: put_channel_small(chans), + ) + for reader in readers: ray.kill(reader) - n_cpu = multiprocessing.cpu_count() // 2 - print(f"Testing multiple readers/channels, n={n_cpu}") - - chans = [ray_channel.Channel(1000, num_readers=n_cpu)] - readers = [ChannelReader.remote() for _ in range(n_cpu)] - ray.get([reader.ready.remote() for reader in readers]) - for reader in readers: - reader.read.remote(chans) - results += timeit( - "local put:n remote get, single channel calls", - lambda: put_channel_small(chans), - ) - for reader in readers: - ray.kill(reader) + chans = [ray_channel.Channel(1000) for _ in range(n_cpu)] + reader = ChannelReader.remote() + ray.get(reader.ready.remote()) + reader.read.remote(chans) + results += timeit( + "[unstable] local put:1 remote get, n channels calls", + lambda: put_channel_small(chans), + ) + ray.kill(reader) - chans = [ray_channel.Channel(1000) for _ in range(n_cpu)] - reader = ChannelReader.remote() - ray.get(reader.ready.remote()) - reader.read.remote(chans) - results += timeit( - "local put:1 remote get, n channels calls", lambda: put_channel_small(chans) - ) + chans = [ray_channel.Channel(1000) for _ in range(n_cpu)] + readers = [ChannelReader.remote() for _ in range(n_cpu)] + ray.get([reader.ready.remote() for reader in readers]) + for chan, reader in zip(chans, readers): + reader.read.remote([chan]) + results += timeit( + "[unstable] local put:n remote get, n channels calls", + lambda: put_channel_small(chans), + ) + for reader in readers: ray.kill(reader) - chans = [ray_channel.Channel(1000) for _ in range(n_cpu)] - readers = [ChannelReader.remote() for _ in range(n_cpu)] - ray.get([reader.ready.remote() for reader in readers]) - for chan, reader in zip(chans, readers): - reader.read.remote([chan]) - results += timeit( - "local put:n remote get, n channels calls", lambda: put_channel_small(chans) - ) - for reader in readers: - ray.kill(reader) + # Tests for compiled DAGs. - # Tests for compiled DAGs. + def _exec(dag): + output_channel = dag.execute(b"x") + output_channel.begin_read() + output_channel.end_read() - def _exec(dag): - output_channel = dag.execute(b"x") + def _exec_multi_output(dag): + output_channels = dag.execute(b"x") + for output_channel in output_channels: output_channel.begin_read() + for output_channel in output_channels: output_channel.end_read() - def _exec_multi_output(dag): - output_channels = dag.execute(b"x") - for output_channel in output_channels: - output_channel.begin_read() - for output_channel in output_channels: - output_channel.end_read() - - a = DAGActor.remote() - with InputNode() as inp: - dag = a.echo.bind(inp) - - results += timeit("single-actor DAG calls", lambda: ray.get(dag.execute(b"x"))) - dag = dag.experimental_compile() - results += timeit("compiled single-actor DAG calls", lambda: _exec(dag)) - - del a - n_cpu = multiprocessing.cpu_count() // 2 - actors = [DAGActor.remote() for _ in range(n_cpu)] - with InputNode() as inp: - dag = MultiOutputNode([a.echo.bind(inp) for a in actors]) - results += timeit( - "scatter-gather DAG calls, n={n_cpu} actors", - lambda: ray.get(dag.execute(b"x")), - ) - dag = dag.experimental_compile() - results += timeit( - f"compiled scatter-gather DAG calls, n={n_cpu} actors", - lambda: _exec_multi_output(dag), - ) + a = DAGActor.remote() + with InputNode() as inp: + dag = a.echo.bind(inp) - actors = [DAGActor.remote() for _ in range(n_cpu)] - with InputNode() as inp: - dag = inp - for a in actors: - dag = a.echo.bind(dag) - results += timeit( - f"chain DAG calls, n={n_cpu} actors", lambda: ray.get(dag.execute(b"x")) - ) - dag = dag.experimental_compile() - results += timeit( - f"compiled chain DAG calls, n={n_cpu} actors", lambda: _exec(dag) - ) + results += timeit( + "[unstable] single-actor DAG calls", lambda: ray.get(dag.execute(b"x")) + ) + dag = dag.experimental_compile() + results += timeit("[unstable] compiled single-actor DAG calls", lambda: _exec(dag)) + + del a + n_cpu = multiprocessing.cpu_count() // 2 + actors = [DAGActor.remote() for _ in range(n_cpu)] + with InputNode() as inp: + dag = MultiOutputNode([a.echo.bind(inp) for a in actors]) + results += timeit( + "[unstable] scatter-gather DAG calls, n={n_cpu} actors", + lambda: ray.get(dag.execute(b"x")), + ) + dag = dag.experimental_compile() + results += timeit( + f"[unstable] compiled scatter-gather DAG calls, n={n_cpu} actors", + lambda: _exec_multi_output(dag), + ) - ray.shutdown() + actors = [DAGActor.remote() for _ in range(n_cpu)] + with InputNode() as inp: + dag = inp + for a in actors: + dag = a.echo.bind(dag) + results += timeit( + f"[unstable] chain DAG calls, n={n_cpu} actors", + lambda: ray.get(dag.execute(b"x")), + ) + dag = dag.experimental_compile() + results += timeit( + f"[unstable] compiled chain DAG calls, n={n_cpu} actors", lambda: _exec(dag) + ) + + ray.shutdown() - ############################ - # End of channel perf tests. - ############################ + ############################ + # End of channel perf tests. + ############################ NUM_PGS = 100 NUM_BUNDLES = 1 diff --git a/release/microbenchmark/run_microbenchmark.py b/release/microbenchmark/run_microbenchmark.py index aa017154eb6e..afa4e1b724f8 100644 --- a/release/microbenchmark/run_microbenchmark.py +++ b/release/microbenchmark/run_microbenchmark.py @@ -2,11 +2,6 @@ import os -import argparse - -parser = argparse.ArgumentParser() -parser.add_argument("--run-dag", action="store_true", default=False, help="Run DAG related tests") - def to_dict_key(key: str): for r in [" ", ":", "-"]: key = key.replace(r, "_") @@ -18,9 +13,7 @@ def to_dict_key(key: str): if __name__ == "__main__": from ray._private.ray_perf import main - args = parser.parse_args() - - results = main(run_dag=args.run_dag) or [] + results = main() or [] result_dict = { f"{to_dict_key(v[0])}": (v[1], v[2]) for v in results if v is not None diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 02158e21628e..756377134b6a 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -4630,7 +4630,7 @@ run: timeout: 1800 - script: OMP_NUM_THREADS=64 RAY_ADDRESS=local python run_microbenchmark.py + script: OMP_NUM_THREADS=64 RAY_ADDRESS=local TESTS_TO_SKIP=unstable python run_microbenchmark.py variations: - __suffix__: aws @@ -4643,7 +4643,7 @@ frequency: weekly python: "3.11" -- name: microbenchmark_dag +- name: microbenchmark_unstable group: core-daily-test team: core frequency: nightly @@ -4657,7 +4657,7 @@ run: timeout: 1800 - script: OMP_NUM_THREADS=64 RAY_ADDRESS=local python run_microbenchmark.py --run-dag + script: OMP_NUM_THREADS=64 RAY_ADDRESS=local TESTS_TO_RUN=unstable python run_microbenchmark.py --run-dag - name: benchmark_worker_startup group: core-daily-test