Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Any Sequencer implements #399

Merged
merged 1 commit into from
Jan 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 36 additions & 22 deletions sdk/FEATURES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,30 @@ and test pipelines found in the KFP repository.
<!-- # Table of Contents -->

- [Pipeline DSL Features with Native Tekton Implementation](#pipeline-dsl-features-with-native-tekton-implementation)
+ [pod_annotations and pod_labels](#pod_annotations-and-pod_labels)
+ [Retries](#retries)
+ [Volumes](#volumes)
+ [Timeout for Tasks and Pipelines](#timeout-for-tasks-and-pipelines)
+ [RunAfter](#runafter)
+ [Input Parameters](#input-parameters)
+ [ContainerOp](#containerop)
+ [Affinity, Node Selector, and Tolerations](#affinity-node-selector-and-tolerations)
+ [ImagePullSecrets](#imagepullsecrets)
+ [Exit Handler](#exit-handler)
- [pod_annotations and pod_labels](#pod_annotations-and-pod_labels)
- [Retries](#retries)
- [Volumes](#volumes)
- [Timeout for Tasks and Pipelines](#timeout-for-tasks-and-pipelines)
- [RunAfter](#runafter)
- [Input Parameters](#input-parameters)
- [ContainerOp](#containerop)
- [Affinity, Node Selector, and Tolerations](#affinity-node-selector-and-tolerations)
- [ImagePullSecrets](#imagepullsecrets)
- [Exit Handler](#exit-handler)
- [Any sequencer](#any-sequencer)
- [Pipeline DSL Features with a Custom Tekton Implementation](#pipeline-dsl-features-with-a-custom-tekton-implementation)
* [Features with the Same Behavior as Argo](#features-with-the-same-behavior-as-argo)
+ [InitContainers](#initcontainers)
+ [Conditions](#conditions)
+ [ResourceOp, VolumeOp, and VolumeSnapshotOp](#resourceop-volumeop-and-volumesnapshotop)
+ [Output Parameters](#output-parameters)
+ [Input Artifacts](#input-artifacts)
+ [Output Artifacts](#output-artifacts)
* [Features with Limitations](#features-with-limitations)
+ [ParallelFor](#parallelfor) - [Tracking issue][ParallelFor]
+ [Variable Substitutions](#variable-substitutions) - [Tracking issue][VarSub]
* [Features with a Different Behavior than Argo](#features-with-a-different-behavior-than-argo)
+ [Sidecars](#sidecars) - [Tracking issue][Sidecars]
- [Features with the Same Behavior as Argo](#features-with-the-same-behavior-as-argo)
- [InitContainers](#initcontainers)
- [Conditions](#conditions)
- [ResourceOp, VolumeOp, and VolumeSnapshotOp](#resourceop-volumeop-and-volumesnapshotop)
- [Output Parameters](#output-parameters)
- [Input Artifacts](#input-artifacts)
- [Output Artifacts](#output-artifacts)
- [Features with Limitations](#features-with-limitations)
- [ParallelFor](#parallelfor)
- [Variable Substitutions](#variable-substitutions)
- [Features with a Different Behavior than Argo](#features-with-a-different-behavior-than-argo)
- [Sidecars](#sidecars)


# Pipeline DSL Features with Native Tekton Implementation
Expand Down Expand Up @@ -125,6 +126,19 @@ the [exit_handler](/sdk/python/tests/compiler/testdata/exit_handler.py) compiler

The `finally` syntax is supported since Tekton version `0.14.0`.

### Any sequencer

When any one of the dependencies complete successfully, the Job will be started. Order doesn’t matter, but don’t wait for all the job status. For example:

```
from kfp_tekton.compiler.any_sequencer import after_any
dsl.ContainerOp(
...
).apply(after_any(([containerOps]))
```

Note that the service account of the `Any Sequencer` need get premission to watch the status of sepecified `taskRun`.

# Pipeline DSL Features with a Custom Tekton Implementation

Expand Down
4 changes: 3 additions & 1 deletion sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ def mark_upstream_ios_of_output(template_output, marked_inputs,
# Remove input parameters unless they're used downstream.
# This also removes unused container template inputs if any.
for template in container_templates + [pipeline_template]:
if 'any-sequencer-' in template.get('name', {}):
continue
spec = template.get('taskSpec', {}) or template.get('pipelineSpec', {})
spec['params'] = [
input_parameter for input_parameter in spec.get('params', []) if (
Expand Down Expand Up @@ -340,7 +342,7 @@ def mark_upstream_ios_of_output(template_output, marked_inputs,

# Remove pipeline task parameters unless they're used downstream
for task in pipeline_tasks:
if 'condition-' not in task['name']:
if 'condition-' not in task['name'] and 'any-sequencer-' not in task['name']:
task['params'] = [
parameter_argument
for parameter_argument in task.get('params', [])
Expand Down
82 changes: 82 additions & 0 deletions sdk/python/kfp_tekton/compiler/any_sequencer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid
from typing import List
from kfp import dsl

from kfp_tekton.compiler._k8s_helper import sanitize_k8s_name


def after_any(container_ops: List[dsl.ContainerOp]):
'''
The function add a flag for any condition handler.
'''
tasks_list = []
for cop in container_ops:
cop_name = sanitize_k8s_name(cop.name)
tasks_list.append(cop_name)
task_list_str = ",".join(tasks_list)

def _after_components(cop):
cop.any_sequencer = {"tasks_list": task_list_str}
return cop
return _after_components


def generate_any_sequencer(task_list):
'''
Generate any sequencer task
'''
any_sequencer = {
"name": "any-sequencer-" + str(uuid.uuid4().hex[:5]),
"params": [
{
"name": "pipelineRun-namespace",
"value": "$(context.pipelineRun.namespace)"
},
{
"name": "pipelineRun-name",
"value": "$(context.pipelineRun.name)"
},
],
"taskSpec": {
"params": [
{
"name": "pipelineRun-namespace"
},
{
"name": "pipelineRun-name"
}
],
"steps": [
{
"name": "main",
"args": [
"-namespace",
"$(params.pipelineRun-namespace)",
"-prName",
"$(params.pipelineRun-name)",
"-taskList",
task_list
],
"command": [
"any-taskrun"
],
"image": "dspipelines/any-sequencer:latest"
}
]
}
}
return any_sequencer
21 changes: 19 additions & 2 deletions sdk/python/kfp_tekton/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from kfp_tekton.compiler._k8s_helper import convert_k8s_obj_to_json, sanitize_k8s_name, sanitize_k8s_object
from kfp_tekton.compiler._op_to_template import _op_to_template
from kfp_tekton.compiler.yaml_utils import dump_yaml

from kfp_tekton.compiler.any_sequencer import generate_any_sequencer

DEFAULT_ARTIFACT_BUCKET = env.get('DEFAULT_ARTIFACT_BUCKET', 'mlpipeline')
DEFAULT_ARTIFACT_ENDPOINT = env.get('DEFAULT_ARTIFACT_ENDPOINT', 'minio-service.kubeflow:9000')
Expand Down Expand Up @@ -488,6 +488,20 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
for ref in condition_task_ref:
condition_task_refs_temp.append(ref)
condition_task_refs = condition_task_refs_temp

# Inject any sequencer condition task.
any_sequencer_taskrefs = []
any_sequencer_annotations = {}
for task in task_refs:
op = pipeline.ops.get(task['name'])
if hasattr(op, 'any_sequencer'):
any_sequencer_task = generate_any_sequencer(op.any_sequencer['tasks_list'])
any_sequencer_taskrefs.append(any_sequencer_task)
run_after = task.get('runAfter', [])
run_after.append(any_sequencer_task['name'])
task['runAfter'] = run_after
any_sequencer_annotations[any_sequencer_task['name']] = op.any_sequencer['tasks_list'].split(",")

pipeline_run = {
'apiVersion': tekton_api_version,
'kind': 'PipelineRun',
Expand All @@ -511,12 +525,15 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
} for p in params],
'pipelineSpec': {
'params': params,
'tasks': task_refs + condition_task_refs,
'tasks': task_refs + condition_task_refs + any_sequencer_taskrefs,
'finally': finally_tasks
}
}
}

if any_sequencer_annotations:
pipeline_run['metadata']['annotations']['anyConditions'] = json.dumps(any_sequencer_annotations)

# Generate TaskRunSpec PodTemplate:s
task_run_spec = []
for task in task_refs:
Expand Down
20 changes: 20 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,25 @@ def test_compose(self):
from .testdata import compose
self._test_nested_workflow('compose.yaml', [compose.save_most_frequent_word, compose.download_save_most_frequent_word])

def test_any_sequencer(self):
"""
Test any sequencer dependency.
"""
from .testdata.any_sequencer import any_sequence_pipeline

def _any_sequencer_normalize(file_context):
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
golden_yaml_file = os.path.join(test_data_dir, 'any_sequencer.yaml')
with open(golden_yaml_file, 'r') as f:
golden_file = yaml.safe_load(f)
golden_name = list(json.loads(golden_file['metadata']['annotations']['anyConditions']).keys())[0]
compiled_file = yaml.safe_load(file_context)
compiled_name = list(json.loads(compiled_file['metadata']['annotations']['anyConditions']).keys())[0]
update_context = file_context.replace(compiled_name, golden_name)
return update_context

self._test_pipeline_workflow(any_sequence_pipeline, 'any_sequencer.yaml', _any_sequencer_normalize)

def _test_pipeline_workflow(self,
pipeline_function,
pipeline_yaml,
Expand Down Expand Up @@ -400,3 +419,4 @@ def sort_items(obj):
self.assertEqual(golden, compiled_workflow,
msg="\n===[ " + golden_yaml_file.split(os.path.sep)[-1] + " ]===\n"
+ json.dumps(compiled_workflow, indent=2))

56 changes: 56 additions & 0 deletions sdk/python/tests/compiler/testdata/any_sequencer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl
from kfp_tekton.compiler import TektonCompiler
from kfp_tekton.compiler.any_sequencer import after_any


@dsl.pipeline(
name="Any Sequencer",
description="Any Sequencer Component Demo",
)
def any_sequence_pipeline(
):
task1 = dsl.ContainerOp(
name="task1",
image="registry.access.redhat.com/ubi8/ubi-minimal",
command=["/bin/bash", "-c"],
arguments=["sleep 15"]
)

task2 = dsl.ContainerOp(
name="task2",
image="registry.access.redhat.com/ubi8/ubi-minimal",
command=["/bin/bash", "-c"],
arguments=["sleep 200"]
)

task3 = dsl.ContainerOp(
name="task3",
image="registry.access.redhat.com/ubi8/ubi-minimal",
command=["/bin/bash", "-c"],
arguments=["sleep 300"]
)

task4 = dsl.ContainerOp(
name="task4",
image="registry.access.redhat.com/ubi8/ubi-minimal",
command=["/bin/bash", "-c"],
arguments=["sleep 30"]
).apply(after_any([task1, task2, task3]))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick question: Is there a limit for how many tasks the Any Sequencer can handle?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No limitation for task numbers. Thanks.



if __name__ == "__main__":
TektonCompiler().compile(any_sequence_pipeline, "any_sequencer" + ".yaml")
80 changes: 80 additions & 0 deletions sdk/python/tests/compiler/testdata/any_sequencer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: any-sequencer
annotations:
tekton.dev/output_artifacts: '{}'
tekton.dev/input_artifacts: '{}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"task1": [], "task2": [], "task3": [], "task4": []}'
sidecar.istio.io/inject: "false"
anyConditions: '{"any-sequencer-4b2c9": ["task1", "task2", "task3"]}'
pipelines.kubeflow.org/pipeline_spec: '{"description": "Any Sequencer Component
Demo", "name": "Any Sequencer"}'
spec:
pipelineSpec:
tasks:
- name: task1
taskSpec:
steps:
- name: main
args: [sleep 15]
command: [/bin/bash, -c]
image: registry.access.redhat.com/ubi8/ubi-minimal
timeout: 0s
- name: task2
taskSpec:
steps:
- name: main
args: [sleep 200]
command: [/bin/bash, -c]
image: registry.access.redhat.com/ubi8/ubi-minimal
timeout: 0s
- name: task3
taskSpec:
steps:
- name: main
args: [sleep 300]
command: [/bin/bash, -c]
image: registry.access.redhat.com/ubi8/ubi-minimal
timeout: 0s
- name: task4
taskSpec:
steps:
- name: main
args: [sleep 30]
command: [/bin/bash, -c]
image: registry.access.redhat.com/ubi8/ubi-minimal
timeout: 0s
runAfter: [any-sequencer-4b2c9]
- name: any-sequencer-4b2c9
params:
- {name: pipelineRun-namespace, value: $(context.pipelineRun.namespace)}
- {name: pipelineRun-name, value: $(context.pipelineRun.name)}
taskSpec:
params:
- {name: pipelineRun-namespace}
- {name: pipelineRun-name}
steps:
- name: main
args: [-namespace, $(params.pipelineRun-namespace), -prName, $(params.pipelineRun-name),
-taskList, 'task1,task2,task3']
command: [any-taskrun]
image: dspipelines/any-sequencer:latest
jinchihe marked this conversation as resolved.
Show resolved Hide resolved
timeout: 0s
1 change: 1 addition & 0 deletions tekton-catalog/any-sequencer/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
_output
4 changes: 4 additions & 0 deletions tekton-catalog/any-sequencer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM registry.access.redhat.com/ubi8/ubi-minimal

COPY _output/bin/any-taskrun /usr/local/bin

Loading