KEP-74: support argo workflow #2976 (Open, wants to merge 13 commits into base: main)

keps/74-support-argo-workflow/README.md (350 additions, 0 deletions):
# KEP-74: Support Argo Workflow

<!-- toc -->
- [Summary](#summary)
- [Motivation](#motivation)
  - [Goals](#goals)
  - [Non-Goals](#non-goals)
- [Proposal](#proposal)
  - [User Stories](#user-stories)
    - [Story 1](#story-1)
    - [Story 2](#story-2)
- [Design Details](#design-details)
  - [Workflow as a Unit](#workflow-as-a-unit)
    - [Drawback and Limitations](#drawback-and-limitations)
    - [Advantages](#advantages)
  - [Layer as a Unit](#layer-as-a-unit)
    - [Examples](#examples)
      - [Example 1 (ParallelSteps Contains Leaf Template Only)](#example-1-parallelsteps-contains-leaf-template-only)
      - [Example 2 (ParallelSteps Contains Leaf Template and Step)](#example-2-parallelsteps-contains-leaf-template-and-step)
      - [Example 3 (Workflow with Single Container Template)](#example-3-workflow-with-single-container-template)
    - [How to suspend a workflow step by step](#how-to-suspend-a-workflow-step-by-step)
    - [Drawback and Limitations](#drawback-and-limitations-1)
    - [Advantages](#advantages-1)
  - [Plain Pod as a Unit](#plain-pod-as-a-unit)
    - [Drawback and Limitations](#drawback-and-limitations-2)
    - [Advantages](#advantages-2)
- [Additional Details](#additional-details)
  - [Test Plan](#test-plan)
    - [Unit Tests](#unit-tests)
    - [Integration tests](#integration-tests)
  - [Graduation Criteria](#graduation-criteria)
- [Implementation History](#implementation-history)
- [Drawbacks](#drawbacks)
- [Alternatives](#alternatives)
<!-- /toc -->

## Summary

This KEP outlines the proposal to integrate Argo Workflows within Kueue, discussing the advantages
and disadvantages of queuing workflows at varying granularity levels, alongside detailing
the integration methodologies.
**Contributor:**

I think I'd like to see the proposal speaking generically about adding support for workflow logic, with Argo as an example. We should be able to think about this design via any workflow tool that can conform to the communications defined with kueue.

**Contributor:**

I've been thinking about this use case too, and I think what would be interesting to try is a general strategy to submit the entire DAG, and then use suspend (on the level of the job) to control when steps are actually run, and have the workload manager check status and un-suspend (or cancel) based on that. The important detail here is we aren't adding additional burden on kueue - it works as expected, and the bulk of orchestration is handled by the workflow tool.

In terms of "which queue" the simplest first design is for the workflow manager to decide that. There could be more complex logic after that, but then you venture into multi-cluster space and that might be beyond the discussion in this issue. A few questions I'm still thinking about (and I'll share with others here):

  1. What kind of additional load are we placing on APIs for kueue/kubernetes in having a workflow tool that is going to be regularly checking state to unsuspend jobs? Is it reasonable? If yes, ok. If not, what other means can we offer to check updates of state that won't add that burden?
  2. Arguably, we could use jobs with suspend already without requiring kueue. What essential feature is kueue adding, and what is the simplest way to achieve it without (so to speak) hard coding workflow manager or workflow specific tasks into kueue?
  3. This might be more general to kubernetes, but I haven't found a good answer to how to handle steps that require shared inputs / outputs. Some tools handle retrieval / download between steps, and I'm not sure what people are doing these days using kueue. Is this an issue that has come up, and what are people doing?

Storage and sharing data between workflow steps, especially if the clusters are separate, seems like a hard problem. What does Argo do @KunWuLuan ?

**Contributor:**

And I wonder if kueue could have a developer API, something with grpc, that would allow easier integration with workflow tools that can interact with that interface.

**Member Author:**

> Storage and sharing data between workflow steps, especially if the clusters are separate, seems like a hard problem.

Hi @terrytangyuan, could you help answer this question? Thanks very much.

**Member Author:**

> Arguably, we could use jobs with suspend already without requiring kueue. What essential feature is kueue adding

I think what Kueue does is exactly to decide when and where to place the workload.

> and what is the simplest way to achieve it without (so to speak) hard coding workflow manager or workflow specific tasks into kueue?

If all workflow managers supported a standard API, or the same field path to indicate which specific stage of the workflow should run after admission, the goal of a generic workflow controller in Kueue could be achieved, just like what training-operator did.

**Member Author:**

> I think I'd like to see the proposal speaking generically about adding support for workflow logic, with Argo as an example. We should be able to think about this design via any workflow tool that can conform to the communications defined with kueue.

Good idea, thanks! I have updated the KEP according to the suggestion.


## Motivation

Workflows are pivotal components in domains like Scientific Computing and Simulations, where
administrators often enforce resource usage quotas for different users or departments. Currently,
Argo Workflows lacks native support within Kueue.
**Contributor:**

I would add a note here about what makes a workflow - generally some kind of dependency structure modeled in a DAG. Otherwise if it's just components that have quotas, you don't need this kep.


### Goals

- Enable support for Argo Workflow within Kueue, allowing users to simply add the label
  `kueue.x-k8s.io/queue-name` to their workflows and submit them in an initially suspended state
  (see the sketch below).
- Should be easily extended to support other workflow managers.
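For illustration, a minimal sketch of what such a submission might look like. The queue name
`team-a` is a placeholder, and `spec.suspend` (Argo's workflow-level suspend field) is assumed
here as the way to submit the workflow in an initially suspended state:

```
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: queued-hello-
  labels:
    kueue.x-k8s.io/queue-name: team-a   # placeholder LocalQueue name
spec:
  suspend: true                         # submitted initially in a suspended state
  entrypoint: main
  templates:
  - name: main
    container:
      image: busybox
      command: [echo]
      args: ["hello from a queued workflow"]
```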

### Non-Goals


**Contributor:**

maybe: Kueue to directly implement workflow management

**Member Author:**

Add the non-goals. Thank you.


## Proposal


### User Stories

#### Story 1

As a machine learning engineer, I need to preprocess data before executing a training job. My
workflow includes two steps: data preprocessing (which doesn't require a GPU) followed by a
PyTorchJob. I desire that the data preprocessing stage proceeds independently of GPU quota
availability.
**Member:**

This use case can be solved directly with the DAG/conditional capability of the workflow steps, e.g. PyTorchJob can start upon the completion of data processing step.


#### Story 2

As an ML engineer, my workflow consists of several GPU-dependent stages with uniform resource
requirements. I aim to recycle resources allocated to earlier workflow stages to boost efficiency
and resource utilization.
**Member:**

In Argo Workflows, you can release the resources from completed steps as soon as they complete. Are there any other requirements that Kueue can help with?


## Design Details
**Contributor** (suggested change): add after the heading:

> We start by listing the possible components of a workflow that can be taken as admission units in Kueue:


### Workflow as a Unit

Given the diverse resource, node affinity, and toleration requirements among workflow pods,
determining the necessary resources for each flavor becomes challenging for the controller.
Users must specify workflow resources via annotations like `kueue.k8s.io/max-resources`, tolerations
with `kueue.k8s.io/toleration`, and node selectors with `kueue.k8s.io/node-selector`.
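A hedged sketch of how such a workflow might be annotated when queued as a single unit. The
annotation keys come from the paragraph above; their value formats are assumptions, not a settled
API:

```
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: whole-workflow-
  labels:
    kueue.x-k8s.io/queue-name: team-a                                  # placeholder LocalQueue name
  annotations:
    kueue.k8s.io/max-resources: "cpu=8,memory=32Gi,nvidia.com/gpu=2"   # assumed value format
    kueue.k8s.io/toleration: "nvidia.com/gpu=present:NoSchedule"       # assumed value format
    kueue.k8s.io/node-selector: "node-type=gpu"                        # assumed value format
spec:
  suspend: true
  entrypoint: main
  templates:
  - name: main
    container:
      image: busybox
      command: [echo]
      args: ["runs only after the whole workflow is admitted"]
```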

#### Drawback and Limitations

- Inability to set distinct nodeSelectors and tolerations for multiple pod sets within a workflow.

#### Advantages

- Simplified architecture facilitating straightforward implementation.

### Layer as a Unit

A workflow's template definition can be a container invocation (leaf template) or a list
of steps. For workflows composed of a single leaf template, a single workload is generated.

#### Examples

In the following examples, we solely discuss which patterns of workflows warrant the
creation of workloads, without delving into the specifics of how these workloads are created,
nor addressing the division of responsibilities between the workflow-controller and Kueue.

##### Example 1 (ParallelSteps Contains Leaf Template Only)
For a parallelStep with only leaf templates, we create a workload for the parallelStep.
In the following example, we create workloads for `loop-example-depth-2(0:depth-1-1)` and
`loop-example-depth-2(1:depth-1-2)`. Patterns of DAGs are similar, so we do not discuss them
separately.

```
# kubectl create -f - << EOF
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-
  namespace: argo
spec:
  entrypoint: loop-example-depth-1
  templates:
  - name: loop-example-depth-2
    steps:
    - - name: print-message-loop
        template: print-message
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems: # invoke print-message once for each item in parallel
        - hello world # item 1
        - goodbye world # item 2
  - name: loop-example-depth-1
    steps:
    - - name: loop-example-depth-2
        template: loop-example-depth-2
        withItems:
        - depth-1-1
        - depth-1-2
  - name: print-message
    inputs:
      parameters:
      - name: message
    container:
      image: busybox
      command: [echo]
      args: ["{{inputs.parameters.message}}"]
EOF

# argo get loops-mlr6m
...

STEP                                            TEMPLATE              PODNAME                               DURATION  MESSAGE
 ✔ loops-mlr6m                                  loop-example-depth-1
 └─┬─✔ loop-example-depth-2(0:depth-1-1)        loop-example-depth-2
   │ └─┬─✔ print-message-loop(0:hello world)    print-message         loops-mlr6m-print-message-2545579066  6s
   │   └─✔ print-message-loop(1:goodbye world)  print-message         loops-mlr6m-print-message-323962978   5s
   └─✔ loop-example-depth-2(1:depth-1-2)        loop-example-depth-2
     └─┬─✔ print-message-loop(0:hello world)    print-message         loops-mlr6m-print-message-520674448   4s
       └─✔ print-message-loop(1:goodbye world)  print-message         loops-mlr6m-print-message-2893948292  6s
```

##### Example 2 (ParallelSteps Contains Leaf Template and Step)

For a step group composed of a leaf template and another step, we create a workload for the
leaf template, and the workload for the other step is created separately.
In the following example, we will create workloads for `loops-644ch` and `loop-example-depth-2-2`.
**Contributor:**

In this example, does any of the workloads that you will create need to run after the other? Or can they run in parallel?

**Member Author:**

In my opinion, we should only create workloads when the steps or tasks are able to start (all their dependencies have completed). So I think the workloads created in this example can run in parallel.


```
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-
  namespace: argo
spec:
  entrypoint: loop-example-depth-1
  templates:
  - name: loop-example-depth-2
    steps:
    - - name: print-message-loop
        template: print-message
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems: # invoke print-message once for each item in parallel
        - depth-2-1 # item 1
        - depth-2-2 # item 2
  - name: loop-example-depth-1
    steps:
    - - name: print-message
        template: print-message
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems:
        - depth-1-1
        - depth-1-2
      - name: loop-example-depth-2-2
        template: loop-example-depth-2
  - name: print-message
    inputs:
      parameters:
      - name: message
    container:
      image: busybox
      command: [echo]
      args: ["{{inputs.parameters.message}}"]

# argo get loops-644ch
...
STEP                                          TEMPLATE              PODNAME                               DURATION  MESSAGE
 ✔ loops-644ch                                loop-example-depth-1
 └─┬─✔ loop-example-depth-2-2                 loop-example-depth-2
   │ └─┬─✔ print-message-loop(0:depth-2-1)    print-message         loops-644ch-print-message-1796012204  4s
   │   └─✔ print-message-loop(1:depth-2-2)    print-message         loops-644ch-print-message-1116167650  6s
   ├─✔ print-message(0:depth-1-1)             print-message         loops-644ch-print-message-413467513   5s
   └─✔ print-message(1:depth-1-2)             print-message         loops-644ch-print-message-3356863351  5s
```

##### Example 3 (Workflow with Single Container Template)

We create a workload for the single container template. For example:
```
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-
spec:
  entrypoint: main
  templates:
  - name: main
    plugin:
      hello: { }

# argo get hello-jtlcw
...
STEP            TEMPLATE  PODNAME  DURATION  MESSAGE
 ◷ hello-jtlcw  main
```

#### How to suspend a workflow step by step

It is hard for users to manually add a suspend template before each leaf template in a
workflow. Some tooling is needed to help them do this.
**Member:**

AFAIK, when we use the Plain Pod integration, users do not need to add suspend to each leaf template, right?
Because Kueue mutates schedulingGates onto all Pods that are not yet admitted.

**Member Author:**

Yes.

We introduce three ways to manage the workflow. The responsibilities of the workflow-controller
and the kueue-controller differ among these ways.

1. Give users a CLI that helps them modify their workflows by adding a specific suspend template before each step.
**Member:**

Does this CLI mean argo, kubectl, or kueuectl? Which one?
And is the CLI a new one or an existing one?

**Member Author:**

In my opinion, the CLI should be implemented by each workflow manager.

When the workflow is suspended on this special suspend template, the job-controller in Kueue
creates workloads for the next step. No modification of the workflow-controller is needed for
this approach, so it is easy to iterate on, and there is no need to manage the versions of Argo
and Kueue together. However, in this way users can modify their workflows to skip waiting in
Kueue, which may not be acceptable for some users.
**Member:**

What is the special suspend template? How is it different from the Job suspend field?

**Member Author:**

This depends on the workflow managers. For Argo Workflow, I think it is OK to add a reserved template name like kueue-suspend. Other workflow managers like Tekton must implement their own suspend method for this approach.

**Member:**

I was not sure what the behavior of the special suspend template is.
Once the user submits the Workflow to the cluster, the CLI inserts the suspend template into the input workflow, right?
The suspend template blocks creation of the following step, and once the step behind it is finalized, the Workflow automatically removes the suspend template, right?

**Member Author:**

The special suspend template will suspend the workflow before specific steps and tasks. When the dependencies of these steps and tasks complete, Argo Workflow will create a suspend node in the workflow's status, so the controller in Kueue can create the workload for this step, and after the workload is admitted the controller can resume this suspend node in the workflow's status. Finally, the Argo Workflow controller continues to execute the workflow.

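To make this option concrete, a sketch of what a CLI-rewritten workflow could look like, using
the reserved `kueue-suspend` template name floated in the discussion above. The template name and
the injection points are assumptions, not a decided API:

```
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: rewritten-
  labels:
    kueue.x-k8s.io/queue-name: team-a
spec:
  entrypoint: main
  templates:
  - name: main
    steps:
    - - name: wait-for-admission-1   # injected by the CLI before the first step group
        template: kueue-suspend
    - - name: preprocess
        template: echo
        arguments:
          parameters:
          - name: message
            value: preprocess
    - - name: wait-for-admission-2   # injected by the CLI before the next step group
        template: kueue-suspend
    - - name: train
        template: echo
        arguments:
          parameters:
          - name: message
            value: train
  - name: kueue-suspend              # reserved suspend template, resumed by Kueue after admission
    suspend: {}
  - name: echo
    inputs:
      parameters:
      - name: message
    container:
      image: busybox
      command: [echo]
      args: ["{{inputs.parameters.message}}"]
```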

2. Integrated Suspend Capability: We propose introducing a new specification field within workflows, such
   as `suspendBySteps`. If `workflow.spec.suspendBySteps` is set to true, the workflow-controller automatically
   injects a special suspend template into each stepGroup. Kueue's job-controller monitors these and
   generates workloads for the following steps. Upon workload admission, the suspension step is marked as
   completed.
**Member:**

Could you provide actual YAML or Go API proposal for new field?

**Member Author:**

Done

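As a possible answer to the request above for concrete YAML, a hypothetical shape of the field
discussed in this option. The field name `suspendBySteps` and its placement are assumptions and
would ultimately be an Argo Workflows API decision:

```
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: step-suspended-
  labels:
    kueue.x-k8s.io/queue-name: team-a
spec:
  suspendBySteps: true   # hypothetical field: the workflow-controller injects a suspend template into each stepGroup
  entrypoint: main
  templates:
  - name: main
    steps:
    - - name: step-a
        template: echo
    - - name: step-b
        template: echo
  - name: echo
    container:
      image: busybox
      command: [echo]
      args: ["hello"]
```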

3. (Recommended) Kueue Webhook Enhancement: A new webhook is added within Kueue to intercept pod creations in the cluster.
**Contributor:**

Why is it the recommended option?

Imagine that Argo maintainers give you a blank check to modify the Workflow API as we wish. Would this still be the recommended option?

**Member Author:**

I marked this option as recommended because it has no dependencies on workflow managers.
Yes, if the Argo maintainers could modify the Workflow API, this would not be the best option. I have removed the mark.

This webhook verifies if the incoming pods are governed by a workflow and if the workflow carries the label
`kueue.x-k8s.io/queue-name`. When these conditions are met, scheduling gates are appended to the pods. The
job-controller in Kueue subsequently organizes these pods into groups (identifiable within the workflow's
status) and creates corresponding workloads for each group. Following workload acceptance, the scheduling
gates are removed from the pods, enabling their scheduling and execution.
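A sketch of a workflow pod as it might look after being intercepted by such a webhook. The gate
name reuses the `kueue.x-k8s.io/admission` scheduling gate from Kueue's Plain Pod integration;
whether this option would reuse that exact gate is an open design detail:

```
apiVersion: v1
kind: Pod
metadata:
  name: loops-mlr6m-print-message-2545579066
  namespace: argo
  labels:
    workflows.argoproj.io/workflow: loops-mlr6m   # set by Argo on the pods it creates
spec:
  schedulingGates:
  - name: kueue.x-k8s.io/admission                # appended by the proposed webhook, removed after admission
  containers:
  - name: main
    image: busybox
    command: [echo]
    args: ["hello world"]
```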
**Contributor:**

Don't the scheduling gates only apply after a workload has been admitted? Wouldn't this require something like readiness gates as proposed by @alculquicondor ? Otherwise, the workflow will hog Kueue quota without actually running anything.

**Member:**

Job ReadinessGates are only for batch/v1 Job, but here I guess that @KunWuLuan means the Pod and its schedulingGates.

**Member Author:**

Yes, I mean the schedulingGates of Pods. @KPostOffice
Thanks @tenzen-y.


**Member:**

How can we push all steps and leaves sequentially in this approach?
IIUC, when others push other Jobs into the same queue while a Workflow step is being executed, the following Workflow step needs to wait until the other, irrelevant Job is terminated.

Let's imagine the situation where we push a ArgoWorkflow and a Job into the same queue:

  1. Push ArgoWorkflow into queue
  2. Creation of Workload for the first Argoworkflow step
  3. Admit the first Workflow step Pod
  4. Push an irrelevant Job into queue
  5. Finish the first Workflow step Pod
  6. Admit an irrelevant Job.
  7. The second Workflow step Pod waits until the irrelevant Job finish.

**Member Author:**

That is what I call Admission Reuse. Maybe we should allow a relevant Job to reuse the resources, to avoid waiting for irrelevant ones. Maybe this should be a standard API in Kueue, not just for workflows.

**Member Author:**

This KEP will not discuss the solution for this problem. Maybe the solution can be considered together with issue #1243.

**Member:**

No, I did not mean the feature.
Couldn't we create all Workloads with active=false once the workflow is created?
So, the creation of all Workloads for all steps allows us to enqueue all steps sequentially, and we can avoid reserving quota for the whole WorkflowTemplate.

**Member Author:**

Conditional execution is supported in most workflow managers, so we can not determine whether the subsequent steps will run or how many replicas will be created until the step node is created.

#### Advantages

- It can support queuing at the layer level.

### Plain Pod as a Unit

SchedulingGates are added to the pods governed by a workflow that carries the `kueue.x-k8s.io/queue-name`
label, and a separate workload is created for each of these pods. Their scheduling and execution are
enabled after the workloads are admitted by Kueue.
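For reference, a rough sketch of the kind of Workload object the Plain Pod integration would
create for one gated pod; the object name and resource requests are illustrative, and the exact
Workload Kueue generates may differ:

```
apiVersion: kueue.x-k8s.io/v1beta1
kind: Workload
metadata:
  name: pod-loops-mlr6m-print-message-2545579066   # illustrative name
  namespace: argo
spec:
  queueName: team-a
  podSets:
  - name: main
    count: 1
    template:
      spec:
        containers:
        - name: main
          image: busybox
          command: [echo]
          args: ["hello world"]
          resources:
            requests:
              cpu: 100m
              memory: 64Mi
```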

#### Drawback and Limitations

- Pods in the same stepGroup are queued as different workloads.
- Gang admission for a stepGroup is not available.

#### Advantages

- Can reuse the existing Plain Pod integration in Kueue.

## Additional Details

The implementation details will be added after the discussion.

### Test Plan

<!--
**Note:** *Not required until targeted at a release.*
The goal is to ensure that we don't accept enhancements with inadequate testing.

All code is expected to have adequate tests (eventually with coverage
expectations). Please adhere to the [Kubernetes testing guidelines][testing-guidelines]
when drafting this test plan.

[testing-guidelines]: https://git.k8s.io/community/contributors/devel/sig-testing/testing.md
-->

[x] I/we understand the owners of the involved components may require updates to
existing tests to make this code solid enough prior to committing the changes necessary
to implement this enhancement.

#### Unit Tests

<!--
In principle every added code should have complete unit test coverage, so providing
the exact set of tests will not bring additional value.
However, if complete unit test coverage is not possible, explain the reason of it
together with explanation why this is acceptable.
-->

<!--
Additionally, try to enumerate the core package you will be touching
to implement this enhancement and provide the current unit coverage for those
in the form of:
- <package>: <date> - <current test coverage>

This can inform certain test coverage improvements that we want to do before
extending the production code to implement this enhancement.
-->
The code will adhere to regular best practices for unit tests and coverage.

#### Integration tests

Integration tests should be added to ensure workflows work well, like other kinds of workloads.

### Graduation Criteria
<!--

Clearly define what it means for the feature to be implemented and
considered stable.

If the feature you are introducing has high complexity, consider adding graduation
milestones with these graduation criteria:
- [Maturity levels (`alpha`, `beta`, `stable`)][maturity-levels]
- [Feature gate][feature gate] lifecycle
- [Deprecation policy][deprecation-policy]

[feature gate]: https://git.k8s.io/community/contributors/devel/sig-architecture/feature-gates.md
[maturity-levels]: https://git.k8s.io/community/contributors/devel/sig-architecture/api_changes.md#alpha-beta-and-stable-versions
[deprecation-policy]: https://kubernetes.io/docs/reference/using-api/deprecation-policy/
-->

The feature starts at the beta level.
14 changes: 14 additions & 0 deletions keps/74-support-argo-workflow/kep.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
title:
kep-number: 74
authors:
- "@kunwuluan"
status: draft
creation-date: 2024-09-04
reviewers:

# The target maturity stage in the current dev cycle for this KEP.
stage: alpha

# The milestone at which this feature was, or is targeted to be, at each stage.
milestone:
  alpha: "v0.9"