fix(sdk): avoid conflicting component names in DAG when reusing pipelines #11071

Open · wants to merge 3 commits into base: master
42 changes: 42 additions & 0 deletions sdk/python/kfp/compiler/compiler_test.py
@@ -909,6 +909,48 @@ def my_pipeline() -> NamedTuple('Outputs', [
        ]):
            task = print_and_return(text='Hello')

    def test_pipeline_reusing_other_pipeline_multiple_times(self):

        @dsl.pipeline()
        def reusable_pipeline():
            print_op(message='Reused pipeline')

        @dsl.pipeline()
        def do_something_else_pipeline():
            print_op(message='Do something else pipeline')

            reusable_pipeline()

        @dsl.pipeline()
        def orchestrator_pipeline():
            reusable_pipeline()

            do_something_else_pipeline()

        with tempfile.TemporaryDirectory() as tmpdir:
            output_yaml = os.path.join(tmpdir, 'result.yaml')
            compiler.Compiler().compile(
                pipeline_func=orchestrator_pipeline, package_path=output_yaml)
            self.assertTrue(os.path.exists(output_yaml))

            with open(output_yaml, 'r') as f:
                pipeline_spec = yaml.safe_load(f)
                tasks = [
                    comp.get('dag', {}).get('tasks', {})
                    for comp in pipeline_spec['components'].values()
                ]
                component_refs = [[
                    x.get('componentRef', {}).get('name')
                    for x in task.values()
                ] for task in tasks]
                all_component_refs = [
                    item for sublist in component_refs for item in sublist
                ]
                counted_refs = collections.Counter(all_component_refs)
                print(counted_refs)
                self.assertEqual(1, max(counted_refs.values()))


class V2NamespaceAliasTest(unittest.TestCase):
"""Test that imports of both modules and objects are aliased (e.g. all
24 changes: 18 additions & 6 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
@@ -1773,10 +1773,18 @@ def _merge_component_spec(
) -> None:
    """Merges component spec from a sub pipeline spec into the main config.

    During the merge we need to ensure all component specs have unique component
    During the merge we need to ensure all component specs have a unique component
    name, which means we might need to update the `component_ref` referenced from
    task specs in sub_pipeline_spec.

    Uniqueness is determined by the existing component names in main_pipeline_spec
    and sub_pipeline_spec.

    Renaming is first done in place, in LIFO order, to avoid a rename causing a
    name collision with a later rename. The actual merge of component specs is then
    done in a second pass, which ensures all component specs are in their final
    state at the time of merging.

    Args:
        main_pipeline_spec: The main pipeline spec to merge into.
        sub_pipeline_spec: The pipeline spec of an inner pipeline whose
Expand All @@ -1800,18 +1808,22 @@ def _rename_component_refs(
            if task_spec.component_ref.name == old_component_ref:
                task_spec.component_ref.name = new_component_ref

    # Do all the renaming in place, then do the actual merge of component specs
    # in a second pass. This would ensure all component specs are in the final
    # state at the time of merging.
    old_name_to_new_name = {}
    for component_name, component_spec in sub_pipeline_spec.components.items():
    existing_main_comp_names = list(main_pipeline_spec.components.keys())
Author:
Last I heard about this test, this was said. Let me know what you think.

Author:
Upon inspecting the logs in more depth, it seems logical that this test would fail if this renaming logic has been updated. Do you think it is appropriate to update the .yaml result with the new configuration?

    for component_name, _ in sub_pipeline_spec.components.items():
        old_component_name = component_name
        current_comp_name_collection = [
            key for pair in old_name_to_new_name.items() for key in pair
        ]
        new_component_name = utils.make_name_unique_by_adding_index(
            name=component_name,
            collection=list(main_pipeline_spec.components.keys()),
            collection=existing_main_comp_names + current_comp_name_collection,
Contributor:
make_name_unique_by_adding_index may not ensure complete uniqueness of component names when used within nested or reused pipelines. This could result in naming conflicts if the names are not correctly indexed.

Author:
I would expect that it does when passing every component to the collection instead of just the components from the main pipeline. Could you elaborate on the other cases I'm missing?
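For illustration only, here is a minimal sketch of why the collection passed to the uniqueness helper should include the names already assigned during this merge, not just the main pipeline's names. The helper below is a simplified stand-in for utils.make_name_unique_by_adding_index, and the component names are hypothetical, not taken from the KFP codebase:

from typing import List


def make_name_unique_sketch(name: str, collection: List[str],
                            delimiter: str = '-') -> str:
    # Simplified stand-in: append an increasing index until the name
    # no longer collides with anything in the collection.
    unique_name = name
    index = 2
    while unique_name in collection:
        unique_name = f'{name}{delimiter}{index}'
        index += 1
    return unique_name


# Names already present in the main pipeline spec (hypothetical).
existing_main_comp_names = ['comp-print-op']
old_name_to_new_name = {}

# Two sub-pipeline components that both collide around 'comp-print-op'.
for component_name in ['comp-print-op', 'comp-print-op-2']:
    # Include names assigned earlier in this loop, not just the main spec's.
    current_comp_name_collection = [
        key for pair in old_name_to_new_name.items() for key in pair
    ]
    new_name = make_name_unique_sketch(
        name=component_name,
        collection=existing_main_comp_names + current_comp_name_collection)
    old_name_to_new_name[component_name] = new_name

print(old_name_to_new_name)
# {'comp-print-op': 'comp-print-op-2', 'comp-print-op-2': 'comp-print-op-2-2'}
# With only the main spec names in the collection, both entries would map to
# 'comp-print-op-2'; including the earlier renames avoids that clash.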

            delimiter='-')
        old_name_to_new_name[old_component_name] = new_component_name

    ordered_names = enumerate(old_name_to_new_name.items())
    lifo_ordered_names = sorted(ordered_names, key=lambda x: x[0], reverse=True)
Contributor:
LIFO ordering for renaming might not appropriately handle the component references, especially if the pipeline structure doesn't align with this approach.

Author:
Does the pipeline structure have to align with the renaming approach? LIFO seems crucial here, as you want the most complex names (the last in order) to be renamed first, to avoid renaming or conflicting with the more generic names.
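To make that argument concrete, here is a small, self-contained sketch of applying an in-place rename map to component references in insertion order versus reverse (LIFO) order. The reference names and the helper are hypothetical and for illustration only:

def apply_renames(refs, rename_map, reverse):
    # Apply each old -> new rename to the list of component references,
    # one rename at a time, in the given order.
    refs = list(refs)
    items = list(rename_map.items())
    if reverse:
        items = items[::-1]  # LIFO: rename the most-suffixed names first
    for old, new in items:
        refs = [new if ref == old else ref for ref in refs]
    return refs


# Hypothetical sub-pipeline references and the rename map computed for them.
refs = ['comp-print-op', 'comp-print-op-2']
rename_map = {'comp-print-op': 'comp-print-op-2',
              'comp-print-op-2': 'comp-print-op-3'}

print(apply_renames(refs, rename_map, reverse=False))
# ['comp-print-op-3', 'comp-print-op-3']  <- the first rename's output was renamed again
print(apply_renames(refs, rename_map, reverse=True))
# ['comp-print-op-2', 'comp-print-op-3']  <- distinct references preserved

Applying the renames in reverse insertion order, as the sorted(..., reverse=True) call in this change does, prevents an earlier rename's output from being captured by a later rename.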

    for _, (old_component_name, new_component_name) in lifo_ordered_names:
        if new_component_name != old_component_name:
            _rename_component_refs(
                pipeline_spec=sub_pipeline_spec,