feat(sdk): support f-strings in local pipeline execution (kubeflow#10435)
connor-mccarthy authored and petethegreat committed Mar 27, 2024
1 parent 7dd3d67 commit 37605e4
Showing 5 changed files with 224 additions and 0 deletions.
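
For context, a minimal sketch of the usage this change enables, adapted from the new test case added to pipeline_orchestrator_test.py below; it assumes a local environment where the KFP SDK and its SubprocessRunner are available.

from kfp import dsl, local

local.init(runner=local.SubprocessRunner())


@dsl.component
def identity(string: str) -> str:
    return string


@dsl.pipeline
def my_pipeline(string: str = 'baz') -> str:
    # f-strings over pipeline parameters and upstream task outputs now
    # resolve during local execution.
    op1 = identity(string=f'bar-{string}')
    op2 = identity(string=f'foo-{op1.output}')
    return op2.output


task = my_pipeline()
assert task.output == 'foo-bar-baz'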
4 changes: 4 additions & 0 deletions sdk/python/kfp/local/executor_input_utils.py
@@ -35,6 +35,10 @@ def construct_executor_input(
"""Constructs the executor input message for a task execution."""
input_parameter_keys = list(
component_spec.input_definitions.parameters.keys())
# also add the input parameters injected to support f-strings
input_parameter_keys += [
k for k, v in arguments.items() if not isinstance(v, dsl.Artifact)
]
input_artifact_keys = list(
component_spec.input_definitions.artifacts.keys())
if input_artifact_keys and block_input_artifact:
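
As a quick illustration of the hunk above (a sketch only; the literal values are borrowed from the new test_fstring_case test below): an f-string argument causes the compiler to inject a pipelinechannel--* parameter that does not appear in the component's input definitions, so it must also be picked up from the task's arguments.

from kfp import dsl

# Declared inputs, from component_spec.input_definitions.parameters.
input_parameter_keys = ['string']
# Arguments for the task; 'pipelinechannel--string' is injected by the
# compiler and referenced by the placeholder inside 'string'.
arguments = {
    'pipelinechannel--string': 'baz',
    'string': "bar-{{$.inputs.parameters['pipelinechannel--string']}}",
}
# Same filter as the hunk above: every non-artifact argument is treated as an
# input parameter of the task.
input_parameter_keys += [
    k for k, v in arguments.items() if not isinstance(v, dsl.Artifact)
]
# Both 'string' and the injected 'pipelinechannel--string' are now treated as
# input parameters for this task execution.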
61 changes: 61 additions & 0 deletions sdk/python/kfp/local/executor_input_utils_test.py
@@ -257,6 +257,67 @@ def test_allow_input_artifact(self):
}, expected)
self.assertEqual(actual, expected)

def test_fstring_case(self):
component_spec = pipeline_spec_pb2.ComponentSpec()
json_format.ParseDict(
{
'inputDefinitions': {
'parameters': {
'string': {
'parameterType': 'STRING'
}
}
},
'outputDefinitions': {
'parameters': {
'Output': {
'parameterType': 'STRING'
}
}
},
'executorLabel': 'exec-identity'
}, component_spec)
expected_executor_input = pipeline_spec_pb2.ExecutorInput()
json_format.ParseDict(
{
'inputs': {
'parameterValues': {
'pipelinechannel--string':
'baz',
'string':
"bar-{{$.inputs.parameters['pipelinechannel--string']}}"
}
},
'outputs': {
'parameters': {
'Output': {
'outputFile':
'/foo/bar/local_outputs/my-pipeline-2024-01-26-11-10-57XX-530768/identity/Output'
}
},
'outputFile':
'/foo/bar/local_outputs/my-pipeline-2024-01-26-11-10-57XX-530768/identity/executor_output.json'
}
}, expected_executor_input)
actual_executor_input = executor_input_utils.construct_executor_input(
component_spec=component_spec,
arguments={
'pipelinechannel--string':
'baz',
# covers the case of an f-string, where the value of
# string includes an interpolation of
# pipelinechannel--string
'string':
"bar-{{$.inputs.parameters['pipelinechannel--string']}}"
},
task_root='/foo/bar/local_outputs/my-pipeline-2024-01-26-11-10-57XX-530768/identity',
block_input_artifact=True,
)
self.assertEqual(
expected_executor_input,
actual_executor_input,
)


class TestExecutorInputToDict(unittest.TestCase):

68 changes: 68 additions & 0 deletions sdk/python/kfp/local/pipeline_orchestrator_test.py
@@ -418,6 +418,74 @@ def my_pipeline():
)
self.assertEqual(task.outputs, {})

def test_fstring_python_component(self):
local.init(runner=local.SubprocessRunner())

@dsl.component
def identity(string: str) -> str:
return string

@dsl.pipeline
def my_pipeline(string: str = 'baz') -> str:
op1 = identity(string=f'bar-{string}')
op2 = identity(string=f'foo-{op1.output}')
return op2.output

task = my_pipeline()
self.assertEqual(task.output, 'foo-bar-baz')


class TestFstringContainerComponent(
testing_utilities.LocalRunnerEnvironmentTestCase):

@classmethod
def setUpClass(cls):
from kfp.local import subprocess_task_handler

# Temporarily removing these validation calls is a useful hack to
# test a ContainerComponent outside of a container.
# We do this here because we only want to test the very specific
# f-string logic in container components without the presence of
# Docker in the test environment.
cls.original_validate_image = subprocess_task_handler.SubprocessTaskHandler.validate_image
subprocess_task_handler.SubprocessTaskHandler.validate_image = lambda slf, image: None

cls.original_validate_not_container_component = subprocess_task_handler.SubprocessTaskHandler.validate_not_container_component
subprocess_task_handler.SubprocessTaskHandler.validate_not_container_component = lambda slf, full_command: None

cls.original_validate_not_containerized_python_component = subprocess_task_handler.SubprocessTaskHandler.validate_not_containerized_python_component
subprocess_task_handler.SubprocessTaskHandler.validate_not_containerized_python_component = lambda slf, full_command: None

@classmethod
def tearDownClass(cls):
from kfp.local import subprocess_task_handler

subprocess_task_handler.SubprocessTaskHandler.validate_image = cls.original_validate_image
subprocess_task_handler.SubprocessTaskHandler.validate_not_container_component = cls.original_validate_not_container_component
subprocess_task_handler.SubprocessTaskHandler.validate_not_containerized_python_component = cls.original_validate_not_containerized_python_component

def test_fstring_container_component(self):
local.init(runner=local.SubprocessRunner())

@dsl.container_component
def identity_container(string: str, outpath: dsl.OutputPath(str)):
return dsl.ContainerSpec(
image='alpine',
command=[
'sh',
'-c',
f"""mkdir -p $(dirname {outpath}) && printf '%s' {string} > {outpath}""",
])

@dsl.pipeline
def my_pipeline(string: str = 'baz') -> str:
op1 = identity_container(string=f'bar-{string}')
op2 = identity_container(string=f'foo-{op1.output}')
return op2.output

task = my_pipeline()
self.assertEqual(task.output, 'foo-bar-baz')


if __name__ == '__main__':
unittest.main()
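
A side note on the design choice in setUpClass/tearDownClass above: the same three validators could instead be stubbed out with unittest.mock, which restores the originals automatically. This is only a sketch, not what the commit does; the method names are the ones referenced above.

import contextlib
from unittest import mock

from kfp.local import subprocess_task_handler


@contextlib.contextmanager
def disabled_container_component_validation():
    # Stubs out the same three validators as setUpClass above; they are
    # restored automatically when the context exits.
    handler = subprocess_task_handler.SubprocessTaskHandler
    with mock.patch.object(handler, 'validate_image',
                           lambda self, image: None), \
         mock.patch.object(handler, 'validate_not_container_component',
                           lambda self, full_command: None), \
         mock.patch.object(handler, 'validate_not_containerized_python_component',
                           lambda self, full_command: None):
        yield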
51 changes: 51 additions & 0 deletions sdk/python/kfp/local/placeholder_utils.py
@@ -41,6 +41,14 @@ def replace_placeholders(
task.
"""
unique_task_id = make_random_id()
executor_input_dict = resolve_self_references_in_executor_input(
executor_input_dict=executor_input_dict,
pipeline_resource_name=pipeline_resource_name,
task_resource_name=task_resource_name,
pipeline_root=pipeline_root,
pipeline_job_id=unique_pipeline_id,
pipeline_task_id=unique_task_id,
)
provided_inputs = get_provided_inputs(executor_input_dict)
full_command = [
resolve_struct_placeholders(
@@ -73,6 +81,45 @@ def replace_placeholders(
return resolved_command


def resolve_self_references_in_executor_input(
executor_input_dict: Dict[str, Any],
pipeline_resource_name: str,
task_resource_name: str,
pipeline_root: str,
pipeline_job_id: str,
pipeline_task_id: str,
) -> Dict[str, Any]:
"""Resolve parameter placeholders that point to other parameter
placeholders in the same ExecutorInput message.
This occurs when passing f-strings to a component. For example:
my_comp(foo=f'bar-{upstream.output}')
May result in the ExecutorInput message:
{'inputs': {'parameterValues': {'pipelinechannel--identity-Output': 'foo',
'string': "{{$.inputs.parameters['pipelinechannel--identity-Output']}}-bar"}},
'outputs': ...}
The placeholder "{{$.inputs.parameters['pipelinechannel--identity-Output']}}-bar" points to parameter 'pipelinechannel--identity-Output' with the value 'foo'. This function replaces "{{$.inputs.parameters['pipelinechannel--identity-Output']}}-bar" with 'foo'.
"""
for k, v in executor_input_dict.get('inputs',
{}).get('parameterValues', {}).items():
if isinstance(v, str):
executor_input_dict['inputs']['parameterValues'][
k] = resolve_individual_placeholder(
v,
executor_input_dict=executor_input_dict,
pipeline_resource_name=pipeline_resource_name,
task_resource_name=task_resource_name,
pipeline_root=pipeline_root,
pipeline_job_id=pipeline_job_id,
pipeline_task_id=pipeline_task_id,
)
return executor_input_dict


def flatten_list(l: List[Union[str, list, None]]) -> List[str]:
"""Iteratively flattens arbitrarily deeply nested lists, filtering out
elements that are None."""
@@ -139,6 +186,10 @@ def resolve_io_placeholders(
executor_input: Dict[str, Any],
command: str,
) -> str:
"""Resolves placeholders in command using executor_input.
executor_input should not contain any unresolved placeholders.
"""
placeholders = re.findall(r'\{\{\$\.(.*?)\}\}', command)

# e.g., placeholder = "inputs.parameters[''text'']"
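
For reference, a standalone sketch of the placeholder extraction performed by resolve_io_placeholders (the regex is copied from the hunk above; the example value comes from the new tests):

import re

command = "bar-{{$.inputs.parameters['pipelinechannel--string']}}"
# Extracts the body of each {{$....}} placeholder from the command string.
placeholders = re.findall(r'\{\{\$\.(.*?)\}\}', command)
print(placeholders)  # ["inputs.parameters['pipelinechannel--string']"]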
40 changes: 40 additions & 0 deletions sdk/python/kfp/local/placeholder_utils_test.py
@@ -418,5 +418,45 @@ def test(
self.assertEqual(actual, expected)


class TestResolveSelfReferencesInExecutorInput(unittest.TestCase):

def test_simple(self):
executor_input_dict = {
'inputs': {
'parameterValues': {
'pipelinechannel--identity-Output':
'foo',
'string':
"{{$.inputs.parameters['pipelinechannel--identity-Output']}}-bar"
}
},
'outputs': {
'outputFile':
'/foo/bar/my-pipeline-2024-01-26-12-26-24-162075/echo/executor_output.json'
}
}
expected = {
'inputs': {
'parameterValues': {
'pipelinechannel--identity-Output': 'foo',
'string': 'foo-bar'
}
},
'outputs': {
'outputFile':
'/foo/bar/my-pipeline-2024-01-26-12-26-24-162075/echo/executor_output.json'
}
}
actual = placeholder_utils.resolve_self_references_in_executor_input(
executor_input_dict,
pipeline_resource_name='my-pipeline-2024-01-26-12-26-24-162075',
task_resource_name='echo',
pipeline_root='/foo/bar/my-pipeline-2024-01-26-12-26-24-162075',
pipeline_job_id='123456789',
pipeline_task_id='987654321',
)
self.assertEqual(actual, expected)


if __name__ == '__main__':
unittest.main()
