From 9a8e0dcb6c5afb9c57f17cb6fb5ee88aa84616af Mon Sep 17 00:00:00 2001 From: Kai-Hsun Chen Date: Fri, 30 Aug 2024 17:31:29 +0000 Subject: [PATCH 1/2] update Signed-off-by: Kai-Hsun Chen --- python/ray/dag/dag_node.py | 17 +++++ .../experimental/test_accelerated_dag.py | 72 +++++++++++++++++++ 2 files changed, 89 insertions(+) diff --git a/python/ray/dag/dag_node.py b/python/ray/dag/dag_node.py index 639546be311f..280479fdf32d 100644 --- a/python/ray/dag/dag_node.py +++ b/python/ray/dag/dag_node.py @@ -199,6 +199,12 @@ def experimental_compile( if _max_buffered_results is None: _max_buffered_results = ctx.max_buffered_results + # Validate whether this DAG node has already been compiled. + if self.is_output_node: + raise ValueError( + "The DAG has already been compiled! " + "Please reuse the existing compiled DAG." + ) # Whether this node is an output node in the DAG. We cannot determine # this in the constructor because the output node is determined when # `experimental_compile` is called. @@ -380,9 +386,20 @@ def traverse_and_apply(self, fn: "Callable[[DAGNode], T]"): """ visited = set() queue = [self] + num_output_nodes = 0 + while queue: node = queue.pop(0) if node not in visited: + if node.is_output_node: + num_output_nodes += 1 + # Validate whether there are multiple nodes that call + # `experimental_compile`. + if num_output_nodes > 1: + raise ValueError( + "The DAG was compiled more than once, so some output " + "nodes became leaf nodes." + ) fn(node) visited.add(node) """ diff --git a/python/ray/dag/tests/experimental/test_accelerated_dag.py b/python/ray/dag/tests/experimental/test_accelerated_dag.py index ae76ed8db3d5..0270c127cb75 100644 --- a/python/ray/dag/tests/experimental/test_accelerated_dag.py +++ b/python/ray/dag/tests/experimental/test_accelerated_dag.py @@ -747,6 +747,78 @@ def f(x): compiled_dag.teardown() +class TestDAGExceptionCompileMultipleTimes: + def test_compile_twice_with_teardown(self, ray_start_regular): + a = Actor.remote(0) + with InputNode() as i: + dag = a.echo.bind(i) + compiled_dag = dag.experimental_compile() + compiled_dag.teardown() + with pytest.raises( + ValueError, + match="The DAG has already been compiled! " + "Please reuse the existing compiled DAG.", + ): + compiled_dag = dag.experimental_compile() + + def test_compile_twice_without_teardown(self, ray_start_regular): + a = Actor.remote(0) + with InputNode() as i: + dag = a.echo.bind(i) + compiled_dag = dag.experimental_compile() + with pytest.raises( + ValueError, + match="The DAG has already been compiled! " + "Please reuse the existing compiled DAG.", + ): + compiled_dag = dag.experimental_compile() + compiled_dag.teardown() + + def test_compile_twice_with_multioutputnode(self, ray_start_regular): + a = Actor.remote(0) + with InputNode() as i: + dag = MultiOutputNode([a.echo.bind(i)]) + compiled_dag = dag.experimental_compile() + compiled_dag.teardown() + with pytest.raises( + ValueError, + match="The DAG has already been compiled! " + "Please reuse the existing compiled DAG.", + ): + compiled_dag = dag.experimental_compile() + + def test_compile_twice_with_multioutputnode_without_teardown( + self, ray_start_regular + ): + a = Actor.remote(0) + with InputNode() as i: + dag = MultiOutputNode([a.echo.bind(i)]) + compiled_dag = dag.experimental_compile() + with pytest.raises( + ValueError, + match="The DAG has already been compiled! " + "Please reuse the existing compiled DAG.", + ): + compiled_dag = dag.experimental_compile() + compiled_dag.teardown() + + def test_compile_twice_with_different_nodes(self, ray_start_regular): + a = Actor.remote(0) + b = Actor.remote(0) + with InputNode() as i: + branch1 = a.echo.bind(i) + branch2 = b.echo.bind(i) + dag = MultiOutputNode([branch1]) + compiled_dag = dag.experimental_compile() + compiled_dag.teardown() + with pytest.raises( + ValueError, + match="The DAG was compiled more than once, " + "so some output nodes became leaf nodes.", + ): + compiled_dag = branch2.experimental_compile() + + def test_exceed_max_buffered_results(ray_start_regular): a = Actor.remote(0) with InputNode() as i: From c31286deac9f2a6d8ff996e049675685d76d479f Mon Sep 17 00:00:00 2001 From: Kai-Hsun Chen Date: Fri, 30 Aug 2024 21:30:01 +0000 Subject: [PATCH 2/2] update Signed-off-by: Kai-Hsun Chen --- python/ray/dag/compiled_dag_node.py | 5 +++- python/ray/dag/dag_node.py | 25 +++++++++++-------- .../experimental/test_accelerated_dag.py | 24 ++++++++++-------- 3 files changed, 32 insertions(+), 22 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index a3b588fd2912..93eee9ec58c8 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -729,7 +729,10 @@ def _preprocess(self) -> None: task.dag_node, InputAttributeNode ): continue - if len(task.downstream_task_idxs) == 0 and task.dag_node.is_output_node: + if ( + len(task.downstream_task_idxs) == 0 + and task.dag_node.is_adag_output_node + ): assert self.output_task_idx is None, "More than one output node found" self.output_task_idx = idx diff --git a/python/ray/dag/dag_node.py b/python/ray/dag/dag_node.py index 280479fdf32d..972aee46b0ae 100644 --- a/python/ray/dag/dag_node.py +++ b/python/ray/dag/dag_node.py @@ -72,7 +72,8 @@ def __init__( self.cache_from_last_execute = {} self._type_hint: Optional[ChannelOutputType] = ChannelOutputType() - self.is_output_node = False + # Whether this node calls `experimental_compile`. + self.is_adag_output_node = False def _collect_upstream_nodes(self) -> List["DAGNode"]: """ @@ -200,15 +201,16 @@ def experimental_compile( _max_buffered_results = ctx.max_buffered_results # Validate whether this DAG node has already been compiled. - if self.is_output_node: + if self.is_adag_output_node: raise ValueError( - "The DAG has already been compiled! " - "Please reuse the existing compiled DAG." + "It is not allowed to call `experimental_compile` on the same DAG " + "object multiple times no matter whether `teardown` is called or not. " + "Please reuse the existing compiled DAG or create a new one." ) # Whether this node is an output node in the DAG. We cannot determine # this in the constructor because the output node is determined when # `experimental_compile` is called. - self.is_output_node = True + self.is_adag_output_node = True return build_compiled_dag_from_ray_dag( self, _execution_timeout, @@ -386,20 +388,21 @@ def traverse_and_apply(self, fn: "Callable[[DAGNode], T]"): """ visited = set() queue = [self] - num_output_nodes = 0 + adag_output_node: Optional[DAGNode] = None while queue: node = queue.pop(0) if node not in visited: - if node.is_output_node: - num_output_nodes += 1 + if node.is_adag_output_node: # Validate whether there are multiple nodes that call # `experimental_compile`. - if num_output_nodes > 1: + if adag_output_node is not None: raise ValueError( - "The DAG was compiled more than once, so some output " - "nodes became leaf nodes." + "The DAG was compiled more than once. The following two " + "nodes call `experimental_compile`: " + f"(1) {adag_output_node}, (2) {node}" ) + adag_output_node = node fn(node) visited.add(node) """ diff --git a/python/ray/dag/tests/experimental/test_accelerated_dag.py b/python/ray/dag/tests/experimental/test_accelerated_dag.py index 0270c127cb75..2e903151cf88 100644 --- a/python/ray/dag/tests/experimental/test_accelerated_dag.py +++ b/python/ray/dag/tests/experimental/test_accelerated_dag.py @@ -756,8 +756,9 @@ def test_compile_twice_with_teardown(self, ray_start_regular): compiled_dag.teardown() with pytest.raises( ValueError, - match="The DAG has already been compiled! " - "Please reuse the existing compiled DAG.", + match="It is not allowed to call `experimental_compile` on the same DAG " + "object multiple times no matter whether `teardown` is called or not. " + "Please reuse the existing compiled DAG or create a new one.", ): compiled_dag = dag.experimental_compile() @@ -768,8 +769,9 @@ def test_compile_twice_without_teardown(self, ray_start_regular): compiled_dag = dag.experimental_compile() with pytest.raises( ValueError, - match="The DAG has already been compiled! " - "Please reuse the existing compiled DAG.", + match="It is not allowed to call `experimental_compile` on the same DAG " + "object multiple times no matter whether `teardown` is called or not. " + "Please reuse the existing compiled DAG or create a new one.", ): compiled_dag = dag.experimental_compile() compiled_dag.teardown() @@ -782,8 +784,9 @@ def test_compile_twice_with_multioutputnode(self, ray_start_regular): compiled_dag.teardown() with pytest.raises( ValueError, - match="The DAG has already been compiled! " - "Please reuse the existing compiled DAG.", + match="It is not allowed to call `experimental_compile` on the same DAG " + "object multiple times no matter whether `teardown` is called or not. " + "Please reuse the existing compiled DAG or create a new one.", ): compiled_dag = dag.experimental_compile() @@ -796,8 +799,9 @@ def test_compile_twice_with_multioutputnode_without_teardown( compiled_dag = dag.experimental_compile() with pytest.raises( ValueError, - match="The DAG has already been compiled! " - "Please reuse the existing compiled DAG.", + match="It is not allowed to call `experimental_compile` on the same DAG " + "object multiple times no matter whether `teardown` is called or not. " + "Please reuse the existing compiled DAG or create a new one.", ): compiled_dag = dag.experimental_compile() compiled_dag.teardown() @@ -813,8 +817,8 @@ def test_compile_twice_with_different_nodes(self, ray_start_regular): compiled_dag.teardown() with pytest.raises( ValueError, - match="The DAG was compiled more than once, " - "so some output nodes became leaf nodes.", + match="The DAG was compiled more than once. The following two " + "nodes call `experimental_compile`: ", ): compiled_dag = branch2.experimental_compile()