diff --git a/README.md b/README.md index ce300520d9..b4d9f8d742 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ has Steps. Please look for more details in [Tekton repo](https://github.com/tekt ## Kubeflow Pipeline with Tekton Backend We are currently using [Kubeflow Pipelines 1.0.4](https://github.com/kubeflow/pipelines/releases/tag/1.0.4) and -[Tekton >= 0.14.0](https://github.com/tektoncd/pipeline/releases/tag/v0.14.0) for this project. +[Tekton >= 0.16.0](https://github.com/tektoncd/pipeline/releases/tag/v0.16.0) for this project. ![kfp-tekton](images/kfp-tekton.png) diff --git a/go.mod b/go.mod index 7af359f138..cb9b4e305e 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/sirupsen/logrus v1.6.0 github.com/spf13/viper v1.7.0 github.com/stretchr/testify v1.5.1 - github.com/tektoncd/pipeline v0.15.0 + github.com/tektoncd/pipeline v0.16.3 github.com/valyala/fasttemplate v1.1.0 // indirect golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2 golang.org/x/sync v0.0.0-20201008141435-b3e1573b7520 // indirect diff --git a/go.sum b/go.sum index c6812b51ce..00867e7fec 100644 --- a/go.sum +++ b/go.sum @@ -1031,8 +1031,8 @@ github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG github.com/tdakkota/asciicheck v0.0.0-20200416190851-d7f85be797a2/go.mod h1:yHp0ai0Z9gUljN3o0xMhYJnH/IcvkdTBOX2fmJ93JEM= github.com/tdakkota/asciicheck v0.0.0-20200416200610-e657995f937b/go.mod h1:yHp0ai0Z9gUljN3o0xMhYJnH/IcvkdTBOX2fmJ93JEM= github.com/tektoncd/pipeline v0.11.0/go.mod h1:hlkH32S92+/UODROH0dmxzyuMxfRFp/Nc3e29MewLn8= -github.com/tektoncd/pipeline v0.15.0 h1:+hXd3eyqLConntpzg2nR7ysCjc9/qrJQ+k1t1xuZoNU= -github.com/tektoncd/pipeline v0.15.0/go.mod h1:SaAxUpYbcFik2hKGOTX7NnvOa1VXTzXRGNZsY2TSwTs= +github.com/tektoncd/pipeline v0.16.3 h1:h5bJGOxbzYpa+5OwWGdlWphCEh9nmxnfVH+nhwaOFCQ= +github.com/tektoncd/pipeline v0.16.3/go.mod h1:5vn2IJH46ntWTKLkwNbtZNd38FYkNP0cBtu5sgqm5xA= github.com/tektoncd/plumbing v0.0.0-20200217163359-cd0db6e567d2/go.mod h1:QZHgU07PRBTRF6N57w4+ApRu8OgfYLFNqCDlfEZaD9Y= github.com/tektoncd/plumbing v0.0.0-20200430135134-e53521e1d887/go.mod h1:cZPJIeTIoP7UPTxQyTQLs7VE1TiXJSNj0te+If4Q+jI= github.com/tektoncd/plumbing/pipelinerun-logs v0.0.0-20191206114338-712d544c2c21/go.mod h1:S62EUWtqmejjJgUMOGB1CCCHRp6C706laH06BoALkzU= diff --git a/guides/developer_guide.md b/guides/developer_guide.md index 78bd29ecdd..c5db54e3d4 100644 --- a/guides/developer_guide.md +++ b/guides/developer_guide.md @@ -28,8 +28,7 @@ If you would like to do it in development mode, or if you already have a Kubeflo ## Prerequisites 1. [Install Tekton](https://github.com/tektoncd/pipeline/blob/master/docs/install.md#installing-tekton-pipelines-on-kubernetes). - - Minimum version: `0.14.0` - - Recommended version: `0.15.0` + - Minimum version: `0.16.0` 2. Clone this repository ``` git clone github.com/kubeflow/kfp-tekton diff --git a/guides/kfp_tekton_install.md b/guides/kfp_tekton_install.md index 4d2a599f5a..eb89fb7308 100644 --- a/guides/kfp_tekton_install.md +++ b/guides/kfp_tekton_install.md @@ -35,7 +35,7 @@ A Kubernetes cluster `v1.16` that has least 8 vCPU and 16 GB memory. ## Standalone Kubeflow Pipelines with Tekton Backend Deployment To install the standalone Kubeflow Pipelines with Tekton, run the following steps: -1. Install [Tekton v0.14.3](https://github.com/tektoncd/pipeline/releases/tag/v0.14.3) +1. Install [Tekton v0.16.3](https://github.com/tektoncd/pipeline/releases/tag/v0.16.3) 2. 
Install Kubeflow Pipelines with Tekton backend (kfp-tekton) v0.4.0 release ```shell diff --git a/samples/README.md b/samples/README.md index 66f5a0c75f..58fcb74538 100644 --- a/samples/README.md +++ b/samples/README.md @@ -1,13 +1,13 @@ # KFP Tekton Samples -Below are the list of samples that are currently running end to end taking the compiled Tekton yaml and deploying on a Tekton cluster directly. +Below is the list of samples that currently run end to end, taking the compiled Tekton YAML and deploying it directly on a Tekton cluster. If you are interested in the larger set of pipeline samples that we are testing for whether they can be compiled to the Tekton format, please [look at the corresponding status page](/sdk/python/tests/README.md) [KFP Tekton User Guide](/guides/kfp-user-guide) is a guideline for the possible ways to develop and consume Kubeflow Pipeline with Tekton. It's recommended to go over at least one of the methods in the user guide before heading into the KFP Tekton Samples. -## Prerequisites +## Prerequisites - Install [Kubeflow 1.0.2+](https://www.kubeflow.org/docs/started/getting-started/) and connect the cluster to the current shell with `kubectl` -- Install [Tekton 0.14.0](https://github.com/tektoncd/pipeline/releases/tag/v0.14.0) +- Install [Tekton 0.16.3](https://github.com/tektoncd/pipeline/releases/tag/v0.16.3) - For KFP, we shouldn't be modifying the default work directory for any component. Therefore, please run the below command to disable the [home and work directories overwrite](https://github.com/tektoncd/pipeline/blob/master/docs/install.md#customizing-the-pipelines-controller-behavior) from Tekton default. ```shell kubectl patch cm feature-flags -n tekton-pipelines -p '{"data":{"disable-home-env-overwrite":"true","disable-working-directory-overwrite":"true"}}' diff --git a/sdk/FEATURES.md b/sdk/FEATURES.md index 9fdbe2ca57..5243ad69a2 100644 --- a/sdk/FEATURES.md +++ b/sdk/FEATURES.md @@ -118,7 +118,7 @@ This feature has been available since Tekton version `0.13.0`. An _exit handler_ is a component that always executes, irrespective of success or failure, at the end of the pipeline. It is implemented using Tekton's -[finally](https://github.com/tektoncd/pipeline/blob/v0.14.0/docs/pipelines.md#adding-finally-to-the-pipeline) +[finally](https://github.com/tektoncd/pipeline/blob/v0.16.0/docs/pipelines.md#adding-finally-to-the-pipeline) section under the Pipeline `spec`. An example of how to use an _exit handler_ can be found in the [exit_handler](/sdk/python/tests/compiler/testdata/exit_handler.py) compiler test. @@ -141,12 +141,9 @@ is an example of how to use this feature. ### Conditions -Conditions are for determining whether to execute certain components based on the output of the condition checks. Since Tekton required users to define an image for doing the [condition check](https://github.com/tektoncd/pipeline/blob/master/docs/conditions.md), we created a custom python image to replicate the same condition checks from Argo and made it as the default in our compiler. The +Conditions determine whether to execute certain components based on the output of the condition checks. In KFP Argo, each condition is represented as an Argo DAG template so that it can be used as a dependency for other Argo templates. To replicate this in KFP Tekton, we put each condition into a dedicated Tekton task so that conditions can be treated as a dependency for other Tekton tasks.
Another advantage of creating conditions using Tekton tasks is that we can have more flexible conditions such as comparing an integer and a float number, which currently is not available in Tekton. We are using the Tekton [when expression](https://github.com/tektoncd/pipeline/blob/master/docs/pipelines.md#guard-task-execution-using-whenexpressions) to check whether the condition task has succeeded or not. We created a custom python image to replicate the same condition checks that are in Argo and made it as the default in our compiler. The [flip-coin](/samples/flip-coin) example demonstrates how to use multiple conditions within the same pipeline. -Please be aware that the current Condition feature is using Tekton V1alpha1 API because the Tekton community is still designing the V1beta1 API. -We will be migrating to the V1beta1 API once it's available in Tekton. Please refer to the [design proposal](https://docs.google.com/document/d/1kESrgmFHnirKNS4oDq3mucuB_OycBm6dSCSwRUHccZg/edit?usp=sharing) for more details. - ### ResourceOp, VolumeOp, and VolumeSnapshotOp [ResourceOp, VolumeOp, and VolumeSnapshotOp](https://www.kubeflow.org/docs/pipelines/sdk/manipulate-resources/) are special operations for @@ -205,6 +202,10 @@ However, when using dynamic parameters, the number of parallel tasks is determin argo -> tekton {{inputs.parameters.%s}} -> $(inputs.params.%s) {{outputs.parameters.%s}} -> $(results.%s.path) +{{workflow.uid}} -> $(context.pipelineRun.uid) +{{workflow.name}} -> $(context.pipelineRun.name) +{{workflow.namespace}} -> $(context.pipelineRun.namespace) +{{workflow.parameters.%s}} -> $(params.%s) ``` [parallel_join_with_argo_vars](/sdk/python/tests/compiler/testdata/parallel_join_with_argo_vars.py) is an example of how Argo variables are diff --git a/sdk/README.md b/sdk/README.md index be90126cc2..f15a505352 100644 --- a/sdk/README.md +++ b/sdk/README.md @@ -36,7 +36,7 @@ adding the `TektonCompiler` and the `TektonClient`: * `kfp_tekton.compiler.TektonCompiler.compile` compiles your Python DSL code into a single static configuration (in YAML format) that the Kubeflow Pipelines service - can process. The Kubeflow Pipelines service converts the static + can process. The Kubeflow Pipelines service converts the static configuration into a set of Kubernetes resources for execution. * `kfp_tekton.TektonClient` contains the Python client libraries for the [Kubeflow Pipelines API](https://www.kubeflow.org/docs/pipelines/reference/api/kubeflow-pipeline-api-spec/). @@ -49,14 +49,14 @@ adding the `TektonCompiler` and the `TektonClient`: * `kfp_tekton.TektonClient.run_pipeline` runs a pipeline and returns a run object. * `kfp_tekton.TektonClient.create_run_from_pipeline_func` compiles a pipeline function and submits it for execution on Kubeflow Pipelines. - * `kfp_tekton.TektonClient.create_run_from_pipeline_package` runs a local + * `kfp_tekton.TektonClient.create_run_from_pipeline_package` runs a local pipeline package on Kubeflow Pipelines. 
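To make the compiler and client entry points listed above concrete, here is a minimal usage sketch; the pipeline function, host URL, and output file name are placeholders for illustration and are not part of this change set:

```python
# Hypothetical usage of the kfp_tekton SDK entry points described above.
import kfp.dsl as dsl
from kfp_tekton.compiler import TektonCompiler
from kfp_tekton import TektonClient


@dsl.pipeline(name='echo', description='Minimal placeholder pipeline')
def echo_pipeline():
    dsl.ContainerOp(name='echo', image='alpine:3.6', command=['echo', 'hello world'])


# Compile the Python DSL into a single static Tekton YAML file.
TektonCompiler().compile(echo_pipeline, 'echo_pipeline.yaml')

# Submit the same pipeline for execution on a KFP-with-Tekton deployment
# (replace the host below with your own Kubeflow Pipelines API endpoint).
client = TektonClient(host='http://localhost:8888/pipeline')
client.create_run_from_pipeline_func(echo_pipeline, arguments={})
```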
## Project Prerequisites - Python: `3.5.3` or later - - Tekton: [`v0.14.0`](https://github.com/tektoncd/pipeline/releases/tag/v0.14.0) or [later](https://github.com/tektoncd/pipeline/releases/latest) + - Tekton: [`v0.16.3`](https://github.com/tektoncd/pipeline/releases/tag/v0.16.3) or [later](https://github.com/tektoncd/pipeline/releases/latest) - Tekton CLI: [`0.11.0`](https://github.com/tektoncd/cli/releases/tag/v0.11.0) - Kubeflow Pipelines: [KFP with Tekton backend](/guides/kfp_tekton_install.md) @@ -72,9 +72,9 @@ virtual environment first: python3 -m venv .venv source .venv/bin/activate - + pip install kfp-tekton - + Alternatively you can install the latest version of the `kfp-tekton` compiler from source by cloning the repository [https://github.com/kubeflow/kfp-tekton](https://github.com/kubeflow/kfp-tekton): @@ -159,19 +159,19 @@ You can run the pipeline directly using a pre-compiled file and KFP-Tekton SDK. ```python experiment = kfp_tekton.TektonClient.create_experiment(name=EXPERIMENT_NAME, namespace=KUBEFLOW_PROFILE_NAME) run = client.run_pipeline(experiment.id, 'parallal-join-pipeline', 'pipeline.yaml') -``` +``` You can also deploy directly on Tekton cluster with `kubectl`. The Tekton server will automatically start a pipeline run. We can then follow the logs using the `tkn` CLI. kubectl apply -f pipeline.yaml - + tkn pipelinerun logs --last --follow Once the Tekton Pipeline is running, the logs should start streaming: - + Waiting for logs to be available... - + [gcs-download : main] With which he yoketh your rebellious necks Razeth your cities and subverts your towns And in a moment makes them desolate [gcs-download-2 : main] I find thou art no less than fame hath bruited And more than may be gatherd by thy shape Let my presumption not provoke thy wrath @@ -204,7 +204,7 @@ your code changes are improving the number of successfully compiled KFP pipeline - When you encounter ServiceAccount related permission issues, refer to the ["Service Account and RBAC" doc](sa-and-rbac.md) - -- If you run into the error `bad interpreter: No such file or director` when trying + +- If you run into the error `bad interpreter: No such file or director` when trying to use Python's venv, remove the current virtual environment in the `.venv` directory and create a new one using `virtualenv .venv` diff --git a/sdk/python/README.md b/sdk/python/README.md index ed16e8eda7..6bdfcddabc 100644 --- a/sdk/python/README.md +++ b/sdk/python/README.md @@ -1,6 +1,6 @@ # KFP-Tekton Developer Guide -This document describes the development guidelines for contributing to the KFP-Tekton project. +This document describes the development guidelines for contributing to the KFP-Tekton project. Details about the required contributor license agreement (CLA) and the code review process can be found in the [CONTRIBUTING.md](/CONTRIBUTING.md) document. A quick-start guide with general setup instruction, trouble shooting guide and technical limitations @@ -36,7 +36,7 @@ can be found in the [SDK README](/sdk/README.md) 1. [`Python`](https://www.python.org/downloads/): version `3.5.3` or later (new code must maintain compatibility with `3.5`) 2. [`Kubernetes` Cluster](https://v1-15.docs.kubernetes.io/docs/setup/): version `1.15` ([required by Kubeflow](https://www.kubeflow.org/docs/started/k8s/overview/) and Tekton 0.11) 3. [`kubectl` CLI](https://kubernetes.io/docs/tasks/tools/install-kubectl/): required to deploy Tekton pipelines to Kubernetes cluster -4. 
[`Tekton` Deployment](https://github.com/tektoncd/pipeline/releases/tag/v0.15.0/): version `0.14.0` or greater (minimum version `0.13.0` to support Tekton API version `v1beta1`), required for end-to-end testing +4. [`Tekton` Deployment](https://github.com/tektoncd/pipeline/releases/tag/v0.15.0/): version `0.16.3` or greater, required for end-to-end testing 5. [`tkn` CLI](https://github.com/tektoncd/cli#installing-tkn): version `0.11.0` or greater, required for end-to-end testing of Tekton pipelines 6. [`Kubeflow Pipelines` Deployment](https://www.kubeflow.org/docs/pipelines/installation/overview/): required for some end-to-end tests @@ -74,7 +74,7 @@ branch. Currently there are no features that require a special build. #### Tekton CLI -Follow the instructions [here](https://github.com/tektoncd/cli#installing-tkn). +Follow the instructions [here](https://github.com/tektoncd/cli#installing-tkn). Mac OS users can install the Tekton CLI using the `homebrew` formula: @@ -91,7 +91,7 @@ The Tekton Dashboard can be accessed through its `ClusterIP` service by running be patched to expose a public `NodePort` IP: kubectl patch svc tekton-dashboard -n tekton-pipelines --type='json' -p '[{"op":"replace","path":"/spec/type","value":"NodePort"}]' - + To open the dashboard run: TKN_DASHBOARD_SVC_PORT=$(kubectl -n tekton-pipelines get service tekton-dashboard -o jsonpath='{.spec.ports[0].nodePort}') @@ -107,8 +107,8 @@ like the DSL and components packages, but "override" or "replace" those parts of the Tekton YAML instead of Argo YAML. Since the KFP SDK was not designed and implemented to easily be extended, _monkey-patching_ was used to replace non-class methods and functions at runtime. -In order for the _monkey patch_ to work properly, the `kfp-tekton` compiler source code has to be aligned with a -specific version of the `kfp` SDK compiler. As of now that version is [`1.0.4`](https://github.com/kubeflow/pipelines/releases/tag/1.0.4). +In order for the _monkey patch_ to work properly, the `kfp-tekton` compiler source code has to be aligned with a +specific version of the `kfp` SDK compiler. As of now that version is [`1.0.4`](https://github.com/kubeflow/pipelines/releases/tag/1.0.4). ## Adding New Code @@ -118,12 +118,12 @@ The Python package structure as well as the module names and method signatures c This helps keeping track of all the code that had to modified and will make merging (some of) the code back into KFP or identify pieces of code that need to be refactored in KFP in order to accommodate various execution platforms. When it is necessary to bring further methods from `kfp` compiler package into the `kfp-tekton` compiler package, keep -the original method names and signatures as well as their position inside their respective Python modules. +the original method names and signatures as well as their position inside their respective Python modules. Be sure to run `make verify` before committing your code changes and creating a pull request: $ make verify - + check_license: OK lint: OK unit_test: OK @@ -138,7 +138,7 @@ Most of the functions provided by the `kfp.compiler.compiler.Compiler` are insta Static `Compiler` methods may need to be also be added to the _monkey patch_ described in the next section unless they are only used by other methods that are already overridden in `TektonCompiler`. -Be careful not to mix inheritance and monkey patching. A method which in its body calls on its +Be careful not to mix inheritance and monkey patching. 
A method which in its body calls on its `super().` implementation must not be added to the list of methods that get dynamically replaced via the _monkey patch_. @@ -149,8 +149,8 @@ should be added to their respective modules in `kfp_tekton.compiler` and added t which dynamically replaces the code in the `kfp` at runtime. As of May 2020, the _monkey patch_ was no longer needed and removed since all of the _patched_ methods -are now invoked directly (and exclusively) by other code implemented in the `kfp_tekton` compiler. -Details on how to implement a _monkey patch_ can be found in the +are now invoked directly (and exclusively) by other code implemented in the `kfp_tekton` compiler. +Details on how to implement a _monkey patch_ can be found in the [Removed Features](#monkey-patch-to-dynamically-replace-static-kfp-compiler-methods) section if it becomes necessary to reintroduce the _monkey patch_ for any methods we need to "override" which are not exclusively called directly by other methods we already implemented in the `kfp_tekton` @@ -190,7 +190,7 @@ run on a Tekton cluster. Any new functionality being added to the `kfp_tekton.compiler` should be accompanied by a new unit test in `sdk/python/tests/compiler/compiler_tests.py` Typically a test case comes with a minimal Python DSL script and a "golden" YAML file in `sdk/python/tests/compiler/testdata`. The "golden" YAML file contains the expected compiler output. The unit tests use the "golden" YAML files to compare -the current compiler output with the previously expected compiler output. +the current compiler output with the previously expected compiler output. make unit_test @@ -207,7 +207,7 @@ environment variable: The unit tests are designed to verify the YAML produced by the compiler matches the expected, previously generated "golden" YAML. End-to-end (E2E) tests are necessary to verify that the generated Tekton YAML is syntactically valid and -that the pipeline can be executed successfully on a Tekton cluster. +that the pipeline can be executed successfully on a Tekton cluster. A manual E2E test can be performed in the following manner: @@ -222,7 +222,7 @@ access secrets: tkn pipeline start --showlog -n kubeflow You can also run the dynamically generated end-to-end test suite which takes all of the "golden" YAML files from the -compiler `testdata` directory and runs them on a Kubernetes cluster, prerequisite that the environment variable +compiler `testdata` directory and runs them on a Kubernetes cluster, prerequisite that the environment variable `KUBECONFIG` is set and the K8s cluster has both Kubeflow Pipelines as well as Tekton Pipelines installed: make e2e_test @@ -319,7 +319,7 @@ except Exception as error: ``` **Note**: Since the _monkey patch_ gets triggered by importing any member of the `kfp_tekton.compiler` module, we try to -avoid using top-level imports of any members in `kfp_tekton.compiler` in pipeline DSL scripts. +avoid using top-level imports of any members in `kfp_tekton.compiler` in pipeline DSL scripts. Instead use local imports to avoid triggering the _monkey-patch_ when the original KFP compiler is used to compile a pipeline DSL script using KFP's `dsl-compile --py ` command. 
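To illustrate the local-import advice in the note above, a pipeline DSL script can defer the `kfp_tekton.compiler` import until the script is executed directly; the pipeline below is a placeholder and is not taken from the repository:

```python
# pipeline.py -- hypothetical DSL script following the local-import guidance above.
import kfp.dsl as dsl


@dsl.pipeline(name='hello-pipeline', description='Placeholder pipeline for illustration')
def hello_pipeline():
    dsl.ContainerOp(name='hello', image='alpine:3.6', command=['echo', 'hello'])


if __name__ == '__main__':
    # Local import: kfp_tekton.compiler is only imported when this script is run
    # directly, so compiling the same file with KFP's `dsl-compile --py pipeline.py`
    # never imports kfp_tekton (and, historically, never triggered the monkey patch).
    from kfp_tekton.compiler import TektonCompiler
    TektonCompiler().compile(hello_pipeline, 'hello_pipeline.yaml')
```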
diff --git a/sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py b/sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py index c9a5362603..3b03324623 100644 --- a/sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py +++ b/sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py @@ -340,15 +340,16 @@ def mark_upstream_ios_of_output(template_output, marked_inputs, # Remove pipeline task parameters unless they're used downstream for task in pipeline_tasks: - task['params'] = [ - parameter_argument - for parameter_argument in task.get('params', []) - if (task['name'], parameter_argument['name'] - ) in inputs_consumed_as_parameters and - (task['name'], - parameter_argument['name']) not in inputs_consumed_as_artifacts - or task['name'] in resource_template_names - ] + if 'condition-' not in task['name']: + task['params'] = [ + parameter_argument + for parameter_argument in task.get('params', []) + if (task['name'], parameter_argument['name'] + ) in inputs_consumed_as_parameters and + (task['name'], + parameter_argument['name']) not in inputs_consumed_as_artifacts + or task['name'] in resource_template_names + ] # tekton results doesn't support underscore for argument in task['params']: diff --git a/sdk/python/kfp_tekton/compiler/compiler.py b/sdk/python/kfp_tekton/compiler/compiler.py index cc9bbb005b..40276da32f 100644 --- a/sdk/python/kfp_tekton/compiler/compiler.py +++ b/sdk/python/kfp_tekton/compiler/compiler.py @@ -59,26 +59,26 @@ def _get_super_condition_template(): input2=int(input2) except: input1=str(input1) - sys.exit(0) if (input1 $(params.operator) input2) else sys.exit(1)' ''') + status="true" if (input1 $(inputs.params.operator) input2) else "false" + f = open("/tekton/results/status", "w") + f.write(status) + f.close()' ''') - # TODO Change to tekton_api_version once Conditions are out of v1alpha1 template = { - 'apiVersion': 'tekton.dev/v1alpha1', - 'kind': 'Condition', - 'metadata': { - 'name': 'super-condition' - }, - 'spec': { - 'params': [ - {'name': 'operand1'}, - {'name': 'operand2'}, - {'name': 'operator'} - ], - 'check': { - 'script': 'python -c ' + python_script + "'$(params.operand1)' '$(params.operand2)'", - 'image': 'python:alpine3.6', - } - } + 'results': [ + {'name': 'status', + 'description': 'Conditional task status' + } + ], + 'params': [ + {'name': 'operand1'}, + {'name': 'operand2'}, + {'name': 'operator'} + ], + 'steps': [{ + 'script': 'python -c ' + python_script + "'$(inputs.params.operand1)' '$(inputs.params.operand2)'", + 'image': 'python:alpine3.6', + }] } return template @@ -193,7 +193,6 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies): # Generate GroupOp template sub_group = group template = { - 'apiVersion': tekton_api_version, 'metadata': { 'name': sanitize_k8s_name(sub_group.name), }, @@ -342,25 +341,30 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli # TODO task_refs = [] templates = [] - condition_added = False + condition_task_refs = {} for template in raw_templates: # TODO Allow an opt-out for the condition_template if template['kind'] == 'Condition': - if not condition_added: - templates.append(_get_super_condition_template()) - condition_added = True - condition_refs[template['metadata']['name']] = [{ - 'conditionRef': 'super-condition', - 'params': [{ - 'name': param['name'], - 'value': param['value'] - } for param in template['spec'].get('params', []) - ] + condition_task_ref = [{ + 'name': template['metadata']['name'], + 'params': [{ + 'name': p['name'], + 'value': 
p.get('value', '') + } for p in template['spec'].get('params', []) + ], + 'taskSpec': _get_super_condition_template(), }] + condition_refs[template['metadata']['name']] = [ + { + 'input': '$(tasks.%s.results.status)' % template['metadata']['name'], + 'operator': 'in', + 'values': ['true'] + } + ] + condition_task_refs[template['metadata']['name']] = condition_task_ref else: templates.append(template) - task_refs.append( - { + task_ref = { 'name': template['metadata']['name'], 'params': [{ 'name': p['name'], @@ -369,7 +373,14 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli ], 'taskSpec': template['spec'], } - ) + + if template['metadata'].get('labels', None): + task_ref['taskSpec']['metadata'] = task_ref['taskSpec'].get('metadata', {}) + task_ref['taskSpec']['metadata']['labels'] = template['metadata']['labels'] + if template['metadata'].get('annotations', None): + task_ref['taskSpec']['metadata'] = task_ref['taskSpec'].get('metadata', {}) + task_ref['taskSpec']['metadata']['annotations'] = template['metadata']['annotations'] + task_refs.append(task_ref) # process input parameters from upstream tasks for conditions and pair conditions with their ancestor conditions opsgroup_stack = [pipeline.groups[0]] @@ -379,7 +390,7 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli most_recent_condition = condition_stack.pop() if cur_opsgroup.type == 'condition': - condition_ref = condition_refs[cur_opsgroup.name][0] + condition_task_ref = condition_task_refs[cur_opsgroup.name][0] condition = cur_opsgroup.condition input_params = [] @@ -397,16 +408,10 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli operand_value = '$(params.' + condition.operand2.name + ')' input_params.append(operand_value) for param_iter in range(len(input_params)): - condition_ref['params'][param_iter]['value'] = input_params[param_iter] - - # Add ancestor conditions to the current condition ref - if most_recent_condition: - condition_refs[cur_opsgroup.name].extend(condition_refs[most_recent_condition]) - most_recent_condition = cur_opsgroup.name + condition_task_ref['params'][param_iter]['value'] = input_params[param_iter] opsgroup_stack.extend(cur_opsgroup.groups) condition_stack.extend([most_recent_condition for x in range(len(cur_opsgroup.groups))]) - # add task dependencies and add condition refs to the task ref that depends on the condition op_name_to_parent_groups = self._get_groups_for_ops(pipeline.groups[0]) for task in task_refs: @@ -414,16 +419,8 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli parent_group = op_name_to_parent_groups.get(task['name'], []) if parent_group: if condition_refs.get(parent_group[-2], []): - task['conditions'] = condition_refs.get(op_name_to_parent_groups[task['name']][-2], []) + task['when'] = condition_refs.get(op_name_to_parent_groups[task['name']][-2], []) if op.dependent_names: - for dependent_name in op.dependent_names: - if condition_refs.get(dependent_name, []): - # Prompt an error here because Tekton condition cannot be a dependency. - raise TypeError(textwrap.dedent("""\ - '%s' cannot run after the Tekton condition '%s'. - A Tekton task is only allowed to run 'after' another Tekton task (ContainerOp). 
- Tekton doc: https://github.com/tektoncd/pipeline/blob/master/docs/pipelines.md#using-the-runafter-parameter - """ % (task['name'], dependent_name))) task['runAfter'] = op.dependent_names # process input parameters from upstream tasks @@ -484,6 +481,12 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli include_loop_task_refs.extend(with_loop_task) task_refs = include_loop_task_refs + # Flatten condition task + condition_task_refs_temp = [] + for condition_task_ref in condition_task_refs.values(): + for ref in condition_task_ref: + condition_task_refs_temp.append(ref) + condition_task_refs = condition_task_refs_temp # TODO: generate the PipelineRun template pipeline_run = { 'apiVersion': tekton_api_version, @@ -508,7 +511,7 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli } for p in params], 'pipelineSpec': { 'params': params, - 'tasks': task_refs, + 'tasks': task_refs + condition_task_refs, 'finally': finally_tasks } } @@ -742,6 +745,22 @@ def _write_workflow(workflow: Dict[Text, Any], { 'argo_rule': '{{outputs.parameters.([^ \t\n.:,;{}]+).path}}', 'tekton_rule': '$(results.\g<1>.path)' + }, + { + 'argo_rule': '{{workflow.uid}}', + 'tekton_rule': '$(context.pipelineRun.uid)' + }, + { + 'argo_rule': '{{workflow.name}}', + 'tekton_rule': '$(context.pipelineRun.name)' + }, + { + 'argo_rule': '{{workflow.namespace}}', + 'tekton_rule': '$(context.pipelineRun.namespace)' + }, + { + 'argo_rule': '{{workflow.parameters.([^ \t\n.:,;{}]+)}}', + 'tekton_rule': '$(params.\g<1>)' } ] for regex_rule in tekton_var_regex_rules: diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 5df1ab4adf..792284463a 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -77,13 +77,12 @@ def test_condition_workflow(self): from .testdata.condition import flipcoin self._test_pipeline_workflow(flipcoin, 'condition.yaml') - def test_condition_dependency_error(self): + def test_condition_dependency(self): """ - Test errors for dependency on Tekton conditional. + Test dependency on Tekton conditional task. """ - from .testdata.condition_error import flipcoin - with pytest.raises(TypeError): - self._test_pipeline_workflow(flipcoin, 'condition.yaml') + from .testdata.condition_dependency import flipcoin + self._test_pipeline_workflow(flipcoin, 'condition_dependency.yaml') def test_sequential_workflow(self): """ @@ -112,7 +111,7 @@ def test_sidecar_workflow(self): """ from .testdata.sidecar import sidecar_pipeline self._test_pipeline_workflow(sidecar_pipeline, 'sidecar.yaml') - + def test_loop_static_workflow(self): """ Test compiling a loop static params in workflow. @@ -268,7 +267,7 @@ def test_load_from_yaml_workflow(self): """ from .testdata.load_from_yaml import component_yaml_pipeline self._test_pipeline_workflow(component_yaml_pipeline, 'load_from_yaml.yaml') - + def test_imagepullsecrets_workflow(self): """ Test compiling a imagepullsecrets workflow. 
@@ -302,7 +301,7 @@ def test_compose(self): """ from .testdata import compose self._test_nested_workflow('compose.yaml', [compose.save_most_frequent_word, compose.download_save_most_frequent_word]) - + def _test_pipeline_workflow(self, pipeline_function, pipeline_yaml, diff --git a/sdk/python/tests/compiler/testdata/big_data_passing.yaml b/sdk/python/tests/compiler/testdata/big_data_passing.yaml index e13e66b0f9..816a010963 100644 --- a/sdk/python/tests/compiler/testdata/big_data_passing.yaml +++ b/sdk/python/tests/compiler/testdata/big_data_passing.yaml @@ -52,6 +52,27 @@ spec: tasks: - name: repeat-line taskSpec: + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"description": "Repeat the line + specified number of times", "implementation": {"container": {"args": + ["--line", {"inputValue": "line"}, {"if": {"cond": {"isPresent": "count"}, + "then": ["--count", {"inputValue": "count"}]}}, "--output-text", {"outputPath": + "output_text"}], "command": ["python3", "-u", "-c", "def _make_parent_dirs_and_return_path(file_path: + str):\n import os\n os.makedirs(os.path.dirname(file_path), exist_ok=True)\n return + file_path\n\ndef repeat_line(line, output_text_path, count = 10):\n ''''''Repeat + the line specified number of times''''''\n with open(output_text_path, + ''w'') as writer:\n for i in range(count):\n writer.write(line + + ''\\n'')\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Repeat + line'', description=''Repeat the line specified number of times'')\n_parser.add_argument(\"--line\", + dest=\"line\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--count\", + dest=\"count\", type=int, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--output-text\", + dest=\"output_text_path\", type=_make_parent_dirs_and_return_path, required=True, + default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs + = repeat_line(**_parsed_args)\n"], "image": "python:3.7"}}, "inputs": + [{"name": "line", "type": "String"}, {"default": "10", "name": "count", + "optional": true, "type": "Integer"}], "name": "Repeat line", "outputs": + [{"name": "output_text", "type": "String"}]}' steps: - args: - --line @@ -89,6 +110,19 @@ spec: runAfter: - repeat-line taskSpec: + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"description": "Print text", + "implementation": {"container": {"args": ["--text", {"inputPath": "text"}], + "command": ["python3", "-u", "-c", "def print_text(\n text_path\n): # + The \"text\" input is untyped so that any data can be printed\n ''''''Print + text''''''\n with open(text_path, ''r'') as reader:\n for + line in reader:\n print(line, end='''')\n\nimport argparse\n_parser + = argparse.ArgumentParser(prog=''Print text'', description=''Print text'')\n_parser.add_argument(\"--text\", + dest=\"text_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = print_text(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "text"}], "name": "Print + text"}' steps: - args: - --text @@ -114,6 +148,29 @@ spec: workspace: file-passing-pipelines - name: split-text-lines taskSpec: + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--source", {"inputPath": "source"}, "--odd-lines", {"outputPath": + "odd_lines"}, "--even-lines", {"outputPath": "even_lines"}], "command": + ["python3", "-u", "-c", "def _make_parent_dirs_and_return_path(file_path: + 
str):\n import os\n os.makedirs(os.path.dirname(file_path), exist_ok=True)\n return + file_path\n\ndef split_text_lines(source_path,\n odd_lines_path,\n even_lines_path):\n with + open(source_path, ''r'') as reader:\n with open(odd_lines_path, + ''w'') as odd_writer:\n with open(even_lines_path, ''w'') + as even_writer:\n while True:\n line + = reader.readline()\n if line == \"\":\n break\n odd_writer.write(line)\n line + = reader.readline()\n if line == \"\":\n break\n even_writer.write(line)\n\nimport + argparse\n_parser = argparse.ArgumentParser(prog=''Split text lines'', + description='''')\n_parser.add_argument(\"--source\", dest=\"source_path\", + type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--odd-lines\", + dest=\"odd_lines_path\", type=_make_parent_dirs_and_return_path, required=True, + default=argparse.SUPPRESS)\n_parser.add_argument(\"--even-lines\", dest=\"even_lines_path\", + type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = split_text_lines(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "source", "type": "String"}], + "name": "Split text lines", "outputs": [{"name": "odd_lines", "type": + "String"}, {"name": "even_lines", "type": "String"}]}' stepTemplate: volumeMounts: - mountPath: /tmp/inputs/source @@ -191,6 +248,19 @@ spec: runAfter: - split-text-lines taskSpec: + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"description": "Print text", + "implementation": {"container": {"args": ["--text", {"inputPath": "text"}], + "command": ["python3", "-u", "-c", "def print_text(\n text_path\n): # + The \"text\" input is untyped so that any data can be printed\n ''''''Print + text''''''\n with open(text_path, ''r'') as reader:\n for + line in reader:\n print(line, end='''')\n\nimport argparse\n_parser + = argparse.ArgumentParser(prog=''Print text'', description=''Print text'')\n_parser.add_argument(\"--text\", + dest=\"text_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = print_text(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "text"}], "name": "Print + text"}' steps: - args: - --text @@ -218,6 +288,19 @@ spec: runAfter: - split-text-lines taskSpec: + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"description": "Print text", + "implementation": {"container": {"args": ["--text", {"inputPath": "text"}], + "command": ["python3", "-u", "-c", "def print_text(\n text_path\n): # + The \"text\" input is untyped so that any data can be printed\n ''''''Print + text''''''\n with open(text_path, ''r'') as reader:\n for + line in reader:\n print(line, end='''')\n\nimport argparse\n_parser + = argparse.ArgumentParser(prog=''Print text'', description=''Print text'')\n_parser.add_argument(\"--text\", + dest=\"text_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = print_text(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "text"}], "name": "Print + text"}' steps: - args: - --text @@ -243,6 +326,27 @@ spec: workspace: file-passing-pipelines - name: write-numbers taskSpec: + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": [{"if": {"cond": {"isPresent": "start"}, "then": ["--start", + {"inputValue": "start"}]}}, {"if": {"cond": {"isPresent": "count"}, + "then": ["--count", 
{"inputValue": "count"}]}}, "--numbers", {"outputPath": + "numbers"}], "command": ["python3", "-u", "-c", "def _make_parent_dirs_and_return_path(file_path: + str):\n import os\n os.makedirs(os.path.dirname(file_path), exist_ok=True)\n return + file_path\n\ndef write_numbers(\n numbers_path, start = 0, count + = 10):\n with open(numbers_path, ''w'') as writer:\n for i + in range(start, count):\n writer.write(str(i) + ''\\n'')\n\nimport + argparse\n_parser = argparse.ArgumentParser(prog=''Write numbers'', + description='''')\n_parser.add_argument(\"--start\", dest=\"start\", + type=int, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--count\", + dest=\"count\", type=int, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--numbers\", + dest=\"numbers_path\", type=_make_parent_dirs_and_return_path, required=True, + default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs + = write_numbers(**_parsed_args)\n"], "image": "python:3.7"}}, "inputs": + [{"default": "0", "name": "start", "optional": true, "type": "Integer"}, + {"default": "10", "name": "count", "optional": true, "type": "Integer"}], + "name": "Write numbers", "outputs": [{"name": "numbers", "type": "String"}]}' steps: - args: - --count @@ -277,6 +381,19 @@ spec: runAfter: - write-numbers taskSpec: + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"description": "Print text", + "implementation": {"container": {"args": ["--text", {"inputPath": "text"}], + "command": ["python3", "-u", "-c", "def print_text(\n text_path\n): # + The \"text\" input is untyped so that any data can be printed\n ''''''Print + text''''''\n with open(text_path, ''r'') as reader:\n for + line in reader:\n print(line, end='''')\n\nimport argparse\n_parser + = argparse.ArgumentParser(prog=''Print text'', description=''Print text'')\n_parser.add_argument(\"--text\", + dest=\"text_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = print_text(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "text"}], "name": "Print + text"}' steps: - args: - --text @@ -304,6 +421,26 @@ spec: runAfter: - write-numbers taskSpec: + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--numbers", {"inputPath": "numbers"}, "----output-paths", + {"outputPath": "Output"}], "command": ["python3", "-u", "-c", "def sum_numbers(numbers_path):\n sum + = 0\n with open(numbers_path, ''r'') as reader:\n for line + in reader:\n sum = sum + int(line)\n return sum\n\ndef + _serialize_int(int_value: int) -> str:\n if isinstance(int_value, + str):\n return int_value\n if not isinstance(int_value, int):\n raise + TypeError(''Value \"{}\" has type \"{}\" instead of int.''.format(str(int_value), + str(type(int_value))))\n return str(int_value)\n\nimport argparse\n_parser + = argparse.ArgumentParser(prog=''Sum numbers'', description='''')\n_parser.add_argument(\"--numbers\", + dest=\"numbers_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"----output-paths\", + dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files + = _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = sum_numbers(**_parsed_args)\n\n_outputs + = [_outputs]\n\n_output_serializers = [\n _serialize_int,\n\n]\n\nimport + os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except + 
OSError:\n pass\n with open(output_file, ''w'') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n"], + "image": "python:3.7"}}, "inputs": [{"name": "numbers", "type": "String"}], + "name": "Sum numbers", "outputs": [{"name": "Output", "type": "Integer"}]}' steps: - args: - --numbers @@ -343,6 +480,19 @@ spec: runAfter: - sum-numbers taskSpec: + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"description": "Print text", + "implementation": {"container": {"args": ["--text", {"inputPath": "text"}], + "command": ["python3", "-u", "-c", "def print_text(\n text_path\n): # + The \"text\" input is untyped so that any data can be printed\n ''''''Print + text''''''\n with open(text_path, ''r'') as reader:\n for + line in reader:\n print(line, end='''')\n\nimport argparse\n_parser + = argparse.ArgumentParser(prog=''Print text'', description=''Print text'')\n_parser.add_argument(\"--text\", + dest=\"text_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = print_text(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "text"}], "name": "Print + text"}' steps: - args: - --text @@ -368,6 +518,24 @@ spec: workspace: file-passing-pipelines - name: gen-params taskSpec: + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["----output-paths", {"outputPath": "Output"}], "command": + ["python3", "-u", "-c", "def gen_params():\n import random\n num + = random.randint(0, 9)\n return num\n\ndef _serialize_int(int_value: + int) -> str:\n if isinstance(int_value, str):\n return int_value\n if + not isinstance(int_value, int):\n raise TypeError(''Value \"{}\" + has type \"{}\" instead of int.''.format(str(int_value), str(type(int_value))))\n return + str(int_value)\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Gen + params'', description='''')\n_parser.add_argument(\"----output-paths\", + dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files + = _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = gen_params(**_parsed_args)\n\n_outputs + = [_outputs]\n\n_output_serializers = [\n _serialize_int,\n\n]\n\nimport + os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except + OSError:\n pass\n with open(output_file, ''w'') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n"], + "image": "python:3.7"}}, "name": "Gen params", "outputs": [{"name": + "Output", "type": "Integer"}]}' results: - description: /tmp/outputs/Output/data name: output @@ -401,6 +569,17 @@ spec: - name: gen-params-Output value: $(tasks.gen-params.results.output) taskSpec: + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--numbers-parm", {"inputValue": "numbers_parm"}], "command": + ["python3", "-u", "-c", "def print_params(numbers_parm):\n print(\"The + result number is: %d\" % numbers_parm)\n\nimport argparse\n_parser = + argparse.ArgumentParser(prog=''Print params'', description='''')\n_parser.add_argument(\"--numbers-parm\", + dest=\"numbers_parm\", type=int, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = print_params(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "numbers_parm", "type": + "Integer"}], "name": "Print params"}' params: - name: gen-params-Output steps: diff --git 
a/sdk/python/tests/compiler/testdata/condition.yaml b/sdk/python/tests/compiler/testdata/condition.yaml index 299a8dab40..2955520274 100644 --- a/sdk/python/tests/compiler/testdata/condition.yaml +++ b/sdk/python/tests/compiler/testdata/condition.yaml @@ -69,16 +69,7 @@ spec: image: python:alpine3.6 name: main timeout: 0s - - conditions: - - conditionRef: super-condition - params: - - name: operand1 - value: $(tasks.flip.results.output-value) - - name: operand2 - value: heads - - name: operator - value: == - name: flip-again + - name: flip-again params: - name: forced_result2 value: $(params.forced_result2) @@ -100,24 +91,12 @@ spec: image: python:alpine3.6 name: main timeout: 0s - - conditions: - - conditionRef: super-condition - params: - - name: operand1 - value: $(tasks.flip-again.results.output-value) - - name: operand2 - value: tails - - name: operator - value: == - - conditionRef: super-condition - params: - - name: operand1 - value: $(tasks.flip.results.output-value) - - name: operand2 - value: heads - - name: operator - value: == - name: print1 + when: + - input: $(tasks.condition-1.results.status) + operator: in + values: + - 'true' + - name: print1 params: - name: flip-again-output_value value: $(tasks.flip-again.results.output-value) @@ -131,16 +110,12 @@ spec: image: alpine:3.6 name: main timeout: 0s - - conditions: - - conditionRef: super-condition - params: - - name: operand1 - value: $(tasks.flip.results.output-value) - - name: operand2 - value: tails - - name: operator - value: == - name: print2 + when: + - input: $(tasks.condition-2.results.status) + operator: in + values: + - 'true' + - name: print2 params: - name: flip-output_value value: $(tasks.flip.results.output-value) @@ -154,4 +129,78 @@ spec: image: alpine:3.6 name: main timeout: 0s + when: + - input: $(tasks.condition-3.results.status) + operator: in + values: + - 'true' + - name: condition-1 + params: + - name: operand1 + value: $(tasks.flip.results.output-value) + - name: operand2 + value: heads + - name: operator + value: == + taskSpec: + params: + - name: operand1 + - name: operand2 + - name: operator + results: + - description: Conditional task status + name: status + steps: + - image: python:alpine3.6 + script: "python -c 'import sys\ninput1=str.rstrip(sys.argv[1])\ninput2=str.rstrip(sys.argv[2])\n\ + try:\n input1=int(input1)\n input2=int(input2)\nexcept:\n input1=str(input1)\n\ + status=\"true\" if (input1 $(inputs.params.operator) input2) else \"false\"\ + \nf = open(\"/tekton/results/status\", \"w\")\nf.write(status)\nf.close()'\ + \ '$(inputs.params.operand1)' '$(inputs.params.operand2)'" + - name: condition-2 + params: + - name: operand1 + value: $(tasks.flip-again.results.output-value) + - name: operand2 + value: tails + - name: operator + value: == + taskSpec: + params: + - name: operand1 + - name: operand2 + - name: operator + results: + - description: Conditional task status + name: status + steps: + - image: python:alpine3.6 + script: "python -c 'import sys\ninput1=str.rstrip(sys.argv[1])\ninput2=str.rstrip(sys.argv[2])\n\ + try:\n input1=int(input1)\n input2=int(input2)\nexcept:\n input1=str(input1)\n\ + status=\"true\" if (input1 $(inputs.params.operator) input2) else \"false\"\ + \nf = open(\"/tekton/results/status\", \"w\")\nf.write(status)\nf.close()'\ + \ '$(inputs.params.operand1)' '$(inputs.params.operand2)'" + - name: condition-3 + params: + - name: operand1 + value: $(tasks.flip.results.output-value) + - name: operand2 + value: tails + - name: operator + value: == + taskSpec: + params: 
+ - name: operand1 + - name: operand2 + - name: operator + results: + - description: Conditional task status + name: status + steps: + - image: python:alpine3.6 + script: "python -c 'import sys\ninput1=str.rstrip(sys.argv[1])\ninput2=str.rstrip(sys.argv[2])\n\ + try:\n input1=int(input1)\n input2=int(input2)\nexcept:\n input1=str(input1)\n\ + status=\"true\" if (input1 $(inputs.params.operator) input2) else \"false\"\ + \nf = open(\"/tekton/results/status\", \"w\")\nf.write(status)\nf.close()'\ + \ '$(inputs.params.operand1)' '$(inputs.params.operand2)'" timeout: 0s diff --git a/sdk/python/tests/compiler/testdata/condition_error.py b/sdk/python/tests/compiler/testdata/condition_dependency.py similarity index 100% rename from sdk/python/tests/compiler/testdata/condition_error.py rename to sdk/python/tests/compiler/testdata/condition_dependency.py diff --git a/sdk/python/tests/compiler/testdata/condition_dependency.yaml b/sdk/python/tests/compiler/testdata/condition_dependency.yaml new file mode 100644 index 0000000000..b70ef91563 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/condition_dependency.yaml @@ -0,0 +1,217 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: PipelineRun +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"description": "Shows how to use dsl.Condition.", + "inputs": [{"default": "heads", "name": "forced_result1", "optional": true, + "type": "String"}, {"default": "tails", "name": "forced_result2", "optional": + true, "type": "String"}], "name": "Flip Coin Example Pipeline"}' + sidecar.istio.io/inject: 'false' + tekton.dev/artifact_bucket: mlpipeline + tekton.dev/artifact_endpoint: minio-service.kubeflow:9000 + tekton.dev/artifact_endpoint_scheme: http:// + tekton.dev/artifact_items: '{"flip": [["output", "$(results.output.path)"]], "flip-again": + [["output", "$(results.output.path)"]], "print1": [], "print2": [], "print3": + []}' + tekton.dev/input_artifacts: '{"print1": [{"name": "flip-again-output", "parent_task": + "flip-again"}], "print2": [{"name": "flip-output", "parent_task": "flip"}]}' + tekton.dev/output_artifacts: '{"flip": [{"key": "artifacts/$PIPELINERUN/flip/output.tgz", + "name": "flip-output", "path": "/tmp/output"}], "flip-again": [{"key": "artifacts/$PIPELINERUN/flip-again/output.tgz", + "name": "flip-again-output", "path": "/tmp/output"}]}' + name: flip-coin-example-pipeline +spec: + params: + - name: forced_result1 + value: heads + - name: forced_result2 + value: tails + pipelineSpec: + params: + - default: heads + name: forced_result1 + - default: tails + name: forced_result2 + tasks: + - name: flip + params: + - name: forced_result1 + value: $(params.forced_result1) + taskSpec: + params: + - name: forced_result1 + results: + - description: /tmp/output + name: output + steps: + - args: + - python -c "import random; import sys; forced_result = '$(inputs.params.forced_result1)'; + result = 'heads' if random.randint(0,1) == 0 else 'tails'; 
print(forced_result) + if (forced_result == 'heads' or forced_result == 'tails') else print(result)" + | tee $(results.output.path) + command: + - sh + - -c + image: python:alpine3.6 + name: main + timeout: 0s + - name: flip-again + params: + - name: forced_result2 + value: $(params.forced_result2) + taskSpec: + params: + - name: forced_result2 + results: + - description: /tmp/output + name: output + steps: + - args: + - python -c "import random; import sys; forced_result = '$(inputs.params.forced_result2)'; + result = 'heads' if random.randint(0,1) == 0 else 'tails'; print(forced_result) + if (forced_result == 'heads' or forced_result == 'tails') else print(result)" + | tee $(results.output.path) + command: + - sh + - -c + image: python:alpine3.6 + name: main + timeout: 0s + when: + - input: $(tasks.condition-1.results.status) + operator: in + values: + - 'true' + - name: print1 + params: + - name: flip-again-output + value: $(tasks.flip-again.results.output) + taskSpec: + params: + - name: flip-again-output + steps: + - command: + - echo + - $(inputs.params.flip-again-output) + image: alpine:3.6 + name: main + timeout: 0s + when: + - input: $(tasks.condition-2.results.status) + operator: in + values: + - 'true' + - name: print2 + params: + - name: flip-output + value: $(tasks.flip.results.output) + taskSpec: + params: + - name: flip-output + steps: + - command: + - echo + - $(inputs.params.flip-output) + image: alpine:3.6 + name: main + timeout: 0s + when: + - input: $(tasks.condition-3.results.status) + operator: in + values: + - 'true' + - name: print3 + runAfter: + - condition-1 + - condition-3 + taskSpec: + steps: + - command: + - echo + - done + image: alpine:3.6 + name: main + timeout: 0s + - name: condition-1 + params: + - name: operand1 + value: $(tasks.flip.results.output) + - name: operand2 + value: heads + - name: operator + value: == + taskSpec: + params: + - name: operand1 + - name: operand2 + - name: operator + results: + - description: Conditional task status + name: status + steps: + - image: python:alpine3.6 + script: "python -c 'import sys\ninput1=str.rstrip(sys.argv[1])\ninput2=str.rstrip(sys.argv[2])\n\ + try:\n input1=int(input1)\n input2=int(input2)\nexcept:\n input1=str(input1)\n\ + status=\"true\" if (input1 $(inputs.params.operator) input2) else \"false\"\ + \nf = open(\"/tekton/results/status\", \"w\")\nf.write(status)\nf.close()'\ + \ '$(inputs.params.operand1)' '$(inputs.params.operand2)'" + - name: condition-2 + params: + - name: operand1 + value: $(tasks.flip-again.results.output) + - name: operand2 + value: tails + - name: operator + value: == + taskSpec: + params: + - name: operand1 + - name: operand2 + - name: operator + results: + - description: Conditional task status + name: status + steps: + - image: python:alpine3.6 + script: "python -c 'import sys\ninput1=str.rstrip(sys.argv[1])\ninput2=str.rstrip(sys.argv[2])\n\ + try:\n input1=int(input1)\n input2=int(input2)\nexcept:\n input1=str(input1)\n\ + status=\"true\" if (input1 $(inputs.params.operator) input2) else \"false\"\ + \nf = open(\"/tekton/results/status\", \"w\")\nf.write(status)\nf.close()'\ + \ '$(inputs.params.operand1)' '$(inputs.params.operand2)'" + - name: condition-3 + params: + - name: operand1 + value: $(tasks.flip.results.output) + - name: operand2 + value: tails + - name: operator + value: == + taskSpec: + params: + - name: operand1 + - name: operand2 + - name: operator + results: + - description: Conditional task status + name: status + steps: + - image: python:alpine3.6 + script: 
"python -c 'import sys\ninput1=str.rstrip(sys.argv[1])\ninput2=str.rstrip(sys.argv[2])\n\ + try:\n input1=int(input1)\n input2=int(input2)\nexcept:\n input1=str(input1)\n\ + status=\"true\" if (input1 $(inputs.params.operator) input2) else \"false\"\ + \nf = open(\"/tekton/results/status\", \"w\")\nf.write(status)\nf.close()'\ + \ '$(inputs.params.operand1)' '$(inputs.params.operand2)'" + timeout: 0s diff --git a/sdk/python/tests/compiler/testdata/create_component_from_func.yaml b/sdk/python/tests/compiler/testdata/create_component_from_func.yaml index 84a6e81a0e..5421c4a79f 100644 --- a/sdk/python/tests/compiler/testdata/create_component_from_func.yaml +++ b/sdk/python/tests/compiler/testdata/create_component_from_func.yaml @@ -43,6 +43,27 @@ spec: tasks: - name: produce-dir-with-files-python-op taskSpec: + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": [{"if": {"cond": {"isPresent": "num_files"}, "then": ["--num-files", + {"inputValue": "num_files"}]}}, "--output-dir", {"outputPath": "output_dir"}], + "command": ["python3", "-u", "-c", "def _make_parent_dirs_and_return_path(file_path: + str):\n import os\n os.makedirs(os.path.dirname(file_path), exist_ok=True)\n return + file_path\n\ndef produce_dir_with_files_python_op(output_dir_path, num_files + = 10):\n import os\n os.makedirs(os.path.join(output_dir_path, + ''subdir''), exist_ok=True)\n for i in range(num_files):\n file_path + = os.path.join(output_dir_path, ''subdir'', str(i) + ''.txt'')\n with + open(file_path, ''w'') as f:\n f.write(str(i))\n\nimport + argparse\n_parser = argparse.ArgumentParser(prog=''Produce dir with + files python op'', description='''')\n_parser.add_argument(\"--num-files\", + dest=\"num_files\", type=int, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--output-dir\", + dest=\"output_dir_path\", type=_make_parent_dirs_and_return_path, required=True, + default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs + = produce_dir_with_files_python_op(**_parsed_args)\n"], "image": "python:3.7"}}, + "inputs": [{"default": "10", "name": "num_files", "optional": true, + "type": "Integer"}], "name": "Produce dir with files python op", "outputs": + [{"name": "output_dir"}]}' steps: - args: - --num-files @@ -78,6 +99,15 @@ spec: runAfter: - produce-dir-with-files-python-op taskSpec: + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"description": "Get subdirectory + from directory.", "implementation": {"container": {"command": ["sh", + "-ex", "-c", "mkdir -p \"$(dirname \"$2\")\"\ncp -r \"$0/$1\" \"$2\"\n", + {"inputPath": "Directory"}, {"inputValue": "Subpath"}, {"outputPath": + "Subdir"}], "image": "alpine"}}, "inputs": [{"name": "Directory", "type": + "Directory"}, {"name": "Subpath", "type": "String"}], "name": "Get subdirectory", + "outputs": [{"name": "Subdir", "type": "Directory"}]}' steps: - command: - sh @@ -103,6 +133,14 @@ spec: runAfter: - get-subdirectory taskSpec: + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"description": "Recursively list + directory contents.", "implementation": {"container": {"command": ["sh", + "-ex", "-c", "mkdir -p \"$(dirname \"$1\")\"\n#ls --almost-all --recursive + \"$0\" > \"$1\"\nls -A -R \"$0\" > \"$1\"\n", {"inputPath": "Directory"}, + {"outputPath": "Items"}], "image": "alpine"}}, "inputs": [{"name": "Directory", + "type": "Directory"}], "name": "List items", "outputs": [{"name": "Items"}]}' results: - description: 
/tmp/outputs/Items/data name: items @@ -133,6 +171,16 @@ spec: - get-subdirectory - produce-dir-with-files-python-op taskSpec: + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"description": "Recursively list + directory contents.", "implementation": {"container": {"command": ["sh", + "-ex", "-c", "mkdir -p \"$(dirname \"$2\")\"\nls -A -R \"$0\" > \"$2\"\nls + -A -R \"$1\" >> \"$2\"\n", {"inputPath": "Directory1"}, {"inputPath": + "Directory2"}, {"outputPath": "Items"}], "image": "alpine"}}, "inputs": + [{"name": "Directory1", "type": "Directory"}, {"name": "Directory2", + "type": "Directory"}], "name": "List items 2", "outputs": [{"name": + "Items"}]}' results: - description: /tmp/outputs/Items/data name: items diff --git a/sdk/python/tests/compiler/testdata/load_from_yaml.yaml b/sdk/python/tests/compiler/testdata/load_from_yaml.yaml index 1e13757c83..952c2f966d 100644 --- a/sdk/python/tests/compiler/testdata/load_from_yaml.yaml +++ b/sdk/python/tests/compiler/testdata/load_from_yaml.yaml @@ -32,6 +32,14 @@ spec: tasks: - name: busybox taskSpec: + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"description": "Sample component + for testing\n", "implementation": {"container": {"args": ["start", {"inputValue": + "dummy_text"}, {"outputPath": "dummy_output_path"}, "end"], "command": + ["echo"], "image": "alpine:latest"}}, "inputs": [{"default": "", "description": + "Test empty input", "name": "dummy_text"}], "name": "busybox", "outputs": + [{"description": "Test unused output path", "name": "dummy_output_path"}]}' results: - description: /tmp/outputs/dummy_output_path/data name: dummy-output-path diff --git a/sdk/python/tests/compiler/testdata/pipeline_transformers.yaml b/sdk/python/tests/compiler/testdata/pipeline_transformers.yaml index e7125dda68..e8398c9b0e 100644 --- a/sdk/python/tests/compiler/testdata/pipeline_transformers.yaml +++ b/sdk/python/tests/compiler/testdata/pipeline_transformers.yaml @@ -32,6 +32,11 @@ spec: tasks: - name: print taskSpec: + metadata: + annotations: + hobby: football + labels: + hobby: football steps: - command: - echo @@ -41,6 +46,11 @@ spec: timeout: 0s - name: print-2 taskSpec: + metadata: + annotations: + hobby: football + labels: + hobby: football steps: - command: - echo
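For reference, a minimal DSL sketch of the condition pattern exercised by the `condition.yaml` and `condition_dependency.yaml` golden files above; the operation names and images are illustrative, and the compiled output to expect is the one shown in those files:

```python
# Hypothetical flip-coin style pipeline: with this change, dsl.Condition compiles to a
# plain Tekton task emitting a "status" result plus `when` expressions on guarded tasks.
import kfp.dsl as dsl


@dsl.pipeline(name='flip-coin-sketch', description='Condition compiled to a task plus when-expressions')
def flipcoin():
    flip = dsl.ContainerOp(
        name='flip',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; print(\'heads\' if random.randint(0, 1) == 0 '
                   'else \'tails\')" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'})

    # Instead of a v1alpha1 Condition CRD, the compiler now emits a task such as
    # "condition-1" that writes "true"/"false" to /tekton/results/status, and the
    # guarded task below receives a matching `when` expression on that result.
    with dsl.Condition(flip.outputs['output'] == 'heads'):
        dsl.ContainerOp(name='print-result', image='alpine:3.6',
                        command=['echo', flip.outputs['output']])
```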