Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): support f-strings in local pipeline execution #localexecution #10435

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk/python/kfp/local/executor_input_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ def construct_executor_input(
"""Constructs the executor input message for a task execution."""
input_parameter_keys = list(
component_spec.input_definitions.parameters.keys())
# need to also add injected input parameters for f-string
input_parameter_keys += [
k for k, v in arguments.items() if not isinstance(v, dsl.Artifact)
]
input_artifact_keys = list(
component_spec.input_definitions.artifacts.keys())
if input_artifact_keys and block_input_artifact:
Expand Down
61 changes: 61 additions & 0 deletions sdk/python/kfp/local/executor_input_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,67 @@ def test_allow_input_artifact(self):
}, expected)
self.assertEqual(actual, expected)

def test_fstring_case(self):
    """An f-string argument resolves: the value of 'string' embeds an
    interpolation of the injected 'pipelinechannel--string' parameter."""
    component_spec_dict = {
        'inputDefinitions': {
            'parameters': {
                'string': {
                    'parameterType': 'STRING'
                }
            }
        },
        'outputDefinitions': {
            'parameters': {
                'Output': {
                    'parameterType': 'STRING'
                }
            }
        },
        'executorLabel': 'exec-identity',
    }
    component_spec = pipeline_spec_pb2.ComponentSpec()
    json_format.ParseDict(component_spec_dict, component_spec)

    arguments = {
        'pipelinechannel--string': 'baz',
        # covers the case of an f-string, where the value of 'string'
        # includes an interpolation of 'pipelinechannel--string'
        'string': "bar-{{$.inputs.parameters['pipelinechannel--string']}}",
    }
    actual_executor_input = executor_input_utils.construct_executor_input(
        component_spec=component_spec,
        arguments=arguments,
        task_root='/foo/bar/local_outputs/my-pipeline-2024-01-26-11-10-57XX-530768/identity',
        block_input_artifact=True,
    )

    expected_executor_input = pipeline_spec_pb2.ExecutorInput()
    json_format.ParseDict(
        {
            'inputs': {
                'parameterValues': {
                    'pipelinechannel--string':
                        'baz',
                    'string':
                        "bar-{{$.inputs.parameters['pipelinechannel--string']}}"
                }
            },
            'outputs': {
                'parameters': {
                    'Output': {
                        'outputFile':
                            '/foo/bar/local_outputs/my-pipeline-2024-01-26-11-10-57XX-530768/identity/Output'
                    }
                },
                'outputFile':
                    '/foo/bar/local_outputs/my-pipeline-2024-01-26-11-10-57XX-530768/identity/executor_output.json'
            }
        }, expected_executor_input)

    self.assertEqual(expected_executor_input, actual_executor_input)


class TestExecutorInputToDict(unittest.TestCase):

Expand Down
68 changes: 68 additions & 0 deletions sdk/python/kfp/local/pipeline_orchestrator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,74 @@ def my_pipeline():
)
self.assertEqual(task.outputs, {})

def test_fstring_python_component(self):
    """f-string arguments to lightweight Python components resolve end to
    end under the local SubprocessRunner."""
    local.init(runner=local.SubprocessRunner())

    @dsl.component
    def identity(string: str) -> str:
        return string

    @dsl.pipeline
    def my_pipeline(string: str = 'baz') -> str:
        # first interpolates the pipeline input, second interpolates an
        # upstream task output
        first_task = identity(string=f'bar-{string}')
        second_task = identity(string=f'foo-{first_task.output}')
        return second_task.output

    task = my_pipeline()
    self.assertEqual(task.output, 'foo-bar-baz')


class TestFstringContainerComponent(
        testing_utilities.LocalRunnerEnvironmentTestCase):
    """Tests f-string resolution for container components run locally.

    The SubprocessTaskHandler normally validates the image and rejects
    container components; these tests patch out those validations so that
    only the f-string placeholder logic is exercised, without requiring
    Docker in the test environment.
    """

    @classmethod
    def setUpClass(cls):
        from kfp.local import subprocess_task_handler

        # Temporarily removing these validation calls is a useful hack to
        # test a ContainerComponent outside of a container.
        # We do this here because we only want to test the very specific
        # f-string logic in container components without the presence of
        # Docker in the test environment.
        cls.original_validate_image = subprocess_task_handler.SubprocessTaskHandler.validate_image
        subprocess_task_handler.SubprocessTaskHandler.validate_image = lambda slf, image: None

        cls.original_validate_not_container_component = subprocess_task_handler.SubprocessTaskHandler.validate_not_container_component
        subprocess_task_handler.SubprocessTaskHandler.validate_not_container_component = lambda slf, full_command: None

        cls.original_validate_not_containerized_python_component = subprocess_task_handler.SubprocessTaskHandler.validate_not_containerized_python_component
        subprocess_task_handler.SubprocessTaskHandler.validate_not_containerized_python_component = lambda slf, full_command: None

    @classmethod
    def tearDownClass(cls):
        from kfp.local import subprocess_task_handler

        # Restore the original validations so other tests are unaffected.
        subprocess_task_handler.SubprocessTaskHandler.validate_image = cls.original_validate_image
        subprocess_task_handler.SubprocessTaskHandler.validate_not_container_component = cls.original_validate_not_container_component
        subprocess_task_handler.SubprocessTaskHandler.validate_not_containerized_python_component = cls.original_validate_not_containerized_python_component

    def test_fstring_container_component(self):
        local.init(runner=local.SubprocessRunner())

        @dsl.container_component
        def identity_container(string: str, outpath: dsl.OutputPath(str)):
            return dsl.ContainerSpec(
                image='alpine',
                command=[
                    'sh',
                    '-c',
                    f"""mkdir -p $(dirname {outpath}) && printf '%s' {string} > {outpath}""",
                ])

        @dsl.pipeline
        def my_pipeline(string: str = 'baz') -> str:
            op1 = identity_container(string=f'bar-{string}')
            op2 = identity_container(string=f'foo-{op1.output}')
            return op2.output

        task = my_pipeline()
        self.assertEqual(task.output, 'foo-bar-baz')


# Allow running this test module directly.
if __name__ == '__main__':
    unittest.main()
51 changes: 51 additions & 0 deletions sdk/python/kfp/local/placeholder_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ def replace_placeholders(
task.
"""
unique_task_id = make_random_id()
executor_input_dict = resolve_self_references_in_executor_input(
executor_input_dict=executor_input_dict,
pipeline_resource_name=pipeline_resource_name,
task_resource_name=task_resource_name,
pipeline_root=pipeline_root,
pipeline_job_id=unique_pipeline_id,
pipeline_task_id=unique_task_id,
)
provided_inputs = get_provided_inputs(executor_input_dict)
full_command = [
resolve_struct_placeholders(
Expand Down Expand Up @@ -73,6 +81,45 @@ def replace_placeholders(
return resolved_command


def resolve_self_references_in_executor_input(
    executor_input_dict: Dict[str, Any],
    pipeline_resource_name: str,
    task_resource_name: str,
    pipeline_root: str,
    pipeline_job_id: str,
    pipeline_task_id: str,
) -> Dict[str, Any]:
    """Resolve parameter placeholders that point to other parameter
    placeholders in the same ExecutorInput message.

    This occurs when passing f-strings to a component. For example:

        my_comp(foo=f'bar-{upstream.output}')

    May result in the ExecutorInput message:

        {'inputs': {'parameterValues': {'pipelinechannel--identity-Output': 'foo',
                                        'string': "{{$.inputs.parameters['pipelinechannel--identity-Output']}}-bar"}},
         'outputs': ...}

    The value of 'string' contains a placeholder pointing to the parameter
    'pipelinechannel--identity-Output', whose value is 'foo'. This function
    resolves "{{$.inputs.parameters['pipelinechannel--identity-Output']}}-bar"
    to 'foo-bar'.

    Args:
        executor_input_dict: The ExecutorInput message as a dict. Mutated in
            place (and also returned for convenience).
        pipeline_resource_name: Forwarded to individual placeholder
            resolution.
        task_resource_name: Forwarded to individual placeholder resolution.
        pipeline_root: Forwarded to individual placeholder resolution.
        pipeline_job_id: Forwarded to individual placeholder resolution.
        pipeline_task_id: Forwarded to individual placeholder resolution.

    Returns:
        executor_input_dict with all string parameter values resolved.
    """
    parameter_values = executor_input_dict.get('inputs',
                                               {}).get('parameterValues', {})
    for key, value in parameter_values.items():
        # only string values can contain placeholders; reassigning an
        # existing key while iterating is safe since no keys are added
        if isinstance(value, str):
            parameter_values[key] = resolve_individual_placeholder(
                value,
                executor_input_dict=executor_input_dict,
                pipeline_resource_name=pipeline_resource_name,
                task_resource_name=task_resource_name,
                pipeline_root=pipeline_root,
                pipeline_job_id=pipeline_job_id,
                pipeline_task_id=pipeline_task_id,
            )
    return executor_input_dict


def flatten_list(l: List[Union[str, list, None]]) -> List[str]:
"""Iteratively flattens arbitrarily deeply nested lists, filtering out
elements that are None."""
Expand Down Expand Up @@ -139,6 +186,10 @@ def resolve_io_placeholders(
executor_input: Dict[str, Any],
command: str,
) -> str:
"""Resolves placeholders in command using executor_input.

executor_input should not contain any unresolved placeholders.
"""
placeholders = re.findall(r'\{\{\$\.(.*?)\}\}', command)

# e.g., placeholder = "inputs.parameters[''text'']"
Expand Down
40 changes: 40 additions & 0 deletions sdk/python/kfp/local/placeholder_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,5 +418,45 @@ def test(
self.assertEqual(actual, expected)


class TestResolveSelfReferencesInExecutorInput(unittest.TestCase):
    """Tests for placeholder_utils.resolve_self_references_in_executor_input."""

    def test_simple(self):
        # 'string' embeds a placeholder pointing at the other input
        # parameter; resolution substitutes that parameter's value in place.
        output_file = '/foo/bar/my-pipeline-2024-01-26-12-26-24-162075/echo/executor_output.json'
        executor_input_dict = {
            'inputs': {
                'parameterValues': {
                    'pipelinechannel--identity-Output':
                        'foo',
                    'string':
                        "{{$.inputs.parameters['pipelinechannel--identity-Output']}}-bar",
                }
            },
            'outputs': {
                'outputFile': output_file
            },
        }

        actual = placeholder_utils.resolve_self_references_in_executor_input(
            executor_input_dict,
            pipeline_resource_name='my-pipeline-2024-01-26-12-26-24-162075',
            task_resource_name='echo',
            pipeline_root='/foo/bar/my-pipeline-2024-01-26-12-26-24-162075',
            pipeline_job_id='123456789',
            pipeline_task_id='987654321',
        )

        expected = {
            'inputs': {
                'parameterValues': {
                    'pipelinechannel--identity-Output': 'foo',
                    'string': 'foo-bar',
                }
            },
            'outputs': {
                'outputFile': output_file
            },
        }
        self.assertEqual(actual, expected)


# Allow running this test module directly.
if __name__ == '__main__':
    unittest.main()