Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): add special dsl.OutputPath read logic #localexecution #10334

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions sdk/python/kfp/local/executor_output_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,32 @@ def get_outputs_from_executor_output(
return {**output_parameters, **output_artifacts}


def special_dsl_outputpath_read(output_file: str, is_string: bool) -> Any:
def special_dsl_outputpath_read(
parameter_name: str,
output_file: str,
dtype: pipeline_spec_pb2.ParameterType.ParameterTypeEnum,
) -> Any:
"""Reads the text in dsl.OutputPath files in the same way as the remote
backend.

Basically deserialize all types as JSON, but also support strings
that are written directly without quotes (e.g., `foo` instead of
`"foo"`).
In brief: read strings as strings and JSON load everything else.
"""
with open(output_file) as f:
parameter_value = f.read()
# TODO: verify this is the correct special handling of OutputPath
return parameter_value if is_string else json.loads(parameter_value)
try:
with open(output_file) as f:
value = f.read()

if dtype == pipeline_spec_pb2.ParameterType.ParameterTypeEnum.STRING:
value = value
elif dtype == pipeline_spec_pb2.ParameterType.ParameterTypeEnum.BOOLEAN:
# permit true/True and false/False, consistent with remote BE
value = json.loads(value.lower())
else:
value = json.loads(value)
return value
except Exception as e:
raise ValueError(
f'Could not deserialize output {parameter_name!r} from path {output_file}'
) from e


def merge_dsl_output_file_parameters_to_executor_output(
Expand All @@ -123,11 +137,11 @@ def merge_dsl_output_file_parameters_to_executor_output(
for parameter_key, output_parameter in executor_input.outputs.parameters.items(
):
if os.path.exists(output_parameter.output_file):
is_string = component_spec.output_definitions.parameters[
parameter_key].parameter_type == pipeline_spec_pb2.ParameterType.ParameterTypeEnum.STRING
parameter_value = special_dsl_outputpath_read(
output_parameter.output_file,
is_string,
parameter_name=parameter_key,
output_file=output_parameter.output_file,
dtype=component_spec.output_definitions
.parameters[parameter_key].parameter_type,
)
executor_output.parameter_values[parameter_key].CopyFrom(
pipeline_spec_builder.to_protobuf_value(parameter_value))
Expand Down
49 changes: 43 additions & 6 deletions sdk/python/kfp/local/executor_output_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
"""Tests for executor_output_utils.py."""

import json
import os
import tempfile
from typing import List
Expand Down Expand Up @@ -580,19 +581,55 @@ def test(self):

class TestSpecialDslOutputPathRead(parameterized.TestCase):

@parameterized.parameters([('foo', 'foo', True)])
def test(self, written_string, expected_object, is_string):
@parameterized.parameters([
('foo', 'foo',
pipeline_spec_pb2.ParameterType.ParameterTypeEnum.STRING),
('foo', 'foo',
pipeline_spec_pb2.ParameterType.ParameterTypeEnum.STRING),
('true', True,
pipeline_spec_pb2.ParameterType.ParameterTypeEnum.BOOLEAN),
('True', True,
pipeline_spec_pb2.ParameterType.ParameterTypeEnum.BOOLEAN),
('false', False,
pipeline_spec_pb2.ParameterType.ParameterTypeEnum.BOOLEAN),
('False', False,
pipeline_spec_pb2.ParameterType.ParameterTypeEnum.BOOLEAN),
(json.dumps({'x': 'y'}), {
'x': 'y'
}, pipeline_spec_pb2.ParameterType.ParameterTypeEnum.STRUCT),
('3.14', 3.14,
pipeline_spec_pb2.ParameterType.ParameterTypeEnum.NUMBER_DOUBLE),
('100', 100,
pipeline_spec_pb2.ParameterType.ParameterTypeEnum.NUMBER_INTEGER),
])
def test(self, written, expected, dtype):
with tempfile.TemporaryDirectory() as tempdir:
output_file = os.path.join(tempdir, 'Output')
with open(output_file, 'w') as f:
f.write(written_string)
f.write(written)

actual = executor_output_utils.special_dsl_outputpath_read(
output_file,
is_string=is_string,
parameter_name='name',
output_file=output_file,
dtype=dtype,
)

self.assertEqual(actual, expected_object)
self.assertEqual(actual, expected)

def test_exception(self):
with tempfile.TemporaryDirectory() as tempdir:
output_file = os.path.join(tempdir, 'Output')
with open(output_file, 'w') as f:
f.write(str({'x': 'y'}))
with self.assertRaisesRegex(
ValueError,
r"Could not deserialize output 'name' from path"):
executor_output_utils.special_dsl_outputpath_read(
parameter_name='name',
output_file=output_file,
dtype=pipeline_spec_pb2.ParameterType.ParameterTypeEnum
.STRUCT,
)


def assert_artifacts_equal(
Expand Down