Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][experimental] Raise an exception if a DAG is compiled twice #47431

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,10 @@ def _preprocess(self) -> None:
task.dag_node, InputAttributeNode
):
continue
if len(task.downstream_task_idxs) == 0 and task.dag_node.is_output_node:
if (
len(task.downstream_task_idxs) == 0
and task.dag_node.is_adag_output_node
):
assert self.output_task_idx is None, "More than one output node found"
self.output_task_idx = idx

Expand Down
24 changes: 22 additions & 2 deletions python/ray/dag/dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ def __init__(
self.cache_from_last_execute = {}

self._type_hint: Optional[ChannelOutputType] = ChannelOutputType()
self.is_output_node = False
# Whether this node calls `experimental_compile`.
self.is_adag_output_node = False

def _collect_upstream_nodes(self) -> List["DAGNode"]:
"""
Expand Down Expand Up @@ -199,10 +200,17 @@ def experimental_compile(
if _max_buffered_results is None:
_max_buffered_results = ctx.max_buffered_results

# Validate whether this DAG node has already been compiled.
if self.is_adag_output_node:
raise ValueError(
"It is not allowed to call `experimental_compile` on the same DAG "
"object multiple times no matter whether `teardown` is called or not. "
"Please reuse the existing compiled DAG or create a new one."
)
# Whether this node is an output node in the DAG. We cannot determine
# this in the constructor because the output node is determined when
# `experimental_compile` is called.
self.is_output_node = True
self.is_adag_output_node = True
return build_compiled_dag_from_ray_dag(
self,
_execution_timeout,
Expand Down Expand Up @@ -380,9 +388,21 @@ def traverse_and_apply(self, fn: "Callable[[DAGNode], T]"):
"""
visited = set()
queue = [self]
adag_output_node: Optional[DAGNode] = None

while queue:
node = queue.pop(0)
if node not in visited:
if node.is_adag_output_node:
# Validate whether there are multiple nodes that call
# `experimental_compile`.
if adag_output_node is not None:
raise ValueError(
kevin85421 marked this conversation as resolved.
Show resolved Hide resolved
"The DAG was compiled more than once. The following two "
"nodes call `experimental_compile`: "
f"(1) {adag_output_node}, (2) {node}"
)
adag_output_node = node
fn(node)
visited.add(node)
"""
Expand Down
76 changes: 76 additions & 0 deletions python/ray/dag/tests/experimental/test_accelerated_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,82 @@ def f(x):
compiled_dag.teardown()


class TestDAGExceptionCompileMultipleTimes:
    """Check that calling `experimental_compile` again raises `ValueError`.

    Each test compiles a DAG once, optionally tears it down, and then
    expects a second `experimental_compile` call to be rejected.
    """

    # Expected message when the very same node is compiled again.
    # `pytest.raises(match=...)` applies this as a regex search.
    _SAME_NODE_MSG = (
        "It is not allowed to call `experimental_compile` on the same DAG "
        "object multiple times no matter whether `teardown` is called or not. "
        "Please reuse the existing compiled DAG or create a new one."
    )

    # Expected message prefix when a different node of the same DAG is
    # compiled after another node was already compiled.
    _OTHER_NODE_MSG = (
        "The DAG was compiled more than once. The following two "
        "nodes call `experimental_compile`: "
    )

    def test_compile_twice_with_teardown(self, ray_start_regular):
        """Recompiling a single-actor DAG after teardown is rejected."""
        actor = Actor.remote(0)
        with InputNode() as inp:
            dag = actor.echo.bind(inp)

        compiled = dag.experimental_compile()
        compiled.teardown()

        with pytest.raises(ValueError, match=self._SAME_NODE_MSG):
            compiled = dag.experimental_compile()

    def test_compile_twice_without_teardown(self, ray_start_regular):
        """Recompiling a single-actor DAG that is still live is rejected."""
        actor = Actor.remote(0)
        with InputNode() as inp:
            dag = actor.echo.bind(inp)

        compiled = dag.experimental_compile()

        with pytest.raises(ValueError, match=self._SAME_NODE_MSG):
            compiled = dag.experimental_compile()

        # Clean up the first (and only successfully) compiled DAG.
        compiled.teardown()

    def test_compile_twice_with_multioutputnode(self, ray_start_regular):
        """Recompiling a `MultiOutputNode` DAG after teardown is rejected."""
        actor = Actor.remote(0)
        with InputNode() as inp:
            dag = MultiOutputNode([actor.echo.bind(inp)])

        compiled = dag.experimental_compile()
        compiled.teardown()

        with pytest.raises(ValueError, match=self._SAME_NODE_MSG):
            compiled = dag.experimental_compile()

    def test_compile_twice_with_multioutputnode_without_teardown(
        self, ray_start_regular
    ):
        """Recompiling a live `MultiOutputNode` DAG is rejected."""
        actor = Actor.remote(0)
        with InputNode() as inp:
            dag = MultiOutputNode([actor.echo.bind(inp)])

        compiled = dag.experimental_compile()

        with pytest.raises(ValueError, match=self._SAME_NODE_MSG):
            compiled = dag.experimental_compile()

        # Clean up the first (and only successfully) compiled DAG.
        compiled.teardown()

    def test_compile_twice_with_different_nodes(self, ray_start_regular):
        """Compiling a second node that shares the input node is rejected."""
        first_actor = Actor.remote(0)
        second_actor = Actor.remote(0)
        with InputNode() as inp:
            branch1 = first_actor.echo.bind(inp)
            branch2 = second_actor.echo.bind(inp)
            dag = MultiOutputNode([branch1])

        compiled = dag.experimental_compile()
        compiled.teardown()

        with pytest.raises(ValueError, match=self._OTHER_NODE_MSG):
            compiled = branch2.experimental_compile()


def test_exceed_max_buffered_results(ray_start_regular):
a = Actor.remote(0)
with InputNode() as i:
Expand Down