From ddbf4c8b3e503b107aa785701b1e4aa85a21ad79 Mon Sep 17 00:00:00 2001 From: Stijn Tratsaert Date: Mon, 11 Sep 2023 02:01:34 +0200 Subject: [PATCH 1/7] fix(sdk): make component names in dag more unique --- sdk/python/kfp/compiler/pipeline_spec_builder.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index b276f892c1d..5b1ee0733b8 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -1606,15 +1606,19 @@ def _rename_component_refs( if task_spec.component_ref.name == old_component_ref: task_spec.component_ref.name = new_component_ref - # Do all the renaming in place, then do the acutal merge of component specs + # Do all the renaming in place, then do the actual merge of component specs # in a second pass. This would ensure all component specs are in the final # state at the time of merging. old_name_to_new_name = {} + existing_main_comp_names = list(main_pipeline_spec.components.keys()) for component_name, component_spec in sub_pipeline_spec.components.items(): old_component_name = component_name + current_comp_name_collection = [ + key for pair in old_name_to_new_name.items() for key in pair + ] new_component_name = utils.make_name_unique_by_adding_index( name=component_name, - collection=list(main_pipeline_spec.components.keys()), + collection=existing_main_comp_names + current_comp_name_collection, delimiter='-') old_name_to_new_name[old_component_name] = new_component_name From 954a9f3a9522ad0793b4d7c03d2525754b82d62e Mon Sep 17 00:00:00 2001 From: Stijn Tratsaert Date: Sun, 17 Sep 2023 16:39:51 +0200 Subject: [PATCH 2/7] tweak ordering so that renaming is handled in LIFO fashion --- .../kfp/compiler/pipeline_spec_builder.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py 
b/sdk/python/kfp/compiler/pipeline_spec_builder.py index 5b1ee0733b8..b60fb4ec367 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -1579,10 +1579,18 @@ def _merge_component_spec( ) -> None: """Merges component spec from a sub pipeline spec into the main config. - During the merge we need to ensure all component specs have unique component + During the merge we need to ensure all component specs have a unique component name, that means we might need to update the `component_ref` referenced from task specs in sub_pipeline_spec. + Uniqueness is determined by existing component names in main_pipeline_spec + and sub_pipeline_spec. + + Renaming is first done in place, specifically in a LIFO order. This is to avoid + a rename causing a name collision with a later rename. Then, the actual merge + of component specs is done in a second pass. This ensures all component specs + are in the final state at the time of merging. + Args: main_pipeline_spec: The main pipeline spec to merge into. sub_pipeline_spec: The pipeline spec of an inner pipeline whose @@ -1606,12 +1614,9 @@ def _rename_component_refs( if task_spec.component_ref.name == old_component_ref: task_spec.component_ref.name = new_component_ref - # Do all the renaming in place, then do the actual merge of component specs - # in a second pass. This would ensure all component specs are in the final - # state at the time of merging. 
old_name_to_new_name = {} existing_main_comp_names = list(main_pipeline_spec.components.keys()) - for component_name, component_spec in sub_pipeline_spec.components.items(): + for component_name, _ in sub_pipeline_spec.components.items(): old_component_name = component_name current_comp_name_collection = [ key for pair in old_name_to_new_name.items() for key in pair @@ -1622,6 +1627,9 @@ def _rename_component_refs( delimiter='-') old_name_to_new_name[old_component_name] = new_component_name + ordered_names = enumerate(old_name_to_new_name.items()) + lifo_ordered_names = sorted(ordered_names, key=lambda x: x[0], reverse=True) + for _, (old_component_name, new_component_name) in lifo_ordered_names: if new_component_name != old_component_name: _rename_component_refs( pipeline_spec=sub_pipeline_spec, From 15e9d33b7bfba0d050a4adeea346df9ca9009e38 Mon Sep 17 00:00:00 2001 From: Stijn Tratsaert Date: Mon, 18 Sep 2023 22:15:14 +0200 Subject: [PATCH 3/7] format docstring --- sdk/python/kfp/compiler/pipeline_spec_builder.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index b60fb4ec367..1dc8ee4e991 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -1585,11 +1585,11 @@ def _merge_component_spec( Uniqueness is determined by existing component names in main_pipeline_spec and sub_pipeline_spec. - + Renaming is first done in place, specifically in a LIFO order. This is to avoid a rename causing a name collision with a later rename. Then, the actual merge - of component specs is done in a second pass. This ensures all component specs - are in the final state at the time of merging. + of component specs is done in a second pass. This ensures all component specs + are in the final state at the time of merging. Args: main_pipeline_spec: The main pipeline spec to merge into. 
From 5ac37df3b30bce394ee05c820f2550bf6c898d21 Mon Sep 17 00:00:00 2001 From: Stijn Tratsaert Date: Mon, 18 Sep 2023 23:40:45 +0200 Subject: [PATCH 4/7] first iteration of new test_nested_conditions_with_for_loops_in_nested_pipelines --- sdk/python/kfp/compiler/compiler_test.py | 39 ++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 92b1f6a1b72..006670941aa 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -792,6 +792,45 @@ def my_pipeline() -> NamedTuple('Outputs', [ ]): task = print_op(msg='Hello') + def test_nested_conditions_with_for_loops_in_nested_pipelines(self): + @dsl.pipeline() + def pipeline_2(): + with dsl.ParallelFor([1, 2, 3]): + print_op(message='1') + + @dsl.pipeline() + def pipeline_3(run_pipeline_2: int = 1): + with dsl.ParallelFor([1, 2, 3]): + print_op(message='1') + + with dsl.Condition(run_pipeline_2==run_pipeline_2): + with dsl.Condition(run_pipeline_2 == 1): + pipeline_2() + + @dsl.pipeline() + def pipeline_1(run_pipeline_2: int = 1, run_pipeline_3: int = 1): + with dsl.Condition(run_pipeline_2 == run_pipeline_2): + with dsl.Condition(run_pipeline_2 == 1): + pipeline_2() + + with dsl.Condition(run_pipeline_3 == run_pipeline_3): + with dsl.Condition(run_pipeline_3 == 1): + pipeline_3() + + with tempfile.TemporaryDirectory() as tmpdir: + output_yaml = os.path.join(tmpdir, 'result.yaml') + compiler.Compiler().compile( + pipeline_func=pipeline_1, package_path=output_yaml) + self.assertTrue(os.path.exists(output_yaml)) + + with open(output_yaml, 'r') as f: + pipeline_spec = yaml.safe_load(f) + tasks = [comp.get("dag", {}).get("tasks", {}) for comp in pipeline_spec['components'].values()] + component_refs = [[x.get("componentRef", {}).get("name") for x in task.values()] for task in tasks] + all_component_refs = [item for sublist in component_refs for item in sublist] + counted_refs = 
collections.Counter(all_component_refs) + self.assertEqual(1, max(counted_refs.values())) + class V2NamespaceAliasTest(unittest.TestCase): """Test that imports of both modules and objects are aliased (e.g. all From c2c2f16c9a0ee829fb9705a9d3debc1e71ca06d0 Mon Sep 17 00:00:00 2001 From: Stijn Tratsaert Date: Tue, 19 Sep 2023 09:20:44 +0200 Subject: [PATCH 5/7] format using yapf --- sdk/python/kfp/compiler/compiler_test.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 006670941aa..31f38bbcba7 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -793,6 +793,7 @@ def my_pipeline() -> NamedTuple('Outputs', [ task = print_op(msg='Hello') def test_nested_conditions_with_for_loops_in_nested_pipelines(self): + @dsl.pipeline() def pipeline_2(): with dsl.ParallelFor([1, 2, 3]): @@ -803,7 +804,7 @@ def pipeline_3(run_pipeline_2: int = 1): with dsl.ParallelFor([1, 2, 3]): print_op(message='1') - with dsl.Condition(run_pipeline_2==run_pipeline_2): + with dsl.Condition(run_pipeline_2 == run_pipeline_2): with dsl.Condition(run_pipeline_2 == 1): pipeline_2() @@ -825,9 +826,18 @@ def pipeline_1(run_pipeline_2: int = 1, run_pipeline_3: int = 1): with open(output_yaml, 'r') as f: pipeline_spec = yaml.safe_load(f) - tasks = [comp.get("dag", {}).get("tasks", {}) for comp in pipeline_spec['components'].values()] - component_refs = [[x.get("componentRef", {}).get("name") for x in task.values()] for task in tasks] - all_component_refs = [item for sublist in component_refs for item in sublist] + tasks = [ + comp.get('dag', {}).get('tasks', {}) + for comp in pipeline_spec['components'].values() + ] + component_refs = [[ + x.get('componentRef', {}).get('name') + for x in task.values() + ] + for task in tasks] + all_component_refs = [ + item for sublist in component_refs for item in sublist + ] counted_refs = 
collections.Counter(all_component_refs) self.assertEqual(1, max(counted_refs.values())) From ae8495bee70b55e9a85f33b0ab6599fbff46a30d Mon Sep 17 00:00:00 2001 From: Stijn Tratsaert Date: Tue, 13 Feb 2024 21:17:00 +0100 Subject: [PATCH 6/7] rename test & improve readability --- sdk/python/kfp/compiler/compiler_test.py | 28 ++++++++---------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index a307fc4e0ed..2ff42d50abf 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -882,36 +882,26 @@ def my_pipeline() -> NamedTuple('Outputs', [ ]): task = print_and_return(text='Hello') - def test_nested_conditions_with_for_loops_in_nested_pipelines(self): + def test_pipeline_reusing_other_pipeline_multiple_times(self): @dsl.pipeline() - def pipeline_2(): - with dsl.ParallelFor([1, 2, 3]): - print_op(message='1') + def reusable_pipeline(): + print_op(message='Reused pipeline') @dsl.pipeline() - def pipeline_3(run_pipeline_2: int = 1): - with dsl.ParallelFor([1, 2, 3]): - print_op(message='1') - - with dsl.Condition(run_pipeline_2 == run_pipeline_2): - with dsl.Condition(run_pipeline_2 == 1): - pipeline_2() + def do_something_else_pipeline(): + reusable_pipeline() @dsl.pipeline() - def pipeline_1(run_pipeline_2: int = 1, run_pipeline_3: int = 1): - with dsl.Condition(run_pipeline_2 == run_pipeline_2): - with dsl.Condition(run_pipeline_2 == 1): - pipeline_2() + def orchestrator_pipeline(): + reusable_pipeline() - with dsl.Condition(run_pipeline_3 == run_pipeline_3): - with dsl.Condition(run_pipeline_3 == 1): - pipeline_3() + do_something_else_pipeline() with tempfile.TemporaryDirectory() as tmpdir: output_yaml = os.path.join(tmpdir, 'result.yaml') compiler.Compiler().compile( - pipeline_func=pipeline_1, package_path=output_yaml) + pipeline_func=orchestrator_pipeline, package_path=output_yaml) 
self.assertTrue(os.path.exists(output_yaml)) with open(output_yaml, 'r') as f: From 294403f3157829b59bafd4d8eb680b91fc5ee2d1 Mon Sep 17 00:00:00 2001 From: Stijn Tratsaert Date: Fri, 16 Feb 2024 09:59:44 +0100 Subject: [PATCH 7/7] re-add component whose removal made the test pass unintentionally --- sdk/python/kfp/compiler/compiler_test.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 2ff42d50abf..6314585345c 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -890,6 +890,8 @@ def reusable_pipeline(): @dsl.pipeline() def do_something_else_pipeline(): + print_op(message='Do something else pipeline') + reusable_pipeline() @dsl.pipeline() @@ -919,6 +921,7 @@ def orchestrator_pipeline(): item for sublist in component_refs for item in sublist ] counted_refs = collections.Counter(all_component_refs) + print(counted_refs) self.assertEqual(1, max(counted_refs.values()))