diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index d417d9eec19..f553329dea4 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -909,6 +909,48 @@ def my_pipeline() -> NamedTuple('Outputs', [ ]): task = print_and_return(text='Hello') + def test_pipeline_reusing_other_pipeline_multiple_times(self): + + @dsl.pipeline() + def reusable_pipeline(): + print_op(message='Reused pipeline') + + @dsl.pipeline() + def do_something_else_pipeline(): + print_op(message='Do something else pipeline') + + reusable_pipeline() + + @dsl.pipeline() + def orchestrator_pipeline(): + reusable_pipeline() + + do_something_else_pipeline() + + with tempfile.TemporaryDirectory() as tmpdir: + output_yaml = os.path.join(tmpdir, 'result.yaml') + compiler.Compiler().compile( + pipeline_func=orchestrator_pipeline, package_path=output_yaml) + self.assertTrue(os.path.exists(output_yaml)) + + with open(output_yaml, 'r') as f: + pipeline_spec = yaml.safe_load(f) + tasks = [ + comp.get('dag', {}).get('tasks', {}) + for comp in pipeline_spec['components'].values() + ] + component_refs = [[ + x.get('componentRef', {}).get('name') + for x in task.values() + ] + for task in tasks] + all_component_refs = [ + item for sublist in component_refs for item in sublist + ] + counted_refs = collections.Counter(all_component_refs) + print(counted_refs) + self.assertEqual(1, max(counted_refs.values())) + class V2NamespaceAliasTest(unittest.TestCase): """Test that imports of both modules and objects are aliased (e.g. all