Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][ci] Run dag microbenchmark separately #42360

Merged
merged 2 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 114 additions & 110 deletions python/ray/_private/ray_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def check_optimized_build():
logger.warning(msg)


def main(results=None):
def main(run_dag: bool = False, results=None):
results = results or []

check_optimized_build()
Expand Down Expand Up @@ -300,134 +300,138 @@ def async_actor_multi():
#################################################
# Perf tests for channels, used in compiled DAGs.
#################################################
if run_dag:
ray.init()

ray.init()

def put_channel_small(chans, do_get=False, do_release=False):
for chan in chans:
chan.write(b"0")
if do_get:
chan.begin_read()
if do_release:
chan.end_read()

@ray.remote
class ChannelReader:
def ready(self):
return

def read(self, chans):
while True:
for chan in chans:
def put_channel_small(chans, do_get=False, do_release=False):
for chan in chans:
chan.write(b"0")
if do_get:
chan.begin_read()
if do_release:
chan.end_read()

chans = [ray_channel.Channel(1000)]
results += timeit(
"local put, single channel calls",
lambda: put_channel_small(chans, do_release=True),
)
results += timeit(
"local put:local get, single channel calls",
lambda: put_channel_small(chans, do_get=True, do_release=True),
)

chans = [ray_channel.Channel(1000)]
reader = ChannelReader.remote()
ray.get(reader.ready.remote())
reader.read.remote(chans)
results += timeit(
"local put:1 remote get, single channel calls", lambda: put_channel_small(chans)
)
ray.kill(reader)

n_cpu = multiprocessing.cpu_count() // 2
print(f"Testing multiple readers/channels, n={n_cpu}")
@ray.remote
class ChannelReader:
def ready(self):
return

def read(self, chans):
while True:
for chan in chans:
chan.begin_read()
chan.end_read()

chans = [ray_channel.Channel(1000)]
results += timeit(
"local put, single channel calls",
lambda: put_channel_small(chans, do_release=True),
)
results += timeit(
"local put:local get, single channel calls",
lambda: put_channel_small(chans, do_get=True, do_release=True),
)

chans = [ray_channel.Channel(1000, num_readers=n_cpu)]
readers = [ChannelReader.remote() for _ in range(n_cpu)]
ray.get([reader.ready.remote() for reader in readers])
for reader in readers:
chans = [ray_channel.Channel(1000)]
reader = ChannelReader.remote()
ray.get(reader.ready.remote())
reader.read.remote(chans)
results += timeit(
"local put:n remote get, single channel calls",
lambda: put_channel_small(chans),
)
for reader in readers:
results += timeit(
"local put:1 remote get, single channel calls",
lambda: put_channel_small(chans),
)
ray.kill(reader)

chans = [ray_channel.Channel(1000) for _ in range(n_cpu)]
reader = ChannelReader.remote()
ray.get(reader.ready.remote())
reader.read.remote(chans)
results += timeit(
"local put:1 remote get, n channels calls", lambda: put_channel_small(chans)
)
ray.kill(reader)
n_cpu = multiprocessing.cpu_count() // 2
print(f"Testing multiple readers/channels, n={n_cpu}")

chans = [ray_channel.Channel(1000, num_readers=n_cpu)]
readers = [ChannelReader.remote() for _ in range(n_cpu)]
ray.get([reader.ready.remote() for reader in readers])
for reader in readers:
reader.read.remote(chans)
results += timeit(
"local put:n remote get, single channel calls",
lambda: put_channel_small(chans),
)
for reader in readers:
ray.kill(reader)

chans = [ray_channel.Channel(1000) for _ in range(n_cpu)]
readers = [ChannelReader.remote() for _ in range(n_cpu)]
ray.get([reader.ready.remote() for reader in readers])
for chan, reader in zip(chans, readers):
reader.read.remote([chan])
results += timeit(
"local put:n remote get, n channels calls", lambda: put_channel_small(chans)
)
for reader in readers:
chans = [ray_channel.Channel(1000) for _ in range(n_cpu)]
reader = ChannelReader.remote()
ray.get(reader.ready.remote())
reader.read.remote(chans)
results += timeit(
"local put:1 remote get, n channels calls", lambda: put_channel_small(chans)
)
ray.kill(reader)

# Tests for compiled DAGs.
chans = [ray_channel.Channel(1000) for _ in range(n_cpu)]
readers = [ChannelReader.remote() for _ in range(n_cpu)]
ray.get([reader.ready.remote() for reader in readers])
for chan, reader in zip(chans, readers):
reader.read.remote([chan])
results += timeit(
"local put:n remote get, n channels calls", lambda: put_channel_small(chans)
)
for reader in readers:
ray.kill(reader)

def _exec(dag):
output_channel = dag.execute(b"x")
output_channel.begin_read()
output_channel.end_read()
# Tests for compiled DAGs.

def _exec_multi_output(dag):
output_channels = dag.execute(b"x")
for output_channel in output_channels:
def _exec(dag):
output_channel = dag.execute(b"x")
output_channel.begin_read()
for output_channel in output_channels:
output_channel.end_read()

a = DAGActor.remote()
with InputNode() as inp:
dag = a.echo.bind(inp)

results += timeit("single-actor DAG calls", lambda: ray.get(dag.execute(b"x")))
dag = dag.experimental_compile()
results += timeit("compiled single-actor DAG calls", lambda: _exec(dag))

del a
n_cpu = multiprocessing.cpu_count() // 2
actors = [DAGActor.remote() for _ in range(n_cpu)]
with InputNode() as inp:
dag = MultiOutputNode([a.echo.bind(inp) for a in actors])
results += timeit(
"scatter-gather DAG calls, n={n_cpu} actors", lambda: ray.get(dag.execute(b"x"))
)
dag = dag.experimental_compile()
results += timeit(
f"compiled scatter-gather DAG calls, n={n_cpu} actors",
lambda: _exec_multi_output(dag),
)
def _exec_multi_output(dag):
output_channels = dag.execute(b"x")
for output_channel in output_channels:
output_channel.begin_read()
for output_channel in output_channels:
output_channel.end_read()

a = DAGActor.remote()
with InputNode() as inp:
dag = a.echo.bind(inp)

results += timeit("single-actor DAG calls", lambda: ray.get(dag.execute(b"x")))
dag = dag.experimental_compile()
results += timeit("compiled single-actor DAG calls", lambda: _exec(dag))

del a
n_cpu = multiprocessing.cpu_count() // 2
actors = [DAGActor.remote() for _ in range(n_cpu)]
with InputNode() as inp:
dag = MultiOutputNode([a.echo.bind(inp) for a in actors])
results += timeit(
"scatter-gather DAG calls, n={n_cpu} actors",
lambda: ray.get(dag.execute(b"x")),
)
dag = dag.experimental_compile()
results += timeit(
f"compiled scatter-gather DAG calls, n={n_cpu} actors",
lambda: _exec_multi_output(dag),
)

actors = [DAGActor.remote() for _ in range(n_cpu)]
with InputNode() as inp:
dag = inp
for a in actors:
dag = a.echo.bind(dag)
results += timeit(
f"chain DAG calls, n={n_cpu} actors", lambda: ray.get(dag.execute(b"x"))
)
dag = dag.experimental_compile()
results += timeit(f"compiled chain DAG calls, n={n_cpu} actors", lambda: _exec(dag))
actors = [DAGActor.remote() for _ in range(n_cpu)]
with InputNode() as inp:
dag = inp
for a in actors:
dag = a.echo.bind(dag)
results += timeit(
f"chain DAG calls, n={n_cpu} actors", lambda: ray.get(dag.execute(b"x"))
)
dag = dag.experimental_compile()
results += timeit(
f"compiled chain DAG calls, n={n_cpu} actors", lambda: _exec(dag)
)

ray.shutdown()
ray.shutdown()

############################
# End of channel perf tests.
############################
############################
# End of channel perf tests.
############################

NUM_PGS = 100
NUM_BUNDLES = 1
Expand Down
9 changes: 8 additions & 1 deletion release/microbenchmark/run_microbenchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
import os


import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--run-dag", action="store_true", default=False, help="Run DAG related tests")

def to_dict_key(key: str):
for r in [" ", ":", "-"]:
key = key.replace(r, "_")
Expand All @@ -13,7 +18,9 @@ def to_dict_key(key: str):
if __name__ == "__main__":
from ray._private.ray_perf import main

results = main() or []
args = parser.parse_args()

results = main(run_dag=args.run_dag) or []

result_dict = {
f"{to_dict_key(v[0])}": (v[1], v[2]) for v in results if v is not None
Expand Down
15 changes: 15 additions & 0 deletions release/release_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4643,6 +4643,21 @@
frequency: weekly
python: "3.11"

- name: microbenchmark_dag
group: core-daily-test
team: core
frequency: nightly
working_dir: microbenchmark

stable: false

cluster:
byod: {}
cluster_compute: tpl_64.yaml

run:
timeout: 1800
script: OMP_NUM_THREADS=64 RAY_ADDRESS=local python run_microbenchmark.py --run-dag

- name: benchmark_worker_startup
group: core-daily-test
Expand Down
Loading