diff --git a/sdk/python/kfp/local/executor_input_utils.py b/sdk/python/kfp/local/executor_input_utils.py index 6b1ce147ab4..82eaa9d5b9f 100644 --- a/sdk/python/kfp/local/executor_input_utils.py +++ b/sdk/python/kfp/local/executor_input_utils.py @@ -35,6 +35,10 @@ def construct_executor_input( """Constructs the executor input message for a task execution.""" input_parameter_keys = list( component_spec.input_definitions.parameters.keys()) + # need to also add injected input parameters for f-string + input_parameter_keys += [ + k for k, v in arguments.items() if not isinstance(v, dsl.Artifact) + ] input_artifact_keys = list( component_spec.input_definitions.artifacts.keys()) if input_artifact_keys and block_input_artifact: diff --git a/sdk/python/kfp/local/executor_input_utils_test.py b/sdk/python/kfp/local/executor_input_utils_test.py index 707df162d79..a46c21801b4 100644 --- a/sdk/python/kfp/local/executor_input_utils_test.py +++ b/sdk/python/kfp/local/executor_input_utils_test.py @@ -257,6 +257,67 @@ def test_allow_input_artifact(self): }, expected) self.assertEqual(actual, expected) + def test_fstring_case(self): + component_spec = pipeline_spec_pb2.ComponentSpec() + json_format.ParseDict( + { + 'inputDefinitions': { + 'parameters': { + 'string': { + 'parameterType': 'STRING' + } + } + }, + 'outputDefinitions': { + 'parameters': { + 'Output': { + 'parameterType': 'STRING' + } + } + }, + 'executorLabel': 'exec-identity' + }, component_spec) + expected_executor_input = pipeline_spec_pb2.ExecutorInput() + json_format.ParseDict( + { + 'inputs': { + 'parameterValues': { + 'pipelinechannel--string': + 'baz', + 'string': + "bar-{{$.inputs.parameters['pipelinechannel--string']}}" + } + }, + 'outputs': { + 'parameters': { + 'Output': { + 'outputFile': + '/foo/bar/local_outputs/my-pipeline-2024-01-26-11-10-57XX-530768/identity/Output' + } + }, + 'outputFile': + '/foo/bar/local_outputs/my-pipeline-2024-01-26-11-10-57XX-530768/identity/executor_output.json' + } + }, 
expected_executor_input) + actual_executor_input = executor_input_utils.construct_executor_input( + component_spec=component_spec, + arguments={ + 'pipelinechannel--string': + 'baz', + # covers the case of an f-string, where the value of + # string includes an interpolation of + # pipelinechannel--string + 'string': + "bar-{{$.inputs.parameters['pipelinechannel--string']}}" + }, + task_root='/foo/bar/local_outputs/my-pipeline-2024-01-26-11-10-57XX-530768/identity', + block_input_artifact=True, + ) + self.assertEqual( + expected_executor_input, + actual_executor_input, + ) + class TestExecutorInputToDict(unittest.TestCase): diff --git a/sdk/python/kfp/local/pipeline_orchestrator_test.py b/sdk/python/kfp/local/pipeline_orchestrator_test.py index 9a061c9b839..5d392ff603e 100644 --- a/sdk/python/kfp/local/pipeline_orchestrator_test.py +++ b/sdk/python/kfp/local/pipeline_orchestrator_test.py @@ -418,6 +418,74 @@ def my_pipeline(): ) self.assertEqual(task.outputs, {}) + def test_fstring_python_component(self): + local.init(runner=local.SubprocessRunner()) + + @dsl.component + def identity(string: str) -> str: + return string + + @dsl.pipeline + def my_pipeline(string: str = 'baz') -> str: + op1 = identity(string=f'bar-{string}') + op2 = identity(string=f'foo-{op1.output}') + return op2.output + + task = my_pipeline() + self.assertEqual(task.output, 'foo-bar-baz') + + +class TestFstringContainerComponent( + testing_utilities.LocalRunnerEnvironmentTestCase): + + @classmethod + def setUpClass(cls): + from kfp.local import subprocess_task_handler + + # Temporarily removing these validation calls is a useful hack to + # test a ContainerComponent outside of a container. + # We do this here because we only want to test the very specific + # f-string logic in container components without the presence of + # Docker in the test environment. 
+ cls.original_validate_image = subprocess_task_handler.SubprocessTaskHandler.validate_image + subprocess_task_handler.SubprocessTaskHandler.validate_image = lambda slf, image: None + + cls.original_validate_not_container_component = subprocess_task_handler.SubprocessTaskHandler.validate_not_container_component + subprocess_task_handler.SubprocessTaskHandler.validate_not_container_component = lambda slf, full_command: None + + cls.original_validate_not_containerized_python_component = subprocess_task_handler.SubprocessTaskHandler.validate_not_containerized_python_component + subprocess_task_handler.SubprocessTaskHandler.validate_not_containerized_python_component = lambda slf, full_command: None + + @classmethod + def tearDownClass(cls): + from kfp.local import subprocess_task_handler + + subprocess_task_handler.SubprocessTaskHandler.validate_image = cls.original_validate_image + subprocess_task_handler.SubprocessTaskHandler.validate_not_container_component = cls.original_validate_not_container_component + subprocess_task_handler.SubprocessTaskHandler.validate_not_containerized_python_component = cls.original_validate_not_containerized_python_component + + def test_fstring_container_component(self): + local.init(runner=local.SubprocessRunner()) + + @dsl.container_component + def identity_container(string: str, outpath: dsl.OutputPath(str)): + return dsl.ContainerSpec( + image='alpine', + command=[ + 'sh', + '-c', + f"""mkdir -p $(dirname {outpath}) && printf '%s' {string} > {outpath}""", + ]) + + @dsl.pipeline + def my_pipeline(string: str = 'baz') -> str: + op1 = identity_container(string=f'bar-{string}') + op2 = identity_container(string=f'foo-{op1.output}') + return op2.output + + task = my_pipeline() + self.assertEqual(task.output, 'foo-bar-baz') + if __name__ == '__main__': unittest.main() diff --git a/sdk/python/kfp/local/placeholder_utils.py b/sdk/python/kfp/local/placeholder_utils.py index 7f3ded618c8..059da9beba7 100644 --- 
a/sdk/python/kfp/local/placeholder_utils.py +++ b/sdk/python/kfp/local/placeholder_utils.py @@ -41,6 +41,14 @@ def replace_placeholders( task. """ unique_task_id = make_random_id() + executor_input_dict = resolve_self_references_in_executor_input( + executor_input_dict=executor_input_dict, + pipeline_resource_name=pipeline_resource_name, + task_resource_name=task_resource_name, + pipeline_root=pipeline_root, + pipeline_job_id=unique_pipeline_id, + pipeline_task_id=unique_task_id, + ) provided_inputs = get_provided_inputs(executor_input_dict) full_command = [ resolve_struct_placeholders( @@ -73,6 +81,45 @@ def replace_placeholders( return resolved_command + + +def resolve_self_references_in_executor_input( + executor_input_dict: Dict[str, Any], + pipeline_resource_name: str, + task_resource_name: str, + pipeline_root: str, + pipeline_job_id: str, + pipeline_task_id: str, +) -> Dict[str, Any]: + """Resolve parameter placeholders that point to other parameter + placeholders in the same ExecutorInput message. + + This occurs when passing f-strings to a component. For example: + + my_comp(foo=f'bar-{upstream.output}') + + May result in the ExecutorInput message: + + {'inputs': {'parameterValues': {'pipelinechannel--identity-Output': 'foo', + 'string': "{{$.inputs.parameters['pipelinechannel--identity-Output']}}-bar"}}, + 'outputs': ...} + + The placeholder "{{$.inputs.parameters['pipelinechannel--identity-Output']}}-bar" points to parameter 'pipelinechannel--identity-Output' with the value 'foo'. This function replaces "{{$.inputs.parameters['pipelinechannel--identity-Output']}}-bar" with 'foo-bar'. 
+ """ + for k, v in executor_input_dict.get('inputs', + {}).get('parameterValues', {}).items(): + if isinstance(v, str): + executor_input_dict['inputs']['parameterValues'][ + k] = resolve_individual_placeholder( + v, + executor_input_dict=executor_input_dict, + pipeline_resource_name=pipeline_resource_name, + task_resource_name=task_resource_name, + pipeline_root=pipeline_root, + pipeline_job_id=pipeline_job_id, + pipeline_task_id=pipeline_task_id, + ) + return executor_input_dict + + def flatten_list(l: List[Union[str, list, None]]) -> List[str]: """Iteratively flattens arbitrarily deeply nested lists, filtering out elements that are None.""" @@ -139,6 +186,10 @@ def resolve_io_placeholders( executor_input: Dict[str, Any], command: str, ) -> str: + """Resolves placeholders in command using executor_input. + + executor_input should not contain any unresolved placeholders. + """ placeholders = re.findall(r'\{\{\$\.(.*?)\}\}', command) # e.g., placeholder = "inputs.parameters[''text'']" diff --git a/sdk/python/kfp/local/placeholder_utils_test.py b/sdk/python/kfp/local/placeholder_utils_test.py index 97d3ac78950..05f83c2d274 100644 --- a/sdk/python/kfp/local/placeholder_utils_test.py +++ b/sdk/python/kfp/local/placeholder_utils_test.py @@ -418,5 +418,45 @@ def test( self.assertEqual(actual, expected) +class TestResolveSelfReferencesInExecutorInput(unittest.TestCase): + + def test_simple(self): + executor_input_dict = { + 'inputs': { + 'parameterValues': { + 'pipelinechannel--identity-Output': + 'foo', + 'string': + "{{$.inputs.parameters['pipelinechannel--identity-Output']}}-bar" + } + }, + 'outputs': { + 'outputFile': + '/foo/bar/my-pipeline-2024-01-26-12-26-24-162075/echo/executor_output.json' + } + } + expected = { + 'inputs': { + 'parameterValues': { + 'pipelinechannel--identity-Output': 'foo', + 'string': 'foo-bar' + } + }, + 'outputs': { + 'outputFile': + '/foo/bar/my-pipeline-2024-01-26-12-26-24-162075/echo/executor_output.json' + } + } + actual = 
placeholder_utils.resolve_self_references_in_executor_input( + executor_input_dict, + pipeline_resource_name='my-pipeline-2024-01-26-12-26-24-162075', + task_resource_name='echo', + pipeline_root='/foo/bar/my-pipeline-2024-01-26-12-26-24-162075', + pipeline_job_id='123456789', + pipeline_task_id='987654321', + ) + self.assertEqual(actual, expected) + + if __name__ == '__main__': unittest.main()