Skip to content

Commit

Permalink
Any Sequencer implements
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchihe committed Jan 5, 2021
1 parent 66f3351 commit 4ea94ec
Show file tree
Hide file tree
Showing 14 changed files with 3,410 additions and 25 deletions.
58 changes: 36 additions & 22 deletions sdk/FEATURES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,30 @@ and test pipelines found in the KFP repository.
<!-- # Table of Contents -->

- [Pipeline DSL Features with Native Tekton Implementation](#pipeline-dsl-features-with-native-tekton-implementation)
+ [pod_annotations and pod_labels](#pod_annotations-and-pod_labels)
+ [Retries](#retries)
+ [Volumes](#volumes)
+ [Timeout for Tasks and Pipelines](#timeout-for-tasks-and-pipelines)
+ [RunAfter](#runafter)
+ [Input Parameters](#input-parameters)
+ [ContainerOp](#containerop)
+ [Affinity, Node Selector, and Tolerations](#affinity-node-selector-and-tolerations)
+ [ImagePullSecrets](#imagepullsecrets)
+ [Exit Handler](#exit-handler)
- [pod_annotations and pod_labels](#pod_annotations-and-pod_labels)
- [Retries](#retries)
- [Volumes](#volumes)
- [Timeout for Tasks and Pipelines](#timeout-for-tasks-and-pipelines)
- [RunAfter](#runafter)
- [Input Parameters](#input-parameters)
- [ContainerOp](#containerop)
- [Affinity, Node Selector, and Tolerations](#affinity-node-selector-and-tolerations)
- [ImagePullSecrets](#imagepullsecrets)
- [Exit Handler](#exit-handler)
- [Any sequencer](#any-sequencer)
- [Pipeline DSL Features with a Custom Tekton Implementation](#pipeline-dsl-features-with-a-custom-tekton-implementation)
* [Features with the Same Behavior as Argo](#features-with-the-same-behavior-as-argo)
+ [InitContainers](#initcontainers)
+ [Conditions](#conditions)
+ [ResourceOp, VolumeOp, and VolumeSnapshotOp](#resourceop-volumeop-and-volumesnapshotop)
+ [Output Parameters](#output-parameters)
+ [Input Artifacts](#input-artifacts)
+ [Output Artifacts](#output-artifacts)
* [Features with Limitations](#features-with-limitations)
+ [ParallelFor](#parallelfor) - [Tracking issue][ParallelFor]
+ [Variable Substitutions](#variable-substitutions) - [Tracking issue][VarSub]
* [Features with a Different Behavior than Argo](#features-with-a-different-behavior-than-argo)
+ [Sidecars](#sidecars) - [Tracking issue][Sidecars]
- [Features with the Same Behavior as Argo](#features-with-the-same-behavior-as-argo)
- [InitContainers](#initcontainers)
- [Conditions](#conditions)
- [ResourceOp, VolumeOp, and VolumeSnapshotOp](#resourceop-volumeop-and-volumesnapshotop)
- [Output Parameters](#output-parameters)
- [Input Artifacts](#input-artifacts)
- [Output Artifacts](#output-artifacts)
- [Features with Limitations](#features-with-limitations)
- [ParallelFor](#parallelfor)
- [Variable Substitutions](#variable-substitutions)
- [Features with a Different Behavior than Argo](#features-with-a-different-behavior-than-argo)
- [Sidecars](#sidecars)


# Pipeline DSL Features with Native Tekton Implementation
Expand Down Expand Up @@ -125,6 +126,19 @@ the [exit_handler](/sdk/python/tests/compiler/testdata/exit_handler.py) compiler

The `finally` syntax is supported since Tekton version `0.14.0`.

### Any sequencer

When any one of the dependencies complete successfully, the Job will be started. Order doesn’t matter, but don’t wait for all the job status. For example:

```
from kfp_tekton.compiler.any_sequencer import after_any
dsl.ContainerOp(
...
).apply(after_any(([containerOps]))
```

Note that the service account of the `Any Sequencer` need get premission to watch the status of sepecified `taskRun`.

# Pipeline DSL Features with a Custom Tekton Implementation

Expand Down
4 changes: 3 additions & 1 deletion sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ def mark_upstream_ios_of_output(template_output, marked_inputs,
# Remove input parameters unless they're used downstream.
# This also removes unused container template inputs if any.
for template in container_templates + [pipeline_template]:
if 'any-sequencer-' in template.get('name', {}):
continue
spec = template.get('taskSpec', {}) or template.get('pipelineSpec', {})
spec['params'] = [
input_parameter for input_parameter in spec.get('params', []) if (
Expand Down Expand Up @@ -340,7 +342,7 @@ def mark_upstream_ios_of_output(template_output, marked_inputs,

# Remove pipeline task parameters unless they're used downstream
for task in pipeline_tasks:
if 'condition-' not in task['name']:
if 'condition-' not in task['name'] and 'any-sequencer-' not in task['name']:
task['params'] = [
parameter_argument
for parameter_argument in task.get('params', [])
Expand Down
82 changes: 82 additions & 0 deletions sdk/python/kfp_tekton/compiler/any_sequencer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid
from typing import List
from kfp import dsl

from kfp_tekton.compiler._k8s_helper import sanitize_k8s_name


def after_any(container_ops: List[dsl.ContainerOp]):
'''
The function add a flag for any condition handler.
'''
tasks_list = []
for cop in container_ops:
cop_name = sanitize_k8s_name(cop.name)
tasks_list.append(cop_name)
task_list_str = ",".join(tasks_list)

def _after_components(cop):
cop.any_sequencer = {"tasks_list": task_list_str}
return cop
return _after_components


def generate_any_sequencer(task_list):
'''
Generate any sequencer task
'''
any_sequencer = {
"name": "any-sequencer-" + str(uuid.uuid4().hex[:5]),
"params": [
{
"name": "pipelineRun-namespace",
"value": "$(context.pipelineRun.namespace)"
},
{
"name": "pipelineRun-name",
"value": "$(context.pipelineRun.name)"
},
],
"taskSpec": {
"params": [
{
"name": "pipelineRun-namespace"
},
{
"name": "pipelineRun-name"
}
],
"steps": [
{
"name": "main",
"args": [
"-namespace",
"$(params.pipelineRun-namespace)",
"-prName",
"$(params.pipelineRun-name)",
"-taskList",
task_list
],
"command": [
"any-taskrun"
],
"image": "dspipelines/any-sequencer:latest"
}
]
}
}
return any_sequencer
21 changes: 19 additions & 2 deletions sdk/python/kfp_tekton/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from kfp_tekton.compiler._k8s_helper import convert_k8s_obj_to_json, sanitize_k8s_name, sanitize_k8s_object
from kfp_tekton.compiler._op_to_template import _op_to_template
from kfp_tekton.compiler.yaml_utils import dump_yaml

from kfp_tekton.compiler.any_sequencer import generate_any_sequencer

DEFAULT_ARTIFACT_BUCKET = env.get('DEFAULT_ARTIFACT_BUCKET', 'mlpipeline')
DEFAULT_ARTIFACT_ENDPOINT = env.get('DEFAULT_ARTIFACT_ENDPOINT', 'minio-service.kubeflow:9000')
Expand Down Expand Up @@ -486,6 +486,20 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
for ref in condition_task_ref:
condition_task_refs_temp.append(ref)
condition_task_refs = condition_task_refs_temp

# Inject any sequencer condition task.
any_sequencer_taskrefs = []
any_sequencer_annotations = {}
for task in task_refs:
op = pipeline.ops.get(task['name'])
if hasattr(op, 'any_sequencer'):
any_sequencer_task = generate_any_sequencer(op.any_sequencer['tasks_list'])
any_sequencer_taskrefs.append(any_sequencer_task)
run_after = task.get('runAfter', [])
run_after.append(any_sequencer_task['name'])
task['runAfter'] = run_after
any_sequencer_annotations[any_sequencer_task['name']] = op.any_sequencer['tasks_list'].split(",")

# TODO: generate the PipelineRun template
pipeline_run = {
'apiVersion': tekton_api_version,
Expand All @@ -510,12 +524,15 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
} for p in params],
'pipelineSpec': {
'params': params,
'tasks': task_refs + condition_task_refs,
'tasks': task_refs + condition_task_refs + any_sequencer_taskrefs,
'finally': finally_tasks
}
}
}

if any_sequencer_annotations:
pipeline_run['metadata']['annotations']['anyConditions'] = json.dumps(any_sequencer_annotations)

# TODO: pipelineRun additions

# Generate TaskRunSpec PodTemplate:s
Expand Down
20 changes: 20 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,25 @@ def test_compose(self):
from .testdata import compose
self._test_nested_workflow('compose.yaml', [compose.save_most_frequent_word, compose.download_save_most_frequent_word])

def test_any_sequencer(self):
"""
Test any sequencer dependency.
"""
from .testdata.any_sequencer import any_sequence_pipeline

def _any_sequencer_normalize(file_context):
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
golden_yaml_file = os.path.join(test_data_dir, 'any_sequencer.yaml')
with open(golden_yaml_file, 'r') as f:
golden_file = yaml.safe_load(f)
golden_name = list(json.loads(golden_file['metadata']['annotations']['anyConditions']).keys())[0]
compiled_file = yaml.safe_load(file_context)
compiled_name = list(json.loads(compiled_file['metadata']['annotations']['anyConditions']).keys())[0]
update_context = file_context.replace(compiled_name, golden_name)
return update_context

self._test_pipeline_workflow(any_sequence_pipeline, 'any_sequencer.yaml', _any_sequencer_normalize)

def _test_pipeline_workflow(self,
pipeline_function,
pipeline_yaml,
Expand Down Expand Up @@ -400,3 +419,4 @@ def sort_items(obj):
self.assertEqual(golden, compiled_workflow,
msg="\n===[ " + golden_yaml_file.split(os.path.sep)[-1] + " ]===\n"
+ json.dumps(compiled_workflow, indent=2))

56 changes: 56 additions & 0 deletions sdk/python/tests/compiler/testdata/any_sequencer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl
from kfp_tekton.compiler import TektonCompiler
from kfp_tekton.compiler.any_sequencer import after_any


@dsl.pipeline(
name="Any Sequencer",
description="Any Sequencer Component Demo",
)
def any_sequence_pipeline(
):
task1 = dsl.ContainerOp(
name="task1",
image="registry.access.redhat.com/ubi8/ubi-minimal",
command=["/bin/bash", "-c"],
arguments=["sleep 15"]
)

task2 = dsl.ContainerOp(
name="task2",
image="registry.access.redhat.com/ubi8/ubi-minimal",
command=["/bin/bash", "-c"],
arguments=["sleep 200"]
)

task3 = dsl.ContainerOp(
name="task3",
image="registry.access.redhat.com/ubi8/ubi-minimal",
command=["/bin/bash", "-c"],
arguments=["sleep 300"]
)

task4 = dsl.ContainerOp(
name="task4",
image="registry.access.redhat.com/ubi8/ubi-minimal",
command=["/bin/bash", "-c"],
arguments=["sleep 30"]
).apply(after_any([task1, task2, task3]))


if __name__ == "__main__":
TektonCompiler().compile(any_sequence_pipeline, "any_sequencer" + ".yaml")
80 changes: 80 additions & 0 deletions sdk/python/tests/compiler/testdata/any_sequencer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: any-sequencer
annotations:
tekton.dev/output_artifacts: '{}'
tekton.dev/input_artifacts: '{}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"task1": [], "task2": [], "task3": [], "task4": []}'
sidecar.istio.io/inject: "false"
anyConditions: '{"any-sequencer-4b2c9": ["task1", "task2", "task3"]}'
pipelines.kubeflow.org/pipeline_spec: '{"description": "Any Sequencer Component
Demo", "name": "Any Sequencer"}'
spec:
pipelineSpec:
tasks:
- name: task1
taskSpec:
steps:
- name: main
args: [sleep 15]
command: [/bin/bash, -c]
image: registry.access.redhat.com/ubi8/ubi-minimal
timeout: 0s
- name: task2
taskSpec:
steps:
- name: main
args: [sleep 200]
command: [/bin/bash, -c]
image: registry.access.redhat.com/ubi8/ubi-minimal
timeout: 0s
- name: task3
taskSpec:
steps:
- name: main
args: [sleep 300]
command: [/bin/bash, -c]
image: registry.access.redhat.com/ubi8/ubi-minimal
timeout: 0s
- name: task4
taskSpec:
steps:
- name: main
args: [sleep 30]
command: [/bin/bash, -c]
image: registry.access.redhat.com/ubi8/ubi-minimal
timeout: 0s
runAfter: [any-sequencer-4b2c9]
- name: any-sequencer-4b2c9
params:
- {name: pipelineRun-namespace, value: $(context.pipelineRun.namespace)}
- {name: pipelineRun-name, value: $(context.pipelineRun.name)}
taskSpec:
params:
- {name: pipelineRun-namespace}
- {name: pipelineRun-name}
steps:
- name: main
args: [-namespace, $(params.pipelineRun-namespace), -prName, $(params.pipelineRun-name),
-taskList, 'task1,task2,task3']
command: [any-taskrun]
image: dspipelines/any-sequencer:latest
timeout: 0s
1 change: 1 addition & 0 deletions tekton-catalog/any-sequencer/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
_output
4 changes: 4 additions & 0 deletions tekton-catalog/any-sequencer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM registry.access.redhat.com/ubi8/ubi-minimal

COPY _output/bin/any-taskrun /usr/local/bin

Loading

0 comments on commit 4ea94ec

Please sign in to comment.