From 2df8d1912492a045c1c64c11cc5579ca47cdfeff Mon Sep 17 00:00:00 2001 From: scottxu Date: Wed, 29 Jun 2022 15:44:56 -0700 Subject: [PATCH 01/23] support container_component decorator for function with no inputs --- sdk/python/kfp/compiler/compiler_test.py | 25 ++++++++ .../kfp/components/container_component.py | 38 +++++++++++++ .../container_component_decorator.py | 57 +++++++++++++++++++ .../container_component_decorator_test.py | 22 +++++++ sdk/python/kfp/dsl/__init__.py | 1 + 5 files changed, 143 insertions(+) create mode 100644 sdk/python/kfp/components/container_component.py create mode 100644 sdk/python/kfp/components/container_component_decorator.py create mode 100644 sdk/python/kfp/components/container_component_decorator_test.py diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 6acf0748ac6..50acf97d299 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -30,6 +30,7 @@ from kfp.components.types import type_utils from kfp.dsl import PipelineTaskFinalStatus from kfp.pipeline_spec import pipeline_spec_pb2 +from kfp.components.structures import ContainerSpec import yaml VALID_PRODUCER_COMPONENT_SAMPLE = components.load_component_from_text(""" @@ -803,6 +804,30 @@ def hello_world(text: str) -> str: pipeline_spec['root']['inputDefinitions']['parameters']['text'] ['defaultValue'], 'override_string') + def test_compile_container_component_simple(self): + + @dsl.container_component + def hello_world_container() -> str: + """Hello world component.""" + return ContainerSpec( + image='python3.7', + command=['echo', 'hello world'], + args=[], + ) + + with tempfile.TemporaryDirectory() as tempdir: + output_json = os.path.join(tempdir, 'component.yaml') + compiler.Compiler().compile( + pipeline_func=hello_world_container, + package_path=output_json, + pipeline_name='hello-world-container') + with open(output_json, 'r') as f: + pipeline_spec = yaml.safe_load(f) + self.assertEqual( + pipeline_spec['components']['comp-hello-world-container'] + ['outputDefinitions']['parameters']['Output']['parameterType'], + "STRING") + class TestCompileBadInput(unittest.TestCase): diff --git a/sdk/python/kfp/components/container_component.py b/sdk/python/kfp/components/container_component.py new file mode 100644 index 00000000000..613ade94961 --- /dev/null +++ b/sdk/python/kfp/components/container_component.py @@ -0,0 +1,38 @@ +# Copyright 2021 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Container-based component.""" + +from typing import Callable + +from kfp.components import base_component +from kfp.components import structures + + +class ContainerComponent(base_component.BaseComponent): + """Component defined via pre-built container. + + Attribute: + pipeline_func: The function that becomes the implementation of this component. + """ + + def __init__(self, component_spec: structures.ComponentSpec, + pipeline_func: Callable): + super().__init__(component_spec=component_spec) + self.pipeline_func = pipeline_func + + def execute(self, **kwargs): + # ContainerComponent`: Also inherits from `BaseComponent`. + # As its name suggests, this class backs (custom) container components. + # Its `execute()` method uses `docker run` for local component execution + raise NotImplementedError diff --git a/sdk/python/kfp/components/container_component_decorator.py b/sdk/python/kfp/components/container_component_decorator.py new file mode 100644 index 00000000000..80e2b6a5219 --- /dev/null +++ b/sdk/python/kfp/components/container_component_decorator.py @@ -0,0 +1,57 @@ +# Copyright 2021-2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import inspect +from typing import Callable, List, Optional + +from kfp.components.structures import Implementation, ComponentSpec +from kfp.components import component_factory +from kfp.components.container_component import ContainerComponent + + +def container_component(func: Callable): + """Decorator for container-based components in KFP v2. + + Sample usage: + from kfp.v2.dsl import container_component, ContainerSpec, InputPath, OutputPath, Output + + @container_component + def my_component( + dataset_path: InputPath(Dataset), + model: Output[Model], + num_epochs: int, + output_parameter: OutputPath(str), + ): + return ContainerSpec( + image='gcr.io/my-image', + command=['python3', 'my_component.py'], + arguments=[ + '--dataset_path', dataset_path, + '--model_path', model.path, + '--output_parameter_path', output_parameter, + ] + ) + + Args: + func: The python function to create a component from. The function + should have type annotations for all its arguments, indicating how + it is intended to be used (e.g. as an input/output Artifact object, + a plain parameter, or a path to a file). + + """ + + component_spec = component_factory.extract_component_interface(func) + component_spec.implementation = Implementation( + func()) # TODO: add compatability for placeholder + return ContainerComponent(component_spec, func) diff --git a/sdk/python/kfp/components/container_component_decorator_test.py b/sdk/python/kfp/components/container_component_decorator_test.py new file mode 100644 index 00000000000..2fb494388fe --- /dev/null +++ b/sdk/python/kfp/components/container_component_decorator_test.py @@ -0,0 +1,22 @@ +import unittest + +from kfp.components.container_component import ContainerComponent +from kfp.components.structures import ContainerSpec +from kfp.components.container_component_decorator import container_component + + +class TestContainerComponentDecorator(unittest.TestCase): + + def test_func_with_no_arg(self): + + @container_component + def hello_world() -> str: + """Hello world component.""" + return ContainerSpec( + image='python3.7', + command=['echo', 'hello world'], + args=[], + ) + + self.assertIsInstance(hello_world, ContainerComponent) + self.assertIsNone(hello_world.component_spec.inputs) diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py index a83f239cd4a..cdad2bba05d 100644 --- a/sdk/python/kfp/dsl/__init__.py +++ b/sdk/python/kfp/dsl/__init__.py @@ -43,6 +43,7 @@ ] from kfp.components.component_decorator import component +from kfp.components.container_component_decorator import container_component from kfp.components.importer_node import importer from kfp.components.pipeline_context import pipeline from kfp.components.pipeline_task import PipelineTask From 101b6156afd1058c0943fdc2bdcc855f220fc91a Mon Sep 17 00:00:00 2001 From: scottxu Date: Fri, 1 Jul 2022 10:36:18 -0700 Subject: [PATCH 02/23] resolve review comments --- sdk/python/kfp/compiler/compiler_test.py | 2 +- sdk/python/kfp/components/container_component.py | 2 +- .../components/container_component_decorator.py | 7 +++---- .../container_component_decorator_test.py | 14 ++++++++++++++ sdk/python/kfp/dsl/__init__.py | 6 ++++++ 5 files changed, 25 insertions(+), 6 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 50acf97d299..bbfaf6fb404 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -826,7 +826,7 @@ def hello_world_container() -> str: self.assertEqual( pipeline_spec['components']['comp-hello-world-container'] ['outputDefinitions']['parameters']['Output']['parameterType'], - "STRING") + 'STRING') class TestCompileBadInput(unittest.TestCase): diff --git a/sdk/python/kfp/components/container_component.py b/sdk/python/kfp/components/container_component.py index 613ade94961..9f71acff3b0 100644 --- a/sdk/python/kfp/components/container_component.py +++ b/sdk/python/kfp/components/container_component.py @@ -1,4 +1,4 @@ -# Copyright 2021 The Kubeflow Authors +# Copyright 2022 The Kubeflow Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/sdk/python/kfp/components/container_component_decorator.py b/sdk/python/kfp/components/container_component_decorator.py index 80e2b6a5219..ab82632de30 100644 --- a/sdk/python/kfp/components/container_component_decorator.py +++ b/sdk/python/kfp/components/container_component_decorator.py @@ -1,4 +1,4 @@ -# Copyright 2021-2022 The Kubeflow Authors +# Copyright 2022 The Kubeflow Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,10 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import inspect -from typing import Callable, List, Optional +from typing import Callable -from kfp.components.structures import Implementation, ComponentSpec +from kfp.components.structures import Implementation from kfp.components import component_factory from kfp.components.container_component import ContainerComponent diff --git a/sdk/python/kfp/components/container_component_decorator_test.py b/sdk/python/kfp/components/container_component_decorator_test.py index 2fb494388fe..f8575c49858 100644 --- a/sdk/python/kfp/components/container_component_decorator_test.py +++ b/sdk/python/kfp/components/container_component_decorator_test.py @@ -1,3 +1,17 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import unittest from kfp.components.container_component import ContainerComponent diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py index cdad2bba05d..11067a6deba 100644 --- a/sdk/python/kfp/dsl/__init__.py +++ b/sdk/python/kfp/dsl/__init__.py @@ -44,6 +44,12 @@ from kfp.components.component_decorator import component from kfp.components.container_component_decorator import container_component +<<<<<<< HEAD +======= + +from kfp.components.structures import ContainerSpec + +>>>>>>> e11415bae (resolve review comments) from kfp.components.importer_node import importer from kfp.components.pipeline_context import pipeline from kfp.components.pipeline_task import PipelineTask From 56ed6d56cbf1ce93fca187084e58178dd7c36cb9 Mon Sep 17 00:00:00 2001 From: scottxu Date: Wed, 6 Jul 2022 15:20:39 -0700 Subject: [PATCH 03/23] add sample tests for milestone 1 --- .../components/container_no_input.py | 30 ++++++++++++++++ .../components/container_no_input.yaml | 29 +++++++++++++++ .../container_component_with_no_inputs.py | 35 +++++++++++++++++++ .../container_component_with_no_inputs.yaml | 29 +++++++++++++++ 4 files changed, 123 insertions(+) create mode 100644 sdk/python/kfp/compiler/test_data/components/container_no_input.py create mode 100644 sdk/python/kfp/compiler/test_data/components/container_no_input.yaml create mode 100644 sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py create mode 100644 sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.yaml diff --git a/sdk/python/kfp/compiler/test_data/components/container_no_input.py b/sdk/python/kfp/compiler/test_data/components/container_no_input.py new file mode 100644 index 00000000000..c917a569997 --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/components/container_no_input.py @@ -0,0 +1,30 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from kfp.dsl import container_component, ContainerSpec + + +@container_component +def hello_world_container() -> str: + return ContainerSpec( + image='python3.7', + command=['echo', 'hello world'], + args=[], + ) + + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile( + pipeline_func=hello_world_container, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/kfp/compiler/test_data/components/container_no_input.yaml b/sdk/python/kfp/compiler/test_data/components/container_no_input.yaml new file mode 100644 index 00000000000..73f3705e8db --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/components/container_no_input.yaml @@ -0,0 +1,29 @@ +components: + comp-hello-world-container: + executorLabel: exec-hello-world-container + outputDefinitions: + parameters: + Output: + parameterType: STRING +deploymentSpec: + executors: + exec-hello-world-container: + container: + command: + - echo + - hello world + image: python3.7 +pipelineInfo: + name: hello-world-container +root: + dag: + tasks: + hello-world-container: + cachingOptions: + enableCache: true + componentRef: + name: comp-hello-world-container + taskInfo: + name: hello-world-container +schemaVersion: 2.1.0 +sdkVersion: kfp-2.0.0-beta.0 diff --git a/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py new file mode 100644 index 00000000000..5d5c76aea9a --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py @@ -0,0 +1,35 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kfp import compiler +from kfp import dsl + + +@dsl.container_component +def hello_world_container() -> str: + return dsl.ContainerSpec( + image='python3.7', + command=['echo', 'hello world'], + args=[], + ) + + +@dsl.pipeline(name='v2-container-component-no-input') +def pipeline(): + hello_world_container() + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=pipeline, package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.yaml b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.yaml new file mode 100644 index 00000000000..5714f3ae505 --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.yaml @@ -0,0 +1,29 @@ +components: + comp-hello-world-container: + executorLabel: exec-hello-world-container + outputDefinitions: + parameters: + Output: + parameterType: STRING +deploymentSpec: + executors: + exec-hello-world-container: + container: + command: + - echo + - hello world + image: python3.7 +pipelineInfo: + name: v2-container-component-no-input +root: + dag: + tasks: + hello-world-container: + cachingOptions: + enableCache: true + componentRef: + name: comp-hello-world-container + taskInfo: + name: hello-world-container +schemaVersion: 2.1.0 +sdkVersion: kfp-2.0.0-beta.0 From 40c9b81bd0bd0b7060a4e3e1f048d28b9e635864 Mon Sep 17 00:00:00 2001 From: scottxu Date: Thu, 7 Jul 2022 10:34:35 -0700 Subject: [PATCH 04/23] modify compiler test data --- .../test_data/pipelines/container_component_with_no_inputs.py | 2 +- .../pipelines/container_component_with_no_inputs.yaml | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py index 5d5c76aea9a..22d21dce3e1 100644 --- a/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py +++ b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py @@ -17,7 +17,7 @@ @dsl.container_component -def hello_world_container() -> str: +def hello_world_container(): return dsl.ContainerSpec( image='python3.7', command=['echo', 'hello world'], diff --git a/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.yaml b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.yaml index 5714f3ae505..705689a6494 100644 --- a/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.yaml +++ b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.yaml @@ -1,10 +1,6 @@ components: comp-hello-world-container: executorLabel: exec-hello-world-container - outputDefinitions: - parameters: - Output: - parameterType: STRING deploymentSpec: executors: exec-hello-world-container: From e9a8bf152d2d5c8de0da1bd30ad6c364539b3cdc Mon Sep 17 00:00:00 2001 From: scottxu Date: Thu, 7 Jul 2022 12:15:09 -0700 Subject: [PATCH 05/23] resolve reviews --- .../test_data/pipelines/container_component_with_no_inputs.py | 4 ++-- .../pipelines/container_component_with_no_inputs.yaml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py index 22d21dce3e1..52e11ed3b5e 100644 --- a/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py +++ b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py @@ -17,9 +17,9 @@ @dsl.container_component -def hello_world_container(): +def hello_world_container() -> None: return dsl.ContainerSpec( - image='python3.7', + image='python:3.7', command=['echo', 'hello world'], args=[], ) diff --git a/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.yaml b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.yaml index 705689a6494..6ceab5e57a0 100644 --- a/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.yaml +++ b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.yaml @@ -8,7 +8,7 @@ deploymentSpec: command: - echo - hello world - image: python3.7 + image: python:3.7 pipelineInfo: name: v2-container-component-no-input root: From 56ac809cf37c43e293a5bfdcbaa6dc3fbf0091fd Mon Sep 17 00:00:00 2001 From: scottxu Date: Thu, 7 Jul 2022 12:15:51 -0700 Subject: [PATCH 06/23] resolve reviews --- sdk/python/kfp/compiler/_read_write_test_config.py | 2 ++ sdk/python/kfp/compiler/compiler_test.py | 2 +- .../test_data/components/container_no_input.py | 7 ++++--- .../test_data/components/container_no_input.yaml | 6 +----- sdk/python/kfp/components/container_component.py | 2 +- .../components/container_component_decorator.py | 10 +++++----- .../container_component_decorator_test.py | 14 +++++++------- 7 files changed, 21 insertions(+), 22 deletions(-) diff --git a/sdk/python/kfp/compiler/_read_write_test_config.py b/sdk/python/kfp/compiler/_read_write_test_config.py index 2e84ebb8fdf..aaf5628599f 100644 --- a/sdk/python/kfp/compiler/_read_write_test_config.py +++ b/sdk/python/kfp/compiler/_read_write_test_config.py @@ -42,6 +42,7 @@ 'pipeline_with_task_final_status', 'pipeline_with_task_final_status_yaml', 'component_with_pip_index_urls', + 'container_component_with_no_inputs', ], 'test_data_dir': 'sdk/python/kfp/compiler/test_data/pipelines', 'config': { @@ -60,6 +61,7 @@ 'output_artifact', 'output_metrics', 'preprocess', + 'container_no_input', ], 'test_data_dir': 'sdk/python/kfp/compiler/test_data/components', 'config': { diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index bbfaf6fb404..3c3c77652ca 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -807,7 +807,7 @@ def hello_world(text: str) -> str: def test_compile_container_component_simple(self): @dsl.container_component - def hello_world_container() -> str: + def hello_world_container() -> None: """Hello world component.""" return ContainerSpec( image='python3.7', diff --git a/sdk/python/kfp/compiler/test_data/components/container_no_input.py b/sdk/python/kfp/compiler/test_data/components/container_no_input.py index c917a569997..8defbd808ec 100644 --- a/sdk/python/kfp/compiler/test_data/components/container_no_input.py +++ b/sdk/python/kfp/compiler/test_data/components/container_no_input.py @@ -11,13 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from kfp.dsl import container_component, ContainerSpec +from kfp.dsl import container_component +from kfp.dsl import ContainerSpec @container_component -def hello_world_container() -> str: +def hello_world_container() -> None: return ContainerSpec( - image='python3.7', + image='python:3.7', command=['echo', 'hello world'], args=[], ) diff --git a/sdk/python/kfp/compiler/test_data/components/container_no_input.yaml b/sdk/python/kfp/compiler/test_data/components/container_no_input.yaml index 73f3705e8db..2d4f0367413 100644 --- a/sdk/python/kfp/compiler/test_data/components/container_no_input.yaml +++ b/sdk/python/kfp/compiler/test_data/components/container_no_input.yaml @@ -1,10 +1,6 @@ components: comp-hello-world-container: executorLabel: exec-hello-world-container - outputDefinitions: - parameters: - Output: - parameterType: STRING deploymentSpec: executors: exec-hello-world-container: @@ -12,7 +8,7 @@ deploymentSpec: command: - echo - hello world - image: python3.7 + image: python:3.7 pipelineInfo: name: hello-world-container root: diff --git a/sdk/python/kfp/components/container_component.py b/sdk/python/kfp/components/container_component.py index 9f71acff3b0..d9e0773bd1f 100644 --- a/sdk/python/kfp/components/container_component.py +++ b/sdk/python/kfp/components/container_component.py @@ -27,7 +27,7 @@ class ContainerComponent(base_component.BaseComponent): """ def __init__(self, component_spec: structures.ComponentSpec, - pipeline_func: Callable): + pipeline_func: Callable) -> None: super().__init__(component_spec=component_spec) self.pipeline_func = pipeline_func diff --git a/sdk/python/kfp/components/container_component_decorator.py b/sdk/python/kfp/components/container_component_decorator.py index ab82632de30..78543f485c0 100644 --- a/sdk/python/kfp/components/container_component_decorator.py +++ b/sdk/python/kfp/components/container_component_decorator.py @@ -14,12 +14,12 @@ from typing import Callable -from kfp.components.structures import Implementation +from kfp.components import structures from kfp.components import component_factory -from kfp.components.container_component import ContainerComponent +from kfp.components import container_component -def container_component(func: Callable): +def container_component(func: Callable) -> container_component.ContainerComponent: """Decorator for container-based components in KFP v2. Sample usage: @@ -51,6 +51,6 @@ def my_component( """ component_spec = component_factory.extract_component_interface(func) - component_spec.implementation = Implementation( + component_spec.implementation = structures.Implementation( func()) # TODO: add compatability for placeholder - return ContainerComponent(component_spec, func) + return container_component.ContainerComponent(component_spec, func) diff --git a/sdk/python/kfp/components/container_component_decorator_test.py b/sdk/python/kfp/components/container_component_decorator_test.py index f8575c49858..f9f5e91db20 100644 --- a/sdk/python/kfp/components/container_component_decorator_test.py +++ b/sdk/python/kfp/components/container_component_decorator_test.py @@ -14,23 +14,23 @@ import unittest -from kfp.components.container_component import ContainerComponent -from kfp.components.structures import ContainerSpec -from kfp.components.container_component_decorator import container_component +from kfp.components import container_component +from kfp.components import structures +from kfp.components import container_component_decorator class TestContainerComponentDecorator(unittest.TestCase): def test_func_with_no_arg(self): - @container_component - def hello_world() -> str: + @container_component_decorator.container_component + def hello_world() -> None: """Hello world component.""" - return ContainerSpec( + return structures.ContainerSpec( image='python3.7', command=['echo', 'hello world'], args=[], ) - self.assertIsInstance(hello_world, ContainerComponent) + self.assertIsInstance(hello_world, container_component.ContainerComponent) self.assertIsNone(hello_world.component_spec.inputs) From 702f3ef70938f9a75ec1088c4e79282bb1859a5a Mon Sep 17 00:00:00 2001 From: scottxu Date: Mon, 11 Jul 2022 10:51:15 -0700 Subject: [PATCH 07/23] WIP --- sdk/python/kfp/compiler/compiler_test.py | 11 ++++++----- .../test_data/components/container_no_input.py | 9 ++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 3c3c77652ca..12108b4996b 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -30,9 +30,10 @@ from kfp.components.types import type_utils from kfp.dsl import PipelineTaskFinalStatus from kfp.pipeline_spec import pipeline_spec_pb2 -from kfp.components.structures import ContainerSpec import yaml +from pipelines.sdk.python.kfp.components import container_component + VALID_PRODUCER_COMPONENT_SAMPLE = components.load_component_from_text(""" name: producer inputs: @@ -807,10 +808,10 @@ def hello_world(text: str) -> str: def test_compile_container_component_simple(self): @dsl.container_component - def hello_world_container() -> None: + def hello_world_container() -> dsl.ContainerSpec: """Hello world component.""" - return ContainerSpec( - image='python3.7', + return dsl.ContainerSpec( + image='python:3.7', command=['echo', 'hello world'], args=[], ) @@ -826,7 +827,7 @@ def hello_world_container() -> None: self.assertEqual( pipeline_spec['components']['comp-hello-world-container'] ['outputDefinitions']['parameters']['Output']['parameterType'], - 'STRING') + 'None') class TestCompileBadInput(unittest.TestCase): diff --git a/sdk/python/kfp/compiler/test_data/components/container_no_input.py b/sdk/python/kfp/compiler/test_data/components/container_no_input.py index 8defbd808ec..e3fbc3eb506 100644 --- a/sdk/python/kfp/compiler/test_data/components/container_no_input.py +++ b/sdk/python/kfp/compiler/test_data/components/container_no_input.py @@ -11,13 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from kfp.dsl import container_component -from kfp.dsl import ContainerSpec +from kfp import dsl -@container_component -def hello_world_container() -> None: - return ContainerSpec( +@dsl.container_component +def hello_world_container(): + return dsl.ContainerSpec( image='python:3.7', command=['echo', 'hello world'], args=[], From b6eb5e2b10858966f41d4a403c80d77604425dba Mon Sep 17 00:00:00 2001 From: scottxu Date: Wed, 13 Jul 2022 16:25:01 -0700 Subject: [PATCH 08/23] implementation of function of no inputs --- samples/test/config.yaml | 4 +- samples/v2/pipeline_container_no_input.py | 38 ++++++ .../v2/pipeline_container_no_input_test.py | 52 ++++++++ sdk/python/kfp/compiler/compiler_test.py | 8 +- .../components/container_no_input.py | 11 +- .../components/container_no_input.yaml | 16 +-- .../container_component_with_no_inputs.py | 2 +- .../kfp/components/component_factory.py | 115 ++++++++++++++++++ .../container_component_decorator.py | 14 +-- .../container_component_decorator_test.py | 11 +- 10 files changed, 237 insertions(+), 34 deletions(-) create mode 100644 samples/v2/pipeline_container_no_input.py create mode 100644 samples/v2/pipeline_container_no_input_test.py diff --git a/samples/test/config.yaml b/samples/test/config.yaml index 1c6cce44acc..1a6b079a2fc 100644 --- a/samples/test/config.yaml +++ b/samples/test/config.yaml @@ -1,4 +1,4 @@ -# Copyright 2021 The Kubeflow Authors +# Copyright 2021-2022 The Kubeflow Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -122,6 +122,8 @@ path: samples.v2.producer_consumer_param_test - name: pipeline_with_importer path: samples.v2.pipeline_with_importer_test +- nmae: pipeline_container_no_input + path: samples.v2.pipeline_container_no_input_test # TODO(capri-xiyue): Re-enable after figuring out V2 Engine # and protobuf.Value support. # - name: cache_v2 diff --git a/samples/v2/pipeline_container_no_input.py b/samples/v2/pipeline_container_no_input.py new file mode 100644 index 00000000000..a8a23f66623 --- /dev/null +++ b/samples/v2/pipeline_container_no_input.py @@ -0,0 +1,38 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +from kfp import dsl +from kfp import compiler + + +@dsl.container_component +def container_no_input(): + return dsl.ContainerSpec( + image='python:3.7', + command=['echo', 'hello world'], + args=[], + ) + + +@dsl.pipeline(name='v2-container-component-no-input') +def pipeline_container_no_input(): + container_no_input() + + +if __name__ == "__main__": + # execute only if run as a script + compiler.Compiler().compile( + pipeline_func=pipeline_container_no_input, + package_path='pipeline_container_no_input.json') diff --git a/samples/v2/pipeline_container_no_input_test.py b/samples/v2/pipeline_container_no_input_test.py new file mode 100644 index 00000000000..7ba2a66622c --- /dev/null +++ b/samples/v2/pipeline_container_no_input_test.py @@ -0,0 +1,52 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Pipeline container no input v2 engine pipeline.""" + +from __future__ import annotations + +import unittest + +import kfp.deprecated as kfp +import kfp_server_api +from ml_metadata.proto import Execution + +from kfp.samples.test.utils import KfpTask, TaskInputs, TaskOutputs, TestCase, run_pipeline_func +from .pipeline_container_no_input import pipeline_container_no_input + + +def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun, + tasks: dict[str, KfpTask], **kwargs): + t.assertEqual(run.status, 'Succeeded') + t.assertEqual( + { + 'hello-world': + KfpTask( + name='hello-world', + type='system.ContainerExecution', + state=Execution.State.COMPLETE, + inputs=TaskInputs(parameters={}, artifacts=[]), + outputs=TaskOutputs(parameters={}, artifacts=[])) + }, + tasks, + ) + + +if __name__ == '__main__': + run_pipeline_func([ + TestCase( + pipeline_func=pipeline_container_no_input, + verify_func=verify, + mode=kfp.dsl.PipelineExecutionMode.V2_ENGINE, + ), + ]) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 12108b4996b..d72b2a4840a 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -32,8 +32,6 @@ from kfp.pipeline_spec import pipeline_spec_pb2 import yaml -from pipelines.sdk.python.kfp.components import container_component - VALID_PRODUCER_COMPONENT_SAMPLE = components.load_component_from_text(""" name: producer inputs: @@ -825,9 +823,9 @@ def hello_world_container() -> dsl.ContainerSpec: with open(output_json, 'r') as f: pipeline_spec = yaml.safe_load(f) self.assertEqual( - pipeline_spec['components']['comp-hello-world-container'] - ['outputDefinitions']['parameters']['Output']['parameterType'], - 'None') + pipeline_spec['deploymentSpec']['executors'] + ['exec-hello-world-container']['container']['command'], + ['echo', 'hello world']) class TestCompileBadInput(unittest.TestCase): diff --git a/sdk/python/kfp/compiler/test_data/components/container_no_input.py b/sdk/python/kfp/compiler/test_data/components/container_no_input.py index e3fbc3eb506..7c806ee3018 100644 --- a/sdk/python/kfp/compiler/test_data/components/container_no_input.py +++ b/sdk/python/kfp/compiler/test_data/components/container_no_input.py @@ -11,12 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from kfp import dsl +from kfp.dsl import ContainerSpec +from kfp.dsl import container_component -@dsl.container_component -def hello_world_container(): - return dsl.ContainerSpec( +@container_component +def container_no_input(): + return ContainerSpec( image='python:3.7', command=['echo', 'hello world'], args=[], @@ -26,5 +27,5 @@ def hello_world_container(): if __name__ == '__main__': from kfp import compiler compiler.Compiler().compile( - pipeline_func=hello_world_container, + pipeline_func=container_no_input, package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/kfp/compiler/test_data/components/container_no_input.yaml b/sdk/python/kfp/compiler/test_data/components/container_no_input.yaml index 2d4f0367413..984dd5d3888 100644 --- a/sdk/python/kfp/compiler/test_data/components/container_no_input.yaml +++ b/sdk/python/kfp/compiler/test_data/components/container_no_input.yaml @@ -1,25 +1,25 @@ components: - comp-hello-world-container: - executorLabel: exec-hello-world-container + comp-container-no-input: + executorLabel: exec-container-no-input deploymentSpec: executors: - exec-hello-world-container: + exec-container-no-input: container: command: - echo - hello world image: python:3.7 pipelineInfo: - name: hello-world-container + name: container-no-input root: dag: tasks: - hello-world-container: + container-no-input: cachingOptions: enableCache: true componentRef: - name: comp-hello-world-container + name: comp-container-no-input taskInfo: - name: hello-world-container + name: container-no-input schemaVersion: 2.1.0 -sdkVersion: kfp-2.0.0-beta.0 +sdkVersion: kfp-2.0.0-beta.1 diff --git a/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py index 52e11ed3b5e..724b701dba1 100644 --- a/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py +++ b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py @@ -17,7 +17,7 @@ @dsl.container_component -def hello_world_container() -> None: +def hello_world_container(): return dsl.ContainerSpec( image='python:3.7', command=['echo', 'hello world'], diff --git a/sdk/python/kfp/components/component_factory.py b/sdk/python/kfp/components/component_factory.py index 9dbd920adee..aebb7bcb290 100644 --- a/sdk/python/kfp/components/component_factory.py +++ b/sdk/python/kfp/components/component_factory.py @@ -23,6 +23,7 @@ import docstring_parser from kfp.components import placeholders from kfp.components import python_component +from kfp.components import container_component from kfp.components import structures from kfp.components.types import artifact_types from kfp.components.types import type_annotations @@ -435,3 +436,117 @@ def create_component_from_func(func: Callable, return python_component.PythonComponent( component_spec=component_spec, python_func=func) + + +def extract_container_component_interface( + func: Callable) -> structures.ComponentSpec: + """Extracting the type annotations from function signature, without analyzing return annotation. + """ + signature = inspect.signature(func) + parameters = list(signature.parameters.values()) + + parsed_docstring = docstring_parser.parse(inspect.getdoc(func)) + + inputs = {} + outputs = {} + + input_names = set() + output_names = set() + for parameter in parameters: + parameter_type = type_annotations.maybe_strip_optional_from_annotation( + parameter.annotation) + passing_style = None + io_name = parameter.name + + if type_annotations.is_artifact_annotation(parameter_type): + # passing_style is either type_annotations.InputAnnotation or + # type_annotations.OutputAnnotation. + passing_style = type_annotations.get_io_artifact_annotation( + parameter_type) + + # parameter_type is type_annotations.Artifact or one of its subclasses. + parameter_type = type_annotations.get_io_artifact_class( + parameter_type) + if not issubclass(parameter_type, artifact_types.Artifact): + raise ValueError( + 'Input[T] and Output[T] are only supported when T is a ' + 'subclass of Artifact. Found `{} with type {}`'.format( + io_name, parameter_type)) + + if parameter.default is not inspect.Parameter.empty: + raise ValueError( + 'Default values for Input/Output artifacts are not supported.' + ) + elif isinstance( + parameter_type, + (type_annotations.InputPath, type_annotations.OutputPath)): + passing_style = type(parameter_type) + parameter_type = parameter_type.type + if parameter.default is not inspect.Parameter.empty and not ( + passing_style == type_annotations.InputPath and + parameter.default is None): + raise ValueError( + 'Path inputs only support default values of None. Default' + ' values for outputs are not supported.') + + type_struct = _annotation_to_type_struct(parameter_type) + if type_struct is None: + raise TypeError('Missing type annotation for argument: {}'.format( + parameter.name)) + + if passing_style in [ + type_annotations.OutputAnnotation, type_annotations.OutputPath + ]: + io_name = _maybe_make_unique(io_name, output_names) + output_names.add(io_name) + output_spec = structures.OutputSpec(type=type_struct) + outputs[io_name] = output_spec + else: + io_name = _maybe_make_unique(io_name, input_names) + input_names.add(io_name) + if parameter.default is not inspect.Parameter.empty: + input_spec = structures.InputSpec( + type=type_struct, + default=parameter.default, + ) + else: + input_spec = structures.InputSpec(type=type_struct) + + inputs[io_name] = input_spec + + # Component name and description are derived from the function's name and + # docstring. The name can be overridden by setting setting func.__name__ + # attribute (of the legacy func._component_human_name attribute). The + # description can be overridden by setting the func.__doc__ attribute (or + # the legacy func._component_description attribute). + component_name = getattr(func, '_component_human_name', + None) or _python_function_name_to_component_name( + func.__name__) + description = getattr(func, '_component_description', + None) or parsed_docstring.short_description + if description: + description = description.strip() + + component_spec = structures.ComponentSpec( + name=component_name, + description=description, + inputs=inputs if inputs else None, + outputs=outputs if outputs else None, + # Dummy implementation to bypass model validation. + implementation=structures.Implementation(), + ) + return component_spec + + +def create_container_component_from_func( + func: Callable) -> container_component.ContainerComponent: + """Implementation for the @container_component decorator. + + The decorator is defined under container_component_decorator.py. See the + decorator for the canonical documentation for this function. + """ + + component_spec = extract_container_component_interface(func) + component_spec.implementation = structures.Implementation( + func()) # TODO: Milestone 1b: add compatability for placeholder in args + return container_component.ContainerComponent(component_spec, func) diff --git a/sdk/python/kfp/components/container_component_decorator.py b/sdk/python/kfp/components/container_component_decorator.py index 78543f485c0..94d476a2718 100644 --- a/sdk/python/kfp/components/container_component_decorator.py +++ b/sdk/python/kfp/components/container_component_decorator.py @@ -14,16 +14,16 @@ from typing import Callable -from kfp.components import structures from kfp.components import component_factory from kfp.components import container_component -def container_component(func: Callable) -> container_component.ContainerComponent: +def container_component( + func: Callable) -> container_component.ContainerComponent: """Decorator for container-based components in KFP v2. Sample usage: - from kfp.v2.dsl import container_component, ContainerSpec, InputPath, OutputPath, Output + from kfp.dsl import container_component, ContainerSpec, InputPath, OutputPath, Output @container_component def my_component( @@ -34,7 +34,7 @@ def my_component( ): return ContainerSpec( image='gcr.io/my-image', - command=['python3', 'my_component.py'], + command=['sh', 'my_component.sh'], arguments=[ '--dataset_path', dataset_path, '--model_path', model.path, @@ -49,8 +49,4 @@ def my_component( a plain parameter, or a path to a file). """ - - component_spec = component_factory.extract_component_interface(func) - component_spec.implementation = structures.Implementation( - func()) # TODO: add compatability for placeholder - return container_component.ContainerComponent(component_spec, func) + return component_factory.create_container_component_from_func(func) diff --git a/sdk/python/kfp/components/container_component_decorator_test.py b/sdk/python/kfp/components/container_component_decorator_test.py index f9f5e91db20..b407219c394 100644 --- a/sdk/python/kfp/components/container_component_decorator_test.py +++ b/sdk/python/kfp/components/container_component_decorator_test.py @@ -15,8 +15,8 @@ import unittest from kfp.components import container_component -from kfp.components import structures -from kfp.components import container_component_decorator +from kfp.components import container_component_decorator +from kfp import dsl class TestContainerComponentDecorator(unittest.TestCase): @@ -24,13 +24,14 @@ class TestContainerComponentDecorator(unittest.TestCase): def test_func_with_no_arg(self): @container_component_decorator.container_component - def hello_world() -> None: + def hello_world() -> dsl.ContainerSpec: """Hello world component.""" - return structures.ContainerSpec( + return dsl.ContainerSpec( image='python3.7', command=['echo', 'hello world'], args=[], ) - self.assertIsInstance(hello_world, container_component.ContainerComponent) + self.assertIsInstance(hello_world, + container_component.ContainerComponent) self.assertIsNone(hello_world.component_spec.inputs) From 4f7745a3c201841b6416319ec75d864bcfe2f9f3 Mon Sep 17 00:00:00 2001 From: scottxu Date: Thu, 14 Jul 2022 16:23:29 -0700 Subject: [PATCH 09/23] fixed sample test --- samples/v2/pipeline_container_no_input_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/v2/pipeline_container_no_input_test.py b/samples/v2/pipeline_container_no_input_test.py index 7ba2a66622c..cc824a0ae1a 100644 --- a/samples/v2/pipeline_container_no_input_test.py +++ b/samples/v2/pipeline_container_no_input_test.py @@ -32,7 +32,7 @@ def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun, { 'hello-world': KfpTask( - name='hello-world', + name='container-no-input', type='system.ContainerExecution', state=Execution.State.COMPLETE, inputs=TaskInputs(parameters={}, artifacts=[]), From 8275381df137fdc92a05b68e6200a1b258a07a91 Mon Sep 17 00:00:00 2001 From: scottxu Date: Thu, 14 Jul 2022 16:45:53 -0700 Subject: [PATCH 10/23] re-fix sample test --- samples/v2/pipeline_container_no_input_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/v2/pipeline_container_no_input_test.py b/samples/v2/pipeline_container_no_input_test.py index cc824a0ae1a..f77572e53fe 100644 --- a/samples/v2/pipeline_container_no_input_test.py +++ b/samples/v2/pipeline_container_no_input_test.py @@ -30,7 +30,7 @@ def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun, t.assertEqual(run.status, 'Succeeded') t.assertEqual( { - 'hello-world': + 'container-no-input': KfpTask( name='container-no-input', type='system.ContainerExecution', From 7b922239ce2181068702bee2c754a05e335a85b5 Mon Sep 17 00:00:00 2001 From: scottxu Date: Fri, 15 Jul 2022 10:34:05 -0700 Subject: [PATCH 11/23] fix rebase merge conflict --- sdk/python/kfp/dsl/__init__.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py index 11067a6deba..79ba66e05e7 100644 --- a/sdk/python/kfp/dsl/__init__.py +++ b/sdk/python/kfp/dsl/__init__.py @@ -44,15 +44,10 @@ from kfp.components.component_decorator import component from kfp.components.container_component_decorator import container_component -<<<<<<< HEAD -======= - -from kfp.components.structures import ContainerSpec - ->>>>>>> e11415bae (resolve review comments) from kfp.components.importer_node import importer from kfp.components.pipeline_context import pipeline from kfp.components.pipeline_task import PipelineTask +from kfp.components.structures import ContainerSpec from kfp.components.task_final_status import PipelineTaskFinalStatus from kfp.components.tasks_group import Condition from kfp.components.tasks_group import ExitHandler From dc064b5f058046cbd0dcf35e619e5ea1b325263a Mon Sep 17 00:00:00 2001 From: scottxu Date: Fri, 15 Jul 2022 10:45:29 -0700 Subject: [PATCH 12/23] resolve formatting --- sdk/python/kfp/components/component_factory.py | 10 +++++----- .../kfp/components/container_component_decorator.py | 1 - .../components/container_component_decorator_test.py | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/sdk/python/kfp/components/component_factory.py b/sdk/python/kfp/components/component_factory.py index aebb7bcb290..a002451d848 100644 --- a/sdk/python/kfp/components/component_factory.py +++ b/sdk/python/kfp/components/component_factory.py @@ -21,9 +21,9 @@ import warnings import docstring_parser +from kfp.components import container_component from kfp.components import placeholders from kfp.components import python_component -from kfp.components import container_component from kfp.components import structures from kfp.components.types import artifact_types from kfp.components.types import type_annotations @@ -440,8 +440,8 @@ def create_component_from_func(func: Callable, def extract_container_component_interface( func: Callable) -> structures.ComponentSpec: - """Extracting the type annotations from function signature, without analyzing return annotation. - """ + """Extracting the type annotations from function signature, without + analyzing return annotation.""" signature = inspect.signature(func) parameters = list(signature.parameters.values()) @@ -542,8 +542,8 @@ def create_container_component_from_func( func: Callable) -> container_component.ContainerComponent: """Implementation for the @container_component decorator. - The decorator is defined under container_component_decorator.py. See the - decorator for the canonical documentation for this function. + The decorator is defined under container_component_decorator.py. See + the decorator for the canonical documentation for this function. """ component_spec = extract_container_component_interface(func) diff --git a/sdk/python/kfp/components/container_component_decorator.py b/sdk/python/kfp/components/container_component_decorator.py index 94d476a2718..41a3d2064c6 100644 --- a/sdk/python/kfp/components/container_component_decorator.py +++ b/sdk/python/kfp/components/container_component_decorator.py @@ -47,6 +47,5 @@ def my_component( should have type annotations for all its arguments, indicating how it is intended to be used (e.g. as an input/output Artifact object, a plain parameter, or a path to a file). - """ return component_factory.create_container_component_from_func(func) diff --git a/sdk/python/kfp/components/container_component_decorator_test.py b/sdk/python/kfp/components/container_component_decorator_test.py index b407219c394..3be609d686a 100644 --- a/sdk/python/kfp/components/container_component_decorator_test.py +++ b/sdk/python/kfp/components/container_component_decorator_test.py @@ -14,9 +14,9 @@ import unittest +from kfp import dsl from kfp.components import container_component from kfp.components import container_component_decorator -from kfp import dsl class TestContainerComponentDecorator(unittest.TestCase): From eb317811dac778d04e33d1c14a438d6d3b27caae Mon Sep 17 00:00:00 2001 From: scottxu Date: Fri, 15 Jul 2022 10:49:37 -0700 Subject: [PATCH 13/23] resolve isort error for test data --- .../kfp/compiler/test_data/components/container_no_input.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/kfp/compiler/test_data/components/container_no_input.py b/sdk/python/kfp/compiler/test_data/components/container_no_input.py index 7c806ee3018..1a35eb3c47f 100644 --- a/sdk/python/kfp/compiler/test_data/components/container_no_input.py +++ b/sdk/python/kfp/compiler/test_data/components/container_no_input.py @@ -11,8 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from kfp.dsl import ContainerSpec from kfp.dsl import container_component +from kfp.dsl import ContainerSpec @container_component From 8db6d8934dbaaff4e4cd66b9d0c3d38681b96a52 Mon Sep 17 00:00:00 2001 From: scottxu Date: Mon, 18 Jul 2022 12:49:24 -0700 Subject: [PATCH 14/23] resolve comments --- samples/test/config.yaml | 2 +- samples/v2/pipeline_container_no_input.py | 4 ++-- sdk/python/kfp/components/component_factory.py | 5 +---- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/samples/test/config.yaml b/samples/test/config.yaml index 1a6b079a2fc..fe273050eef 100644 --- a/samples/test/config.yaml +++ b/samples/test/config.yaml @@ -122,7 +122,7 @@ path: samples.v2.producer_consumer_param_test - name: pipeline_with_importer path: samples.v2.pipeline_with_importer_test -- nmae: pipeline_container_no_input +- name: pipeline_container_no_input path: samples.v2.pipeline_container_no_input_test # TODO(capri-xiyue): Re-enable after figuring out V2 Engine # and protobuf.Value support. diff --git a/samples/v2/pipeline_container_no_input.py b/samples/v2/pipeline_container_no_input.py index a8a23f66623..6657e7c61b6 100644 --- a/samples/v2/pipeline_container_no_input.py +++ b/samples/v2/pipeline_container_no_input.py @@ -13,8 +13,8 @@ # limitations under the License. import os -from kfp import dsl from kfp import compiler +from kfp import dsl @dsl.container_component @@ -31,7 +31,7 @@ def pipeline_container_no_input(): container_no_input() -if __name__ == "__main__": +if __name__ == '__main__': # execute only if run as a script compiler.Compiler().compile( pipeline_func=pipeline_container_no_input, diff --git a/sdk/python/kfp/components/component_factory.py b/sdk/python/kfp/components/component_factory.py index a002451d848..3c4db05f2cb 100644 --- a/sdk/python/kfp/components/component_factory.py +++ b/sdk/python/kfp/components/component_factory.py @@ -533,8 +533,7 @@ def extract_container_component_interface( inputs=inputs if inputs else None, outputs=outputs if outputs else None, # Dummy implementation to bypass model validation. - implementation=structures.Implementation(), - ) + implementation=func()) return component_spec @@ -547,6 +546,4 @@ def create_container_component_from_func( """ component_spec = extract_container_component_interface(func) - component_spec.implementation = structures.Implementation( - func()) # TODO: Milestone 1b: add compatability for placeholder in args return container_component.ContainerComponent(component_spec, func) From b146ae0d42dd571511ce571b7bc6cc1516fbd63b Mon Sep 17 00:00:00 2001 From: scottxu Date: Mon, 18 Jul 2022 12:58:24 -0700 Subject: [PATCH 15/23] fix nit --- sdk/python/kfp/components/component_factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/kfp/components/component_factory.py b/sdk/python/kfp/components/component_factory.py index 3c4db05f2cb..961fb1895b8 100644 --- a/sdk/python/kfp/components/component_factory.py +++ b/sdk/python/kfp/components/component_factory.py @@ -533,7 +533,7 @@ def extract_container_component_interface( inputs=inputs if inputs else None, outputs=outputs if outputs else None, # Dummy implementation to bypass model validation. - implementation=func()) + implementation=structures.Implementation(func())) return component_spec From bb9484ec3151f0a2cab84ad05a2e105d67685fe8 Mon Sep 17 00:00:00 2001 From: scottxu Date: Mon, 18 Jul 2022 13:33:16 -0700 Subject: [PATCH 16/23] resolve nit --- sdk/python/kfp/components/component_factory.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/kfp/components/component_factory.py b/sdk/python/kfp/components/component_factory.py index 961fb1895b8..1fad87e0415 100644 --- a/sdk/python/kfp/components/component_factory.py +++ b/sdk/python/kfp/components/component_factory.py @@ -532,7 +532,6 @@ def extract_container_component_interface( description=description, inputs=inputs if inputs else None, outputs=outputs if outputs else None, - # Dummy implementation to bypass model validation. implementation=structures.Implementation(func())) return component_spec From 920a30fd6ce7e2394c458efa32ec6dab41de4187 Mon Sep 17 00:00:00 2001 From: scottxu Date: Fri, 22 Jul 2022 11:38:00 -0700 Subject: [PATCH 17/23] add implementation for placeholders i/o, sample and compiler tests --- .../kfp/compiler/_read_write_test_config.py | 3 + sdk/python/kfp/compiler/compiler_test.py | 68 ++++++++++++++++ .../test_data/components/container_io.py | 31 +++++++ .../test_data/components/container_io.yaml | 44 ++++++++++ .../container_with_artifact_output.py | 44 ++++++++++ .../container_with_artifact_output.yaml | 53 ++++++++++++ .../two_step_pipeline_containerized.py | 47 +++++++++++ .../two_step_pipeline_containerized.yaml | 80 +++++++++++++++++++ .../kfp/components/component_factory.py | 56 ++++++++++++- .../container_component_decorator_test.py | 44 +++++++++- 10 files changed, 466 insertions(+), 4 deletions(-) create mode 100644 sdk/python/kfp/compiler/test_data/components/container_io.py create mode 100644 sdk/python/kfp/compiler/test_data/components/container_io.yaml create mode 100644 sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.py create mode 100644 sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.yaml create mode 100644 sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.py create mode 100644 sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.yaml diff --git a/sdk/python/kfp/compiler/_read_write_test_config.py b/sdk/python/kfp/compiler/_read_write_test_config.py index aaf5628599f..234eea3f387 100644 --- a/sdk/python/kfp/compiler/_read_write_test_config.py +++ b/sdk/python/kfp/compiler/_read_write_test_config.py @@ -43,6 +43,7 @@ 'pipeline_with_task_final_status_yaml', 'component_with_pip_index_urls', 'container_component_with_no_inputs', + 'two_step_pipeline_containerized', ], 'test_data_dir': 'sdk/python/kfp/compiler/test_data/pipelines', 'config': { @@ -62,6 +63,8 @@ 'output_metrics', 'preprocess', 'container_no_input', + 'container_io', + 'container_with_artifact_output', ], 'test_data_dir': 'sdk/python/kfp/compiler/test_data/components', 'config': { diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index d72b2a4840a..246368f9c7d 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -827,6 +827,74 @@ def hello_world_container() -> dsl.ContainerSpec: ['exec-hello-world-container']['container']['command'], ['echo', 'hello world']) + def test_compile_container_with_simple_io(self): + + @dsl.container_component + def container_simple_io(text: str, output_path: dsl.OutputPath(str)): + return dsl.ContainerSpec( + image='python:3.7', + command=['echo', text], + args=['--output_path', output_path]) + + with tempfile.TemporaryDirectory() as tempdir: + output_json = os.path.join(tempdir, 'component.yaml') + compiler.Compiler().compile( + pipeline_func=container_simple_io, + package_path=output_json, + pipeline_name='container-simple-io') + with open(output_json, 'r') as f: + pipeline_spec = yaml.safe_load(f) + self.assertEqual( + pipeline_spec['components']['comp-container-simple-io'] + ['inputDefinitions']['parameters']['text']['parameterType'], + 'STRING') + self.assertEqual( + pipeline_spec['components']['comp-container-simple-io'] + ['outputDefinitions']['parameters']['output_path']['parameterType'], + 'STRING') + + def test_compile_container_with_artifact_output(self): + from kfp.dsl import container_component + from kfp.dsl import ContainerSpec + from kfp.dsl import Model + from kfp.dsl import Output + from kfp.dsl import OutputPath + + @container_component + def container_with_artifacts_output( + num_epochs: int, # also as an input + model: Output[Model], + model_config_path: OutputPath(str), + ): + return ContainerSpec( + image='gcr.io/my-image', + command=['sh', 'run.sh'], + args=[ + '--epochs', + num_epochs, + '--model_path', + model.uri, + '--model_config_path', + model_config_path, + ]) + + with tempfile.TemporaryDirectory() as tempdir: + output_json = os.path.join(tempdir, 'component.yaml') + compiler.Compiler().compile( + pipeline_func=container_with_artifacts_output, + package_path=output_json, + pipeline_name='container-with-artifacts-output') + with open(output_json, 'r') as f: + pipeline_spec = yaml.safe_load(f) + self.assertEqual( + pipeline_spec['components']['comp-container-with-artifacts-output'] + ['inputDefinitions']['parameters']['num_epochs']['parameterType'], + 'NUMBER_INTEGER') + self.assertIn( + 'model', + pipeline_spec['components']['comp-container-with-artifacts-output'] + ['outputDefinitions']['artifacts']) + class TestCompileBadInput(unittest.TestCase): diff --git a/sdk/python/kfp/compiler/test_data/components/container_io.py b/sdk/python/kfp/compiler/test_data/components/container_io.py new file mode 100644 index 00000000000..48637b721c0 --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/components/container_io.py @@ -0,0 +1,31 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from kfp.dsl import container_component +from kfp.dsl import ContainerSpec +from kfp.dsl import OutputPath + + +@container_component +def container_io(text: str, output_path: OutputPath(str)): + return ContainerSpec( + image='python:3.7', + command=['echo', text], + args=['--output_path', output_path]) + + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile( + pipeline_func=container_io, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/kfp/compiler/test_data/components/container_io.yaml b/sdk/python/kfp/compiler/test_data/components/container_io.yaml new file mode 100644 index 00000000000..60a0f7c1659 --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/components/container_io.yaml @@ -0,0 +1,44 @@ +components: + comp-container-io: + executorLabel: exec-container-io + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + output_path: + parameterType: STRING +deploymentSpec: + executors: + exec-container-io: + container: + args: + - --output_path + - '{{$.outputs.parameters[''output_path''].output_file}}' + command: + - echo + - '{{$.inputs.parameters[''text'']}}' + image: python:3.7 +pipelineInfo: + name: container-io +root: + dag: + tasks: + container-io: + cachingOptions: + enableCache: true + componentRef: + name: comp-container-io + inputs: + parameters: + text: + componentInputParameter: text + taskInfo: + name: container-io + inputDefinitions: + parameters: + text: + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.0.0-beta.1 diff --git a/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.py b/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.py new file mode 100644 index 00000000000..255ec7a67bd --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.py @@ -0,0 +1,44 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from kfp.dsl import container_component +from kfp.dsl import ContainerSpec +from kfp.dsl import Model +from kfp.dsl import Output +from kfp.dsl import OutputPath + + +@container_component +def container_with_artifacts_output( + num_epochs: int, # also as an input + model: Output[Model], + model_config_path: OutputPath(str), +): + return ContainerSpec( + image='gcr.io/my-image', + command=['sh', 'run.sh'], + args=[ + '--epochs', + num_epochs, + '--model_path', + model.uri, + '--model_config_path', + model_config_path, + ]) + + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile( + pipeline_func=container_with_artifacts_output, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.yaml b/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.yaml new file mode 100644 index 00000000000..f3fd72dcd36 --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.yaml @@ -0,0 +1,53 @@ +components: + comp-container-with-artifacts-output: + executorLabel: exec-container-with-artifacts-output + inputDefinitions: + parameters: + num_epochs: + parameterType: NUMBER_INTEGER + outputDefinitions: + artifacts: + model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + parameters: + model_config_path: + parameterType: STRING +deploymentSpec: + executors: + exec-container-with-artifacts-output: + container: + args: + - --epochs + - '{{$.inputs.parameters[''num_epochs'']}}' + - --model_path + - '{{$.outputs.artifacts[''model''].uri}}' + - --model_config_path + - '{{$.outputs.parameters[''model_config_path''].output_file}}' + command: + - sh + - run.sh + image: gcr.io/my-image +pipelineInfo: + name: container-with-artifacts-output +root: + dag: + tasks: + container-with-artifacts-output: + cachingOptions: + enableCache: true + componentRef: + name: comp-container-with-artifacts-output + inputs: + parameters: + num_epochs: + componentInputParameter: num_epochs + taskInfo: + name: container-with-artifacts-output + inputDefinitions: + parameters: + num_epochs: + parameterType: NUMBER_INTEGER +schemaVersion: 2.1.0 +sdkVersion: kfp-2.0.0-beta.1 diff --git a/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.py b/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.py new file mode 100644 index 00000000000..9eb03f4b159 --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.py @@ -0,0 +1,47 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kfp import compiler +from kfp import dsl + + +@dsl.container_component +def component1(text: str, output_gcs: dsl.Output[dsl.Dataset]): + return dsl.ContainerSpec( + image='google/cloud-sdk:slim', + command=[ + 'sh -c | set -e -x', 'echo', text, '| gsutil cp -', output_gcs.uri + ]) + + +@dsl.container_component +def component2(input_gcs: dsl.Input[dsl.Dataset]): + return dsl.ContainerSpec( + image='google/cloud-sdk:slim', + command=['sh', '-c', '|', 'set -e -x gsutil cat'], + args=[input_gcs.uri]) + + +@dsl.pipeline(name='containerized-two-step-pipeline') +def my_pipeline(text: str = 'Hello world!'): + component_1 = component1(text=text).set_display_name('Producer') + component_2 = component2(input_gcs=component_1.outputs['output_gcs']) + component_2.set_display_name('Consumer') + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=my_pipeline, + pipeline_parameters={'text': 'Hello KFP!'}, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.yaml b/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.yaml new file mode 100644 index 00000000000..07f125e1eae --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.yaml @@ -0,0 +1,80 @@ +components: + comp-component1: + executorLabel: exec-component1 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + artifacts: + output_gcs: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-component2: + executorLabel: exec-component2 + inputDefinitions: + artifacts: + input_gcs: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 +deploymentSpec: + executors: + exec-component1: + container: + command: + - sh -c | set -e -x + - echo + - '{{$.inputs.parameters[''text'']}}' + - '| gsutil cp -' + - '{{$.outputs.artifacts[''output_gcs''].uri}}' + image: google/cloud-sdk:slim + exec-component2: + container: + args: + - '{{$.inputs.artifacts[''input_gcs''].uri}}' + command: + - sh + - -c + - '|' + - set -e -x gsutil cat + image: google/cloud-sdk:slim +pipelineInfo: + name: containerized-two-step-pipeline +root: + dag: + tasks: + component1: + cachingOptions: + enableCache: true + componentRef: + name: comp-component1 + inputs: + parameters: + text: + componentInputParameter: text + taskInfo: + name: Producer + component2: + cachingOptions: + enableCache: true + componentRef: + name: comp-component2 + dependentTasks: + - component1 + inputs: + artifacts: + input_gcs: + taskOutputArtifact: + outputArtifactKey: output_gcs + producerTask: component1 + taskInfo: + name: Consumer + inputDefinitions: + parameters: + text: + defaultValue: Hello KFP! + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.0.0-beta.1 diff --git a/sdk/python/kfp/components/component_factory.py b/sdk/python/kfp/components/component_factory.py index 1fad87e0415..7d6a0ee3ba8 100644 --- a/sdk/python/kfp/components/component_factory.py +++ b/sdk/python/kfp/components/component_factory.py @@ -17,7 +17,7 @@ import pathlib import re import textwrap -from typing import Callable, List, Optional, Tuple +from typing import Callable, List, Optional, Tuple, Union import warnings import docstring_parser @@ -48,6 +48,35 @@ class ComponentInfo(): base_image: str = _DEFAULT_BASE_IMAGE +class ContainerComponentArtifactChannel(): + """A class for passing in placeholders into container_component decorated + function.""" + + def __init__(self, io_type: str, var_name: str): + self._io_type = io_type + self._var_name = var_name + + def __getattr__( + self, __name: str + ) -> Union[placeholders.InputUriPlaceholder, placeholders + .InputPathPlaceholder, placeholders.OutputUriPlaceholder, + placeholders.OutputPathPlaceholder]: + if __name not in ['uri', 'path']: + raise AttributeError( + 'Accessing artifact attribute other than uri or path is not supported.' + ) + if self._io_type == 'input': + if __name == 'uri': + return placeholders.InputUriPlaceholder(self._var_name) + elif __name == 'path': + return placeholders.InputPathPlaceholder(self._var_name) + elif self._io_type == 'output': + if __name == 'uri': + return placeholders.OutputUriPlaceholder(self._var_name) + elif __name == 'path': + return placeholders.OutputPathPlaceholder(self._var_name) + + # A map from function_name to components. This is always populated when a # module containing KFP components is loaded. Primarily used by KFP CLI # component builder to package components in a file into containers. @@ -527,12 +556,35 @@ def extract_container_component_interface( if description: description = description.strip() + arg_list = [] + for parameter in parameters: + parameter_type = type_annotations.maybe_strip_optional_from_annotation( + parameter.annotation) + passing_style = None + io_name = parameter.name + if type_annotations.is_input_artifact(parameter_type): + arg_list.append( + ContainerComponentArtifactChannel( + io_type='input', var_name=io_name)) + elif type_annotations.is_output_artifact(parameter_type): + arg_list.append( + ContainerComponentArtifactChannel( + io_type='output', var_name=io_name)) + elif isinstance( + parameter_type, + (type_annotations.OutputAnnotation, type_annotations.OutputPath)): + arg_list.append(placeholders.OutputParameterPlaceholder(io_name)) + else: # parameter is an input value + arg_list.append(placeholders.InputValuePlaceholder(io_name)) + + container_spec = func(*arg_list) + component_spec = structures.ComponentSpec( name=component_name, description=description, inputs=inputs if inputs else None, outputs=outputs if outputs else None, - implementation=structures.Implementation(func())) + implementation=structures.Implementation(container_spec)) return component_spec diff --git a/sdk/python/kfp/components/container_component_decorator_test.py b/sdk/python/kfp/components/container_component_decorator_test.py index 3be609d686a..12f8c8a7226 100644 --- a/sdk/python/kfp/components/container_component_decorator_test.py +++ b/sdk/python/kfp/components/container_component_decorator_test.py @@ -16,14 +16,13 @@ from kfp import dsl from kfp.components import container_component -from kfp.components import container_component_decorator class TestContainerComponentDecorator(unittest.TestCase): def test_func_with_no_arg(self): - @container_component_decorator.container_component + @dsl.container_component def hello_world() -> dsl.ContainerSpec: """Hello world component.""" return dsl.ContainerSpec( @@ -35,3 +34,44 @@ def hello_world() -> dsl.ContainerSpec: self.assertIsInstance(hello_world, container_component.ContainerComponent) self.assertIsNone(hello_world.component_spec.inputs) + + def test_func_with_simple_io(self): + + @dsl.container_component + def hello_world_io( + text: str, + text_output_path: dsl.OutputPath(str)) -> dsl.ContainerSpec: + """Hello world component with input and output.""" + return dsl.ContainerSpec( + image='python:3.7', + command=['echo'], + args=['--text', text, '--output_path', text_output_path]) + + self.assertIsInstance(hello_world_io, + container_component.ContainerComponent) + + def test_func_with_artifact_io(self): + + @dsl.container_component + def container_comp_with_artifacts( + dataset: dsl.Input[dsl.Dataset], + num_epochs: int, # also as an input + model: dsl.Output[dsl.Model], + model_config_path: dsl.OutputPath(str), + ): + return dsl.ContainerSpec( + image='gcr.io/my-image', + command=['sh', 'run.sh'], + args=[ + '--dataset_location', + dataset.path, + '--epochs', + num_epochs, + '--model_path', + model.uri, + '--model_config_path', + model_config_path, + ]) + + self.assertIsInstance(container_comp_with_artifacts, + container_component.ContainerComponent) From 4199c7253c1b69a385d30422343c94bd1b59cab5 Mon Sep 17 00:00:00 2001 From: scottxu Date: Wed, 27 Jul 2022 16:14:35 -0700 Subject: [PATCH 18/23] resolve comments and merge logic for constructing container component --- samples/v2/two_step_pipeline_containerized.py | 55 +++++ .../two_step_pipeline_containerized_test.py | 91 ++++++++ sdk/python/kfp/compiler/compiler_test.py | 44 ++-- .../test_data/components/container_io.py | 2 +- .../test_data/components/container_io.yaml | 2 +- .../container_with_artifact_output.py | 6 +- .../container_with_artifact_output.yaml | 14 +- .../two_step_pipeline_containerized.py | 3 +- .../two_step_pipeline_containerized.yaml | 1 - sdk/python/kfp/components/__init__.py | 2 + .../kfp/components/component_factory.py | 208 +++++------------- .../kfp/components/component_factory_test.py | 16 ++ .../container_component_decorator.py | 42 ++-- sdk/python/kfp/components/structures.py | 38 +++- sdk/python/kfp/dsl/__init__.py | 2 + 15 files changed, 316 insertions(+), 210 deletions(-) create mode 100644 samples/v2/two_step_pipeline_containerized.py create mode 100644 samples/v2/two_step_pipeline_containerized_test.py diff --git a/samples/v2/two_step_pipeline_containerized.py b/samples/v2/two_step_pipeline_containerized.py new file mode 100644 index 00000000000..dbd045b8886 --- /dev/null +++ b/samples/v2/two_step_pipeline_containerized.py @@ -0,0 +1,55 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Two step pipeline using dsl.container_component decorator.""" +import os + +from kfp import compiler +from kfp.dsl import container_component +from kfp.dsl import ContainerSpec +from kfp.dsl import Dataset +from kfp.dsl import Input +from kfp.dsl import Output +from kfp.dsl import pipeline + + +@container_component +def component1(text: str, output_gcs: Output[Dataset]): + return ContainerSpec( + image='google/cloud-sdk:slim', + command=[ + 'sh -c | set -e -x', 'echo', text, '| gsutil cp -', output_gcs.uri + ]) + + +@container_component +def component2(input_gcs: Input[Dataset]): + return ContainerSpec( + image='google/cloud-sdk:slim', + command=['sh', '-c', '|', 'set -e -x gsutil cat'], + args=[input_gcs.uri]) + + +@pipeline(name='two_step_pipeline_containerized') +def two_step_pipeline_containerized(): + component_1 = component1(text='hi').set_display_name('Producer') + component_2 = component2(input_gcs=component_1.outputs['output_gcs']) + component_2.set_display_name('Consumer') + + +if __name__ == '__main__': + # execute only if run as a script + + compiler.Compiler().compile( + pipeline_func=two_step_pipeline_containerized, + package_path='two_step_pipeline_containerized.json') diff --git a/samples/v2/two_step_pipeline_containerized_test.py b/samples/v2/two_step_pipeline_containerized_test.py new file mode 100644 index 00000000000..2562fd5685c --- /dev/null +++ b/samples/v2/two_step_pipeline_containerized_test.py @@ -0,0 +1,91 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Pipeline container no input v2 engine pipeline.""" + +from __future__ import annotations + +import unittest + +import kfp.deprecated as kfp +from kfp.samples.test.utils import KfpTask +from kfp.samples.test.utils import run_pipeline_func +from kfp.samples.test.utils import TaskInputs +from kfp.samples.test.utils import TaskOutputs +from kfp.samples.test.utils import TestCase +import kfp_server_api +from ml_metadata.proto import Execution + +from .two_step_pipeline_containerized import two_step_pipeline_containerized + + +def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun, + tasks: dict[str, KfpTask], **kwargs): + t.assertEqual(run.status, 'Succeeded') + component1_dict = tasks['component1'].get_dict() + component2_dict = tasks['component2'].get_dict() + for artifact in component1_dict.get('outputs').get('artifacts'): + # pop metadata here because the artifact which got re-imported may have metadata with uncertain data + if artifact.get('metadata') is not None: + artifact.pop('metadata') + for artifact in component2_dict.get('inputs').get('artifacts'): + # pop metadata here because the artifact which got re-imported may have metadata with uncertain data + if artifact.get('metadata') is not None: + artifact.pop('metadata') + + t.assertEqual( + { + 'name': 'component1', + 'inputs': { + 'parameters': { + 'text': 'hi' + } + }, + 'outputs': { + 'artifacts': [{ + 'name': 'output_gcs', + 'type': 'system.Artifact', + # 'type': 'system.Dataset' + }], + }, + 'type': 'system.ContainerExecution', + 'state': Execution.State.COMPLETE, + }, + component1_dict) + + t.assertEqual( + { + 'name': 'component2', + 'inputs': { + 'artifacts': [{ + 'name': 'input_gcs', + # TODO(chesu): compiled pipeline spec incorrectly sets importer artifact type to system.Artifact, but in the pipeline, it should be system.Dataset. + 'type': 'system.Artifact', + # 'type': 'system.Dataset' + }], + }, + 'outputs': {}, + 'type': 'system.ContainerExecution', + 'state': Execution.State.COMPLETE, + }, + component2_dict) + + +if __name__ == '__main__': + run_pipeline_func([ + TestCase( + pipeline_func=two_step_pipeline_containerized, + verify_func=verify, + mode=kfp.dsl.PipelineExecutionMode.V2_ENGINE, + ), + ]) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 246368f9c7d..3a112c1075d 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -833,7 +833,7 @@ def test_compile_container_with_simple_io(self): def container_simple_io(text: str, output_path: dsl.OutputPath(str)): return dsl.ContainerSpec( image='python:3.7', - command=['echo', text], + command=['my_program', text], args=['--output_path', output_path]) with tempfile.TemporaryDirectory() as tempdir: @@ -854,19 +854,14 @@ def container_simple_io(text: str, output_path: dsl.OutputPath(str)): 'STRING') def test_compile_container_with_artifact_output(self): - from kfp.dsl import container_component - from kfp.dsl import ContainerSpec - from kfp.dsl import Model - from kfp.dsl import Output - from kfp.dsl import OutputPath - - @container_component - def container_with_artifacts_output( + + @dsl.container_component + def container_with_artifact_output( num_epochs: int, # also as an input - model: Output[Model], - model_config_path: OutputPath(str), + model: dsl.Output[dsl.Model], + model_config_path: dsl.OutputPath(str), ): - return ContainerSpec( + return dsl.ContainerSpec( image='gcr.io/my-image', command=['sh', 'run.sh'], args=[ @@ -881,19 +876,30 @@ def container_with_artifacts_output( with tempfile.TemporaryDirectory() as tempdir: output_json = os.path.join(tempdir, 'component.yaml') compiler.Compiler().compile( - pipeline_func=container_with_artifacts_output, + pipeline_func=container_with_artifact_output, package_path=output_json, - pipeline_name='container-with-artifacts-output') + pipeline_name='container-with-artifact-output') with open(output_json, 'r') as f: pipeline_spec = yaml.safe_load(f) self.assertEqual( - pipeline_spec['components']['comp-container-with-artifacts-output'] + pipeline_spec['components']['comp-container-with-artifact-output'] ['inputDefinitions']['parameters']['num_epochs']['parameterType'], 'NUMBER_INTEGER') - self.assertIn( - 'model', - pipeline_spec['components']['comp-container-with-artifacts-output'] - ['outputDefinitions']['artifacts']) + self.assertEqual( + pipeline_spec['components']['comp-container-with-artifact-output'] + ['outputDefinitions']['artifacts']['model']['artifactType'] + ['schemaTitle'], 'system.Model') + self.assertEqual( + pipeline_spec['components']['comp-container-with-artifact-output'] + ['outputDefinitions']['parameters']['model_config_path'] + ['parameterType'], 'STRING') + args_to_check = pipeline_spec['deploymentSpec']['executors'][ + 'exec-container-with-artifact-output']['container']['args'] + self.assertEqual(args_to_check[3], + "{{$.outputs.artifacts['model'].uri}}") + self.assertEqual( + args_to_check[5], + "{{$.outputs.parameters['model_config_path'].output_file}}") class TestCompileBadInput(unittest.TestCase): diff --git a/sdk/python/kfp/compiler/test_data/components/container_io.py b/sdk/python/kfp/compiler/test_data/components/container_io.py index 48637b721c0..1b64bbc0b1b 100644 --- a/sdk/python/kfp/compiler/test_data/components/container_io.py +++ b/sdk/python/kfp/compiler/test_data/components/container_io.py @@ -20,7 +20,7 @@ def container_io(text: str, output_path: OutputPath(str)): return ContainerSpec( image='python:3.7', - command=['echo', text], + command=['my_program', text], args=['--output_path', output_path]) diff --git a/sdk/python/kfp/compiler/test_data/components/container_io.yaml b/sdk/python/kfp/compiler/test_data/components/container_io.yaml index 60a0f7c1659..61a83d02535 100644 --- a/sdk/python/kfp/compiler/test_data/components/container_io.yaml +++ b/sdk/python/kfp/compiler/test_data/components/container_io.yaml @@ -17,7 +17,7 @@ deploymentSpec: - --output_path - '{{$.outputs.parameters[''output_path''].output_file}}' command: - - echo + - my_program - '{{$.inputs.parameters[''text'']}}' image: python:3.7 pipelineInfo: diff --git a/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.py b/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.py index 255ec7a67bd..92fb142e564 100644 --- a/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.py +++ b/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.py @@ -19,8 +19,8 @@ @container_component -def container_with_artifacts_output( - num_epochs: int, # also as an input +def container_with_artifact_output( + num_epochs: int, # built-in types are parsed as inputs model: Output[Model], model_config_path: OutputPath(str), ): @@ -40,5 +40,5 @@ def container_with_artifacts_output( if __name__ == '__main__': from kfp import compiler compiler.Compiler().compile( - pipeline_func=container_with_artifacts_output, + pipeline_func=container_with_artifact_output, package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.yaml b/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.yaml index f3fd72dcd36..15763b9c626 100644 --- a/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.yaml +++ b/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.yaml @@ -1,6 +1,6 @@ components: - comp-container-with-artifacts-output: - executorLabel: exec-container-with-artifacts-output + comp-container-with-artifact-output: + executorLabel: exec-container-with-artifact-output inputDefinitions: parameters: num_epochs: @@ -16,7 +16,7 @@ components: parameterType: STRING deploymentSpec: executors: - exec-container-with-artifacts-output: + exec-container-with-artifact-output: container: args: - --epochs @@ -30,21 +30,21 @@ deploymentSpec: - run.sh image: gcr.io/my-image pipelineInfo: - name: container-with-artifacts-output + name: container-with-artifact-output root: dag: tasks: - container-with-artifacts-output: + container-with-artifact-output: cachingOptions: enableCache: true componentRef: - name: comp-container-with-artifacts-output + name: comp-container-with-artifact-output inputs: parameters: num_epochs: componentInputParameter: num_epochs taskInfo: - name: container-with-artifacts-output + name: container-with-artifact-output inputDefinitions: parameters: num_epochs: diff --git a/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.py b/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.py index 9eb03f4b159..f85f0507c2b 100644 --- a/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.py +++ b/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.py @@ -34,7 +34,7 @@ def component2(input_gcs: dsl.Input[dsl.Dataset]): @dsl.pipeline(name='containerized-two-step-pipeline') -def my_pipeline(text: str = 'Hello world!'): +def my_pipeline(text: str): component_1 = component1(text=text).set_display_name('Producer') component_2 = component2(input_gcs=component_1.outputs['output_gcs']) component_2.set_display_name('Consumer') @@ -43,5 +43,4 @@ def my_pipeline(text: str = 'Hello world!'): if __name__ == '__main__': compiler.Compiler().compile( pipeline_func=my_pipeline, - pipeline_parameters={'text': 'Hello KFP!'}, package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.yaml b/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.yaml index 07f125e1eae..38f2a3e6c90 100644 --- a/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.yaml +++ b/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.yaml @@ -74,7 +74,6 @@ root: inputDefinitions: parameters: text: - defaultValue: Hello KFP! parameterType: STRING schemaVersion: 2.1.0 sdkVersion: kfp-2.0.0-beta.1 diff --git a/sdk/python/kfp/components/__init__.py b/sdk/python/kfp/components/__init__.py index dd507b2b788..02ee9300c21 100644 --- a/sdk/python/kfp/components/__init__.py +++ b/sdk/python/kfp/components/__init__.py @@ -20,10 +20,12 @@ 'load_component_from_url', 'PythonComponent', 'BaseComponent', + 'ContainerComponent', 'YamlComponent', ] from kfp.components.base_component import BaseComponent +from kfp.components.container_component import ContainerComponent from kfp.components.python_component import PythonComponent from kfp.components.yaml_component import load_component_from_file from kfp.components.yaml_component import load_component_from_text diff --git a/sdk/python/kfp/components/component_factory.py b/sdk/python/kfp/components/component_factory.py index 7d6a0ee3ba8..10e94289853 100644 --- a/sdk/python/kfp/components/component_factory.py +++ b/sdk/python/kfp/components/component_factory.py @@ -57,23 +57,23 @@ def __init__(self, io_type: str, var_name: str): self._var_name = var_name def __getattr__( - self, __name: str + self, _name: str ) -> Union[placeholders.InputUriPlaceholder, placeholders .InputPathPlaceholder, placeholders.OutputUriPlaceholder, placeholders.OutputPathPlaceholder]: - if __name not in ['uri', 'path']: + if _name not in ['uri', 'path']: raise AttributeError( 'Accessing artifact attribute other than uri or path is not supported.' ) if self._io_type == 'input': - if __name == 'uri': + if _name == 'uri': return placeholders.InputUriPlaceholder(self._var_name) - elif __name == 'path': + elif _name == 'path': return placeholders.InputPathPlaceholder(self._var_name) elif self._io_type == 'output': - if __name == 'uri': + if _name == 'uri': return placeholders.OutputUriPlaceholder(self._var_name) - elif __name == 'path': + elif _name == 'path': return placeholders.OutputPathPlaceholder(self._var_name) @@ -201,7 +201,9 @@ def _maybe_make_unique(name: str, names: List[str]): raise RuntimeError('Too many arguments with the name {}'.format(name)) -def extract_component_interface(func: Callable) -> structures.ComponentSpec: +def extract_component_interface( + func: Callable, + containerized: bool = False) -> structures.ComponentSpec: single_output_name_const = 'Output' signature = inspect.signature(func) @@ -277,45 +279,48 @@ def extract_component_interface(func: Callable) -> structures.ComponentSpec: inputs[io_name] = input_spec #Analyzing the return type annotations. - return_ann = signature.return_annotation - if hasattr(return_ann, '_fields'): #NamedTuple - # Getting field type annotations. - # __annotations__ does not exist in python 3.5 and earlier - # _field_types does not exist in python 3.9 and later - field_annotations = getattr(return_ann, - '__annotations__', None) or getattr( - return_ann, '_field_types', None) - for field_name in return_ann._fields: - type_struct = None - if field_annotations: - type_struct = _annotation_to_type_struct( - field_annotations.get(field_name, None)) - - output_name = _maybe_make_unique(field_name, output_names) + if not containerized: + return_ann = signature.return_annotation + if hasattr(return_ann, '_fields'): #NamedTuple + # Getting field type annotations. + # __annotations__ does not exist in python 3.5 and earlier + # _field_types does not exist in python 3.9 and later + field_annotations = getattr(return_ann, '__annotations__', + None) or getattr( + return_ann, '_field_types', None) + for field_name in return_ann._fields: + type_struct = None + if field_annotations: + type_struct = _annotation_to_type_struct( + field_annotations.get(field_name, None)) + + output_name = _maybe_make_unique(field_name, output_names) + output_names.add(output_name) + output_spec = structures.OutputSpec(type=type_struct) + outputs[output_name] = output_spec + # Deprecated dict-based way of declaring multiple outputs. Was only used by + # the @component decorator + elif isinstance(return_ann, dict): + warnings.warn( + 'The ability to specify multiple outputs using the dict syntax' + ' has been deprecated. It will be removed soon after release' + ' 0.1.32. Please use typing.NamedTuple to declare multiple' + ' outputs.') + for output_name, output_type_annotation in return_ann.items(): + output_type_struct = _annotation_to_type_struct( + output_type_annotation) + output_spec = structures.OutputSpec(type=output_type_struct) + outputs[name] = output_spec + elif signature.return_annotation is not None and signature.return_annotation != inspect.Parameter.empty: + output_name = _maybe_make_unique(single_output_name_const, + output_names) + # Fixes exotic, but possible collision: + # `def func(output_path: OutputPath()) -> str: ...` output_names.add(output_name) + type_struct = _annotation_to_type_struct( + signature.return_annotation) output_spec = structures.OutputSpec(type=type_struct) outputs[output_name] = output_spec - # Deprecated dict-based way of declaring multiple outputs. Was only used by - # the @component decorator - elif isinstance(return_ann, dict): - warnings.warn( - 'The ability to specify multiple outputs using the dict syntax' - ' has been deprecated. It will be removed soon after release' - ' 0.1.32. Please use typing.NamedTuple to declare multiple' - ' outputs.') - for output_name, output_type_annotation in return_ann.items(): - output_type_struct = _annotation_to_type_struct( - output_type_annotation) - output_spec = structures.OutputSpec(type=output_type_struct) - outputs[name] = output_spec - elif signature.return_annotation is not None and signature.return_annotation != inspect.Parameter.empty: - output_name = _maybe_make_unique(single_output_name_const, output_names) - # Fixes exotic, but possible collision: - # `def func(output_path: OutputPath()) -> str: ...` - output_names.add(output_name) - type_struct = _annotation_to_type_struct(signature.return_annotation) - output_spec = structures.OutputSpec(type=type_struct) - outputs[output_name] = output_spec # Component name and description are derived from the function's name and # docstring. The name can be overridden by setting setting func.__name__ @@ -467,100 +472,21 @@ def create_component_from_func(func: Callable, component_spec=component_spec, python_func=func) -def extract_container_component_interface( - func: Callable) -> structures.ComponentSpec: - """Extracting the type annotations from function signature, without - analyzing return annotation.""" - signature = inspect.signature(func) - parameters = list(signature.parameters.values()) - - parsed_docstring = docstring_parser.parse(inspect.getdoc(func)) - - inputs = {} - outputs = {} - - input_names = set() - output_names = set() - for parameter in parameters: - parameter_type = type_annotations.maybe_strip_optional_from_annotation( - parameter.annotation) - passing_style = None - io_name = parameter.name - - if type_annotations.is_artifact_annotation(parameter_type): - # passing_style is either type_annotations.InputAnnotation or - # type_annotations.OutputAnnotation. - passing_style = type_annotations.get_io_artifact_annotation( - parameter_type) - - # parameter_type is type_annotations.Artifact or one of its subclasses. - parameter_type = type_annotations.get_io_artifact_class( - parameter_type) - if not issubclass(parameter_type, artifact_types.Artifact): - raise ValueError( - 'Input[T] and Output[T] are only supported when T is a ' - 'subclass of Artifact. Found `{} with type {}`'.format( - io_name, parameter_type)) - - if parameter.default is not inspect.Parameter.empty: - raise ValueError( - 'Default values for Input/Output artifacts are not supported.' - ) - elif isinstance( - parameter_type, - (type_annotations.InputPath, type_annotations.OutputPath)): - passing_style = type(parameter_type) - parameter_type = parameter_type.type - if parameter.default is not inspect.Parameter.empty and not ( - passing_style == type_annotations.InputPath and - parameter.default is None): - raise ValueError( - 'Path inputs only support default values of None. Default' - ' values for outputs are not supported.') - - type_struct = _annotation_to_type_struct(parameter_type) - if type_struct is None: - raise TypeError('Missing type annotation for argument: {}'.format( - parameter.name)) - - if passing_style in [ - type_annotations.OutputAnnotation, type_annotations.OutputPath - ]: - io_name = _maybe_make_unique(io_name, output_names) - output_names.add(io_name) - output_spec = structures.OutputSpec(type=type_struct) - outputs[io_name] = output_spec - else: - io_name = _maybe_make_unique(io_name, input_names) - input_names.add(io_name) - if parameter.default is not inspect.Parameter.empty: - input_spec = structures.InputSpec( - type=type_struct, - default=parameter.default, - ) - else: - input_spec = structures.InputSpec(type=type_struct) - - inputs[io_name] = input_spec +def create_container_component_from_func( + func: Callable) -> container_component.ContainerComponent: + """Implementation for the @container_component decorator. - # Component name and description are derived from the function's name and - # docstring. The name can be overridden by setting setting func.__name__ - # attribute (of the legacy func._component_human_name attribute). The - # description can be overridden by setting the func.__doc__ attribute (or - # the legacy func._component_description attribute). - component_name = getattr(func, '_component_human_name', - None) or _python_function_name_to_component_name( - func.__name__) - description = getattr(func, '_component_description', - None) or parsed_docstring.short_description - if description: - description = description.strip() + The decorator is defined under container_component_decorator.py. See + the decorator for the canonical documentation for this function. + """ + component_spec = extract_component_interface(func, containerized=True) arg_list = [] + signature = inspect.signature(func) + parameters = list(signature.parameters.values()) for parameter in parameters: parameter_type = type_annotations.maybe_strip_optional_from_annotation( parameter.annotation) - passing_style = None io_name = parameter.name if type_annotations.is_input_artifact(parameter_type): arg_list.append( @@ -578,23 +504,5 @@ def extract_container_component_interface( arg_list.append(placeholders.InputValuePlaceholder(io_name)) container_spec = func(*arg_list) - - component_spec = structures.ComponentSpec( - name=component_name, - description=description, - inputs=inputs if inputs else None, - outputs=outputs if outputs else None, - implementation=structures.Implementation(container_spec)) - return component_spec - - -def create_container_component_from_func( - func: Callable) -> container_component.ContainerComponent: - """Implementation for the @container_component decorator. - - The decorator is defined under container_component_decorator.py. See - the decorator for the canonical documentation for this function. - """ - - component_spec = extract_container_component_interface(func) + component_spec.implementation = structures.Implementation(container_spec) return container_component.ContainerComponent(component_spec, func) diff --git a/sdk/python/kfp/components/component_factory_test.py b/sdk/python/kfp/components/component_factory_test.py index 6b984b6962b..6be610b3067 100644 --- a/sdk/python/kfp/components/component_factory_test.py +++ b/sdk/python/kfp/components/component_factory_test.py @@ -15,6 +15,7 @@ import unittest from kfp.components import component_factory +from kfp.components import placeholders class TestGetPackagesToInstallCommand(unittest.TestCase): @@ -44,3 +45,18 @@ def test_with_packages_to_install_with_pip_index_url(self): concat_command = ' '.join(command) for package in packages_to_install + pip_index_urls: self.assertTrue(package in concat_command) + + +class TestContainerComponentArtifactChannel(unittest.TestCase): + + def test_correct_placeholder_and_attribute_error(self): + in_channel = component_factory.ContainerComponentArtifactChannel( + 'input', 'my_dataset') + out_channel = component_factory.ContainerComponentArtifactChannel( + 'output', 'my_result') + self.assertEqual(in_channel.uri, + placeholders.InputUriPlaceholder('my_dataset')) + self.assertEqual(out_channel.path, + placeholders.OutputPathPlaceholder('my_result')) + self.assertRaises(AttributeError, lambda: in_channel.name) + self.assertRaises(AttributeError, lambda: out_channel.channel) diff --git a/sdk/python/kfp/components/container_component_decorator.py b/sdk/python/kfp/components/container_component_decorator.py index 41a3d2064c6..95cb334eb80 100644 --- a/sdk/python/kfp/components/container_component_decorator.py +++ b/sdk/python/kfp/components/container_component_decorator.py @@ -22,30 +22,32 @@ def container_component( func: Callable) -> container_component.ContainerComponent: """Decorator for container-based components in KFP v2. - Sample usage: - from kfp.dsl import container_component, ContainerSpec, InputPath, OutputPath, Output - - @container_component - def my_component( - dataset_path: InputPath(Dataset), - model: Output[Model], - num_epochs: int, - output_parameter: OutputPath(str), - ): - return ContainerSpec( - image='gcr.io/my-image', - command=['sh', 'my_component.sh'], - arguments=[ - '--dataset_path', dataset_path, - '--model_path', model.path, - '--output_parameter_path', output_parameter, - ] - ) - Args: func: The python function to create a component from. The function should have type annotations for all its arguments, indicating how it is intended to be used (e.g. as an input/output Artifact object, a plain parameter, or a path to a file). + + Example: + :: + + from kfp.dsl import container_component, ContainerSpec, InputPath, OutputPath, Output + + @container_component + def my_component( + dataset_path: InputPath(Dataset), + model: Output[Model], + num_epochs: int, + output_parameter: OutputPath(str), + ): + return ContainerSpec( + image='gcr.io/my-image', + command=['sh', 'my_component.sh'], + arguments=[ + '--dataset_path', dataset_path, + '--model_path', model.path, + '--output_parameter_path', output_parameter, + ] + ) """ return component_factory.create_container_component_from_func(func) diff --git a/sdk/python/kfp/components/structures.py b/sdk/python/kfp/components/structures.py index 60a1710cab4..84f5e83f56b 100644 --- a/sdk/python/kfp/components/structures.py +++ b/sdk/python/kfp/components/structures.py @@ -180,18 +180,44 @@ class ResourceSpec(base_model.BaseModel): class ContainerSpec(base_model.BaseModel): """Container implementation definition. - Attributes: - image: The container image. - command (optional): the container entrypoint. - args (optional): the arguments to the container entrypoint. - env (optional): the environment variables to be passed to the container. - resources (optional): the specification on the resource requirements. + This is only used for pipeline authors when constructing a containerized component + using @container_component decorator. + + Examples: + :: + + @container_component + def container_with_artifact_output( + num_epochs: int, # built-in types are parsed as inputs + model: Output[Model], + model_config_path: OutputPath(str), + ): + return ContainerSpec( + image='gcr.io/my-image', + command=['sh', 'run.sh'], + args=[ + '--epochs', + num_epochs, + '--model_path', + model.uri, + '--model_config_path', + model_config_path, + ]) """ image: str + """Container image.""" + command: Optional[List[placeholders.CommandLineElement]] = None + """Container entrypoint.""" + args: Optional[List[placeholders.CommandLineElement]] = None + """Arguments to the container entrypoint.""" + env: Optional[Mapping[str, placeholders.CommandLineElement]] = None + """Environment variables to be passed to the container.""" + resources: Optional[ResourceSpec] = None + """Specification on the resource requirements.""" def transform_command(self) -> None: """Use None instead of empty list for command.""" diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py index 79ba66e05e7..1f21b2b8754 100644 --- a/sdk/python/kfp/dsl/__init__.py +++ b/sdk/python/kfp/dsl/__init__.py @@ -16,6 +16,8 @@ __all__ = [ 'component', + 'container_component', + 'ContainerSpec', 'importer', 'pipeline', 'PipelineTask', From 97a1e1ad766275194f527cd97e1c71177c254f75 Mon Sep 17 00:00:00 2001 From: scottxu Date: Thu, 28 Jul 2022 15:29:03 -0700 Subject: [PATCH 19/23] resolve comments --- samples/v2/pipeline_container_no_input.py | 2 +- samples/v2/two_step_pipeline_containerized.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/samples/v2/pipeline_container_no_input.py b/samples/v2/pipeline_container_no_input.py index 6657e7c61b6..d9e7c24c5c5 100644 --- a/samples/v2/pipeline_container_no_input.py +++ b/samples/v2/pipeline_container_no_input.py @@ -35,4 +35,4 @@ def pipeline_container_no_input(): # execute only if run as a script compiler.Compiler().compile( pipeline_func=pipeline_container_no_input, - package_path='pipeline_container_no_input.json') + package_path='pipeline_container_no_input.yaml') diff --git a/samples/v2/two_step_pipeline_containerized.py b/samples/v2/two_step_pipeline_containerized.py index dbd045b8886..2cee711ed7d 100644 --- a/samples/v2/two_step_pipeline_containerized.py +++ b/samples/v2/two_step_pipeline_containerized.py @@ -37,10 +37,10 @@ def component2(input_gcs: Input[Dataset]): return ContainerSpec( image='google/cloud-sdk:slim', command=['sh', '-c', '|', 'set -e -x gsutil cat'], - args=[input_gcs.uri]) + args=[input_gcs.path]) -@pipeline(name='two_step_pipeline_containerized') +@pipeline(name='two-step-pipeline-containerized') def two_step_pipeline_containerized(): component_1 = component1(text='hi').set_display_name('Producer') component_2 = component2(input_gcs=component_1.outputs['output_gcs']) @@ -52,4 +52,4 @@ def two_step_pipeline_containerized(): compiler.Compiler().compile( pipeline_func=two_step_pipeline_containerized, - package_path='two_step_pipeline_containerized.json') + package_path='two_step_pipeline_containerized.yaml') From 2dc74c4a7d78507b6f33e5f95e6c57210e49cbf8 Mon Sep 17 00:00:00 2001 From: scottxu Date: Mon, 1 Aug 2022 11:09:40 -0700 Subject: [PATCH 20/23] resolve comments --- samples/v2/two_step_pipeline_containerized_test.py | 13 ++++--------- sdk/python/kfp/compiler/compiler_test.py | 6 +++--- sdk/python/kfp/components/component_factory.py | 10 ++++++---- 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/samples/v2/two_step_pipeline_containerized_test.py b/samples/v2/two_step_pipeline_containerized_test.py index 2562fd5685c..9aa76556070 100644 --- a/samples/v2/two_step_pipeline_containerized_test.py +++ b/samples/v2/two_step_pipeline_containerized_test.py @@ -54,14 +54,12 @@ def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun, 'outputs': { 'artifacts': [{ 'name': 'output_gcs', - 'type': 'system.Artifact', - # 'type': 'system.Dataset' + 'type': 'system.Dataset' }], }, 'type': 'system.ContainerExecution', 'state': Execution.State.COMPLETE, - }, - component1_dict) + }, component1_dict) t.assertEqual( { @@ -69,16 +67,13 @@ def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun, 'inputs': { 'artifacts': [{ 'name': 'input_gcs', - # TODO(chesu): compiled pipeline spec incorrectly sets importer artifact type to system.Artifact, but in the pipeline, it should be system.Dataset. - 'type': 'system.Artifact', - # 'type': 'system.Dataset' + 'type': 'system.Dataset' }], }, 'outputs': {}, 'type': 'system.ContainerExecution', 'state': Execution.State.COMPLETE, - }, - component2_dict) + }, component2_dict) if __name__ == '__main__': diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 3a112c1075d..bff66204eed 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -874,12 +874,12 @@ def container_with_artifact_output( ]) with tempfile.TemporaryDirectory() as tempdir: - output_json = os.path.join(tempdir, 'component.yaml') + output_yaml = os.path.join(tempdir, 'component.yaml') compiler.Compiler().compile( pipeline_func=container_with_artifact_output, - package_path=output_json, + package_path=output_yaml, pipeline_name='container-with-artifact-output') - with open(output_json, 'r') as f: + with open(output_yaml, 'r') as f: pipeline_spec = yaml.safe_load(f) self.assertEqual( pipeline_spec['components']['comp-container-with-artifact-output'] diff --git a/sdk/python/kfp/components/component_factory.py b/sdk/python/kfp/components/component_factory.py index 10e94289853..91aba55715d 100644 --- a/sdk/python/kfp/components/component_factory.py +++ b/sdk/python/kfp/components/component_factory.py @@ -62,9 +62,7 @@ def __getattr__( .InputPathPlaceholder, placeholders.OutputUriPlaceholder, placeholders.OutputPathPlaceholder]: if _name not in ['uri', 'path']: - raise AttributeError( - 'Accessing artifact attribute other than uri or path is not supported.' - ) + raise AttributeError('Cannot access artifact attribute "{_name}".') if self._io_type == 'input': if _name == 'uri': return placeholders.InputUriPlaceholder(self._var_name) @@ -279,8 +277,8 @@ def extract_component_interface( inputs[io_name] = input_spec #Analyzing the return type annotations. + return_ann = signature.return_annotation if not containerized: - return_ann = signature.return_annotation if hasattr(return_ann, '_fields'): #NamedTuple # Getting field type annotations. # __annotations__ does not exist in python 3.5 and earlier @@ -321,6 +319,10 @@ def extract_component_interface( signature.return_annotation) output_spec = structures.OutputSpec(type=type_struct) outputs[output_name] = output_spec + elif return_ann != inspect.Parameter.empty and return_ann != structures.ContainerSpec: + raise TypeError( + 'Return annotation should be either ContainerSpec or ignored for container components.' + ) # Component name and description are derived from the function's name and # docstring. The name can be overridden by setting setting func.__name__ From e8f393c1674fbbfc63ea6960546f0b4525bcd4cc Mon Sep 17 00:00:00 2001 From: scottxu Date: Mon, 1 Aug 2022 14:47:52 -0700 Subject: [PATCH 21/23] fix assertion messages --- sdk/python/kfp/components/component_factory.py | 4 ++-- sdk/python/kfp/components/component_factory_test.py | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/sdk/python/kfp/components/component_factory.py b/sdk/python/kfp/components/component_factory.py index 91aba55715d..403487508ce 100644 --- a/sdk/python/kfp/components/component_factory.py +++ b/sdk/python/kfp/components/component_factory.py @@ -62,7 +62,7 @@ def __getattr__( .InputPathPlaceholder, placeholders.OutputUriPlaceholder, placeholders.OutputPathPlaceholder]: if _name not in ['uri', 'path']: - raise AttributeError('Cannot access artifact attribute "{_name}".') + raise AttributeError(f'Cannot access artifact attribute "{_name}".') if self._io_type == 'input': if _name == 'uri': return placeholders.InputUriPlaceholder(self._var_name) @@ -321,7 +321,7 @@ def extract_component_interface( outputs[output_name] = output_spec elif return_ann != inspect.Parameter.empty and return_ann != structures.ContainerSpec: raise TypeError( - 'Return annotation should be either ContainerSpec or ignored for container components.' + 'Return annotation should be either ContainerSpec or omitted for container components.' ) # Component name and description are derived from the function's name and diff --git a/sdk/python/kfp/components/component_factory_test.py b/sdk/python/kfp/components/component_factory_test.py index 6be610b3067..4764144aac7 100644 --- a/sdk/python/kfp/components/component_factory_test.py +++ b/sdk/python/kfp/components/component_factory_test.py @@ -58,5 +58,9 @@ def test_correct_placeholder_and_attribute_error(self): placeholders.InputUriPlaceholder('my_dataset')) self.assertEqual(out_channel.path, placeholders.OutputPathPlaceholder('my_result')) - self.assertRaises(AttributeError, lambda: in_channel.name) - self.assertRaises(AttributeError, lambda: out_channel.channel) + self.assertRaisesRegex(AttributeError, + r'Cannot access artifact attribute "name"', + lambda: in_channel.name) + self.assertRaisesRegex(AttributeError, + r'Cannot access artifact attribute "channel"', + lambda: out_channel.channel) From bb282dc5609f741310294cc28495551de1058091 Mon Sep 17 00:00:00 2001 From: scottxu Date: Thu, 4 Aug 2022 11:51:44 -0700 Subject: [PATCH 22/23] add error handling for accessing artifact by itself --- sdk/python/kfp/components/component_factory.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdk/python/kfp/components/component_factory.py b/sdk/python/kfp/components/component_factory.py index 403487508ce..f239f377921 100644 --- a/sdk/python/kfp/components/component_factory.py +++ b/sdk/python/kfp/components/component_factory.py @@ -506,5 +506,10 @@ def create_container_component_from_func( arg_list.append(placeholders.InputValuePlaceholder(io_name)) container_spec = func(*arg_list) + for arg in (container_spec.command or []) + (container_spec.args or []): + if isinstance(arg, ContainerComponentArtifactChannel): + raise TypeError( + 'Cannot access artifact by itself in the container definition. Please use .uri or .path instead to access the artifact.' + ) component_spec.implementation = structures.Implementation(container_spec) return container_component.ContainerComponent(component_spec, func) From fb26f9145cfbcec95fae7dd6f9e6539f0f812163 Mon Sep 17 00:00:00 2001 From: scottxu Date: Fri, 5 Aug 2022 11:32:55 -0700 Subject: [PATCH 23/23] add test for raising error for accessing artifact by itself --- .../kfp/components/component_factory_test.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/sdk/python/kfp/components/component_factory_test.py b/sdk/python/kfp/components/component_factory_test.py index 4764144aac7..18f528672f8 100644 --- a/sdk/python/kfp/components/component_factory_test.py +++ b/sdk/python/kfp/components/component_factory_test.py @@ -14,6 +14,7 @@ import unittest +from kfp import dsl from kfp.components import component_factory from kfp.components import placeholders @@ -64,3 +65,20 @@ def test_correct_placeholder_and_attribute_error(self): self.assertRaisesRegex(AttributeError, r'Cannot access artifact attribute "channel"', lambda: out_channel.channel) + + +class TestContainerComponentFactory(unittest.TestCase): + + def test_raise_error_if_access_artifact_by_itself(self): + + def comp_with_artifact_input(dataset: dsl.Input[dsl.Dataset]): + return dsl.ContainerSpec( + image='gcr.io/my-image', + command=['sh', 'run.sh'], + args=[dataset]) + + self.assertRaisesRegex( + TypeError, + r'Cannot access artifact by itself in the container definition.', + component_factory.create_container_component_from_func, + comp_with_artifact_input)