
fix(sdk): make component names more unique when merging specs #9969

49 changes: 49 additions & 0 deletions sdk/python/kfp/compiler/compiler_test.py
@@ -792,6 +792,55 @@ def my_pipeline() -> NamedTuple('Outputs', [
]):
task = print_op(msg='Hello')

def test_nested_conditions_with_for_loops_in_nested_pipelines(self):

@dsl.pipeline()
def pipeline_2():
with dsl.ParallelFor([1, 2, 3]):
print_op(message='1')

@dsl.pipeline()
def pipeline_3(run_pipeline_2: int = 1):
with dsl.ParallelFor([1, 2, 3]):
print_op(message='1')

with dsl.Condition(run_pipeline_2 == run_pipeline_2):
with dsl.Condition(run_pipeline_2 == 1):
pipeline_2()

@dsl.pipeline()
def pipeline_1(run_pipeline_2: int = 1, run_pipeline_3: int = 1):
with dsl.Condition(run_pipeline_2 == run_pipeline_2):
with dsl.Condition(run_pipeline_2 == 1):
pipeline_2()

with dsl.Condition(run_pipeline_3 == run_pipeline_3):
with dsl.Condition(run_pipeline_3 == 1):
pipeline_3()
Member:
Is this a minimal example? If not, could we reduce this to the minimal example required to reproduce the problematic behavior? This will help us ensure we don't regress and will reduce the likelihood that the test is removed as redundant or unclear in the future.

Also, consider renaming the test method to reflect the behavior under test and, if needed, adding a docstring to describe in more detail.

Author:
Should be much better now, thanks for the push


with tempfile.TemporaryDirectory() as tmpdir:
output_yaml = os.path.join(tmpdir, 'result.yaml')
compiler.Compiler().compile(
pipeline_func=pipeline_1, package_path=output_yaml)
self.assertTrue(os.path.exists(output_yaml))

with open(output_yaml, 'r') as f:
pipeline_spec = yaml.safe_load(f)
tasks = [
comp.get('dag', {}).get('tasks', {})
for comp in pipeline_spec['components'].values()
]
component_refs = [[
x.get('componentRef', {}).get('name')
for x in task.values()
]
for task in tasks]
all_component_refs = [
item for sublist in component_refs for item in sublist
]
counted_refs = collections.Counter(all_component_refs)
self.assertEqual(1, max(counted_refs.values()))
Member:
In my local testing, this passes both before and after the changes. I also observe that the YAML is not different. Is it possible there is something missing from this test case?

Author:
My bad - went too harsh on cutting components down to a minimal version.



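The duplicate-reference assertion in the test above can be sketched outside the test harness. Below is a minimal standalone version; the toy `pipeline_spec` dict is hypothetical and mirrors only the fields the test actually reads (`components` -> `dag` -> `tasks` -> `componentRef`):

```python
import collections

# Toy stand-in for a compiled pipeline spec; all names are hypothetical.
pipeline_spec = {
    'components': {
        'comp-condition-1': {'dag': {'tasks': {
            'print-op': {'componentRef': {'name': 'comp-print-op'}},
        }}},
        'comp-condition-2': {'dag': {'tasks': {
            'print-op-2': {'componentRef': {'name': 'comp-print-op-2'}},
        }}},
    }
}

# Flatten every componentRef name across all inner DAGs, then check that
# no name is referenced more than once (i.e. the highest count is 1).
all_component_refs = [
    task.get('componentRef', {}).get('name')
    for comp in pipeline_spec['components'].values()
    for task in comp.get('dag', {}).get('tasks', {}).values()
]
counted_refs = collections.Counter(all_component_refs)
assert max(counted_refs.values()) == 1
```

Before the fix in this PR, a colliding rename could leave two tasks pointing at the same merged component name, which this count would catch.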
class V2NamespaceAliasTest(unittest.TestCase):
"""Test that imports of both modules and objects are aliased (e.g. all
24 changes: 18 additions & 6 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
@@ -1579,10 +1579,18 @@ def _merge_component_spec(
) -> None:
"""Merges component spec from a sub pipeline spec into the main config.

During the merge we need to ensure all component specs have unique component
During the merge we need to ensure all component specs have a unique component
name, that means we might need to update the `component_ref` referenced from
task specs in sub_pipeline_spec.

Uniqueness is determined by existing component names in main_pipeline_spec
and sub_pipeline_spec.

Renaming is first done in place, specifically in a LIFO order. This is to avoid
a rename causing a name collision with a later rename. Then, the actual merge
of component specs is done in a second pass. This ensures all component specs
are in the final state at the time of merging.

Args:
main_pipeline_spec: The main pipeline spec to merge into.
sub_pipeline_spec: The pipeline spec of an inner pipeline whose
@@ -1606,18 +1606,22 @@ def _rename_component_refs(
if task_spec.component_ref.name == old_component_ref:
task_spec.component_ref.name = new_component_ref

# Do all the renaming in place, then do the actual merge of component specs
# in a second pass. This would ensure all component specs are in the final
# state at the time of merging.
old_name_to_new_name = {}
for component_name, component_spec in sub_pipeline_spec.components.items():
existing_main_comp_names = list(main_pipeline_spec.components.keys())
for component_name, _ in sub_pipeline_spec.components.items():
old_component_name = component_name
current_comp_name_collection = [
key for pair in old_name_to_new_name.items() for key in pair
]
new_component_name = utils.make_name_unique_by_adding_index(
name=component_name,
collection=list(main_pipeline_spec.components.keys()),
collection=existing_main_comp_names + current_comp_name_collection,
delimiter='-')
old_name_to_new_name[old_component_name] = new_component_name

ordered_names = enumerate(old_name_to_new_name.items())
lifo_ordered_names = sorted(ordered_names, key=lambda x: x[0], reverse=True)
for _, (old_component_name, new_component_name) in lifo_ordered_names:
if new_component_name != old_component_name:
_rename_component_refs(
pipeline_spec=sub_pipeline_spec,
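The two-pass scheme the diff above introduces — choose every new name first, then apply the renames in LIFO order so an earlier rename cannot be re-captured by a later one — can be illustrated standalone. This is a sketch, not the SDK's implementation: `make_unique` is a simplified stand-in for `utils.make_name_unique_by_adding_index` (the real helper also strips an existing numeric suffix, so it would yield `comp-3` where this toy yields `comp-2-2`), and the flat list of refs is a hypothetical model of the task specs:

```python
def make_unique(name, taken, delimiter='-'):
    """Simplified stand-in: append the smallest numeric suffix that
    avoids a collision with the names in `taken`."""
    if name not in taken:
        return name
    i = 2
    while f'{name}{delimiter}{i}' in taken:
        i += 1
    return f'{name}{delimiter}{i}'


def plan_renames(main_names, sub_names):
    """First pass: choose every new name up front, checking against the
    main spec's names plus all names already chosen in this pass."""
    mapping = {}
    for old in sub_names:
        taken = set(main_names) | set(mapping) | set(mapping.values())
        mapping[old] = make_unique(old, taken)
    return mapping


def apply_renames(refs, mapping):
    """Second pass: apply renames in LIFO order. Applied FIFO, a ref
    renamed to 'comp-2' in step one would be captured again by the later
    'comp-2' -> ... rename; reversing the order avoids that."""
    for old, new in reversed(list(mapping.items())):
        refs = [new if r == old else r for r in refs]
    return refs


# The main spec already owns 'comp'; the sub spec brings 'comp' and 'comp-2'.
mapping = plan_renames(main_names=['comp'], sub_names=['comp', 'comp-2'])
print(mapping)                                     # {'comp': 'comp-2', 'comp-2': 'comp-2-2'}
print(apply_renames(['comp', 'comp-2'], mapping))  # ['comp-2', 'comp-2-2']
```

Walking the example FIFO instead would first turn `comp` refs into `comp-2`, then the `comp-2` rename would sweep those up too, merging two distinct components — exactly the collision the LIFO ordering prevents.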