Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Kai-Hsun Chen <kaihsun@anyscale.com>
  • Loading branch information
kevin85421 committed Aug 29, 2024
1 parent e043a03 commit abd946b
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion python/ray/_private/ray_experimental_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ async def _exec_async():
_exec_async,
)

# Single-actor DAG calls

a = DAGActor.remote()
with InputNode() as inp:
dag = a.echo.bind(inp)
Expand All @@ -172,7 +174,13 @@ async def _exec_async():
"[unstable] compiled single-actor DAG calls", lambda: _exec(compiled_dag)
)
compiled_dag.teardown()
del a

# Single-actor asyncio DAG calls

a = DAGActor.remote()
with InputNode() as inp:
dag = a.echo.bind(inp)
compiled_dag = dag.experimental_compile(enable_asyncio=True)
results += loop.run_until_complete(
exec_async(
Expand All @@ -183,8 +191,10 @@ async def _exec_async():
# these DAGs create a background thread that can segfault if the CoreWorker
# is torn down first.
compiled_dag.teardown()

del a

# Scatter-gather DAG calls

n_cpu = multiprocessing.cpu_count() // 2
actors = [DAGActor.remote() for _ in range(n_cpu)]
with InputNode() as inp:
Expand All @@ -200,6 +210,11 @@ async def _exec_async():
)
compiled_dag.teardown()

# Scatter-gather asyncio DAG calls

actors = [DAGActor.remote() for _ in range(n_cpu)]
with InputNode() as inp:
dag = MultiOutputNode([a.echo.bind(inp) for a in actors])
compiled_dag = dag.experimental_compile(enable_asyncio=True)
results += loop.run_until_complete(
exec_async(
Expand All @@ -211,6 +226,8 @@ async def _exec_async():
# is torn down first.
compiled_dag.teardown()

# Chain DAG calls

actors = [DAGActor.remote() for _ in range(n_cpu)]
with InputNode() as inp:
dag = inp
Expand All @@ -227,6 +244,13 @@ async def _exec_async():
)
compiled_dag.teardown()

# Chain asyncio DAG calls

actors = [DAGActor.remote() for _ in range(n_cpu)]
with InputNode() as inp:
dag = inp
for a in actors:
dag = a.echo.bind(dag)
compiled_dag = dag.experimental_compile(enable_asyncio=True)
results += loop.run_until_complete(
exec_async(f"[unstable] compiled chain asyncio DAG calls, n={n_cpu} actors")
Expand Down

0 comments on commit abd946b

Please sign in to comment.