
Support for the execution policy API in JobSet #672

Open · 3 tasks done · Tracked by #2170
andreyvelich opened this issue Sep 6, 2024 · 29 comments
Labels: kind/api-change, kind/feature

andreyvelich (Contributor) commented Sep 6, 2024

What would you like to be added:

A new executionPolicy API that allows submitting replicated Jobs in order.
Once the first replicated Jobs reach the required condition, the next replicated Jobs are created.

Note: the complex DAG workflow capability is out of scope for this API, since we don't want to implement workflow functionality as part of this KEP. Users who need it should consider Argo Workflows or Tekton Pipelines.

The initial API design:

type JobSetSpec struct {
	ExecutionPolicy *ExecutionPolicy `json:"executionPolicy,omitempty"`
}

type ExecutionPolicy struct {
	// Order in which Jobs will be created. The default is AnyOrder.
	ExecutionPolicyOrder ExecutionPolicyOption `json:"executionPolicyOrder"`

	// After all replicated Jobs reach this status, the JobSet will create the next replicated Jobs.
	ReplicatedJobsStatus ReplicatedJobsStatusOption `json:"replicatedJobsStatus"`
}

type ExecutionPolicyOption string

const (
	AnyOrder ExecutionPolicyOption = "AnyOrder"

	InOrder ExecutionPolicyOption = "InOrder"
)

type ReplicatedJobsStatusOption string

// We don't add Ready condition here, since users can use the `startupPolicy` API for that.
const (
	ReadyStatus ReplicatedJobsStatusOption = "Succeeded"

	FailedStatus ReplicatedJobsStatusOption = "Failed"

	ActiveStatus ReplicatedJobsStatusOption = "Active"

	SuspendedStatus ReplicatedJobsStatusOption = "Suspended"
)
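
For illustration, a hedged usage sketch of this proposal (a sketch only: the ExecutionPolicy types above are not part of any JobSet release, and the package path assumes the existing JobSet Go module):

package main

import (
	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

// exampleJobSetSpec runs three replicated Jobs strictly in list order,
// advancing only after the previous one has reached the Succeeded status.
// Field and constant names follow the proposal in this issue.
func exampleJobSetSpec() jobset.JobSetSpec {
	return jobset.JobSetSpec{
		ExecutionPolicy: &jobset.ExecutionPolicy{
			ExecutionPolicyOrder: jobset.InOrder,
			ReplicatedJobsStatus: jobset.ReadyStatus, // maps to "Succeeded" above
		},
		ReplicatedJobs: []jobset.ReplicatedJob{
			{Name: "initializer"},
			{Name: "trainer"},
			{Name: "post-processor"},
		},
	}
}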

Why is this needed:

More context in this Kubernetes wg-batch thread: https://kubernetes.slack.com/archives/C032ZE66A2X/p1725400839102729

As part of the Kubeflow Training V2 APIs, we want to implement runtimes for LLM fine-tuning: kubeflow/training-operator#2170
That will require JobSet to orchestrate a sequence of 2-3 Jobs: Initializer -> Trainer -> Post-Processor.
Capacity for such a workload should be allocated for all Jobs combined and controlled by Kueue.
When the TrainJob is suspended, we will suspend all underlying Jobs.

I think we might have more use cases from the HPC side. Any thoughts @vsoch @alculquicondor?

This enhancement requires the following artifacts:

  • Design doc
  • API change
  • Docs update

The artifacts should be linked in subsequent comments.

cc @tenzen-y @kannon92 @ahg-g @johnugeorge @akshaychitneni @shravan-achar

andreyvelich changed the title from "Support for execution policy API in JobSet" to "Support for the execution policy API in JobSet" Sep 6, 2024
andreyvelich (Contributor, Author) commented:

/kind feature
/kind api-change

k8s-ci-robot added the kind/feature and kind/api-change labels Sep 6, 2024
kannon92 (Contributor) commented Sep 6, 2024

SGTM!

(The description isn't meant to be the final design, but I think we don't need Suspended. You also declare ReadyStatus for the Succeeded state.)

vsoch (Contributor) commented Sep 6, 2024

This would be great! I agree we should not try to create a workflow tool, but allowing this basic dependency structure is something that workflow tools can use. +1 from me.

googs1025 (Member) commented:

That sounds like a really cool feature! I'm curious about something, though. Currently, JobSet has multiple policies, like StartupPolicy and FailurePolicy. As we keep adding more features to our policies, we might need to think about whether we should make them work together or whether we need to set some restrictions (for example, can StartupPolicy and ExecutionPolicy coexist?). This way, users can avoid making a lot of mistakes when using them.

danielvegamyhre (Contributor) commented:

I think the feature makes sense; executing "Initializer -> Trainer -> Post-Processor" stages sequentially for LLM fine-tuning is an interesting concrete use case. @andreyvelich can you educate me on why the fine-tuning steps must be decomposed into separate jobs? Why can't the same job perform initialization, training, then post-processing?

As we keep adding more features to our policies, we might need to think about whether we should make them work together or whether we need to set some restrictions (for example, can StartupPolicy and ExecutionPolicy coexist?). This way, users can avoid making a lot of mistakes when using them.

@googs1025 we can add validation to the JobSet webhook to ensure the spec configurations are compatible with each other.
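
As a minimal sketch of such a check (assuming the proposed ExecutionPolicy field lands on JobSetSpec; whether and how these policies may coexist is exactly the open question above):

package webhooks

import (
	"fmt"

	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

// validateExecutionPolicy is a hypothetical admission check that rejects
// specs combining the proposed executionPolicy with an InOrder startupPolicy.
func validateExecutionPolicy(spec *jobset.JobSetSpec) error {
	if spec.ExecutionPolicy == nil {
		return nil
	}
	if spec.StartupPolicy != nil &&
		spec.StartupPolicy.StartupPolicyOrder == jobset.InOrderStartupPolicy {
		return fmt.Errorf("executionPolicy cannot be combined with startupPolicy order %q",
			jobset.InOrderStartupPolicy)
	}
	return nil
}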

andreyvelich (Contributor, Author) commented:

@andreyvelich can you educate me on why the fine-tuning steps must be decomposed into separate jobs?

@danielvegamyhre Sure. Given the size of today's models (e.g. >100b parameters) and datasets, downloading the pre-trained model and dataset takes time, e.g. 1-2 hours. If we define these initialization steps as an initContainer on the training Node, it will lock the GPUs that we allocate for that training Node.
We don't want to waste GPU resources while we do initialization.
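
To illustrate (a sketch with hypothetical image names): if the download ran as an initContainer of the GPU Pod, the scheduler would bind the whole Pod, including its GPU request, to a node before the init step starts, so the GPUs would sit idle for the entire download:

package main

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// trainerPod shows why init-time downloads waste GPUs: node resources are
// reserved for the Pod as a whole, so the GPU limit below is already held
// while the initContainer downloads the model and dataset.
func trainerPod() corev1.PodSpec {
	return corev1.PodSpec{
		InitContainers: []corev1.Container{{
			Name:  "initializer",                  // hypothetical download step
			Image: "example.com/model-downloader", // hypothetical image
		}},
		Containers: []corev1.Container{{
			Name:  "trainer",
			Image: "example.com/llm-trainer", // hypothetical image
			Resources: corev1.ResourceRequirements{
				Limits: corev1.ResourceList{
					"nvidia.com/gpu": resource.MustParse("8"),
				},
			},
		}},
	}
}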

tenzen-y (Member) commented Sep 9, 2024

As we keep adding more features to our policies, we might need to think about whether we should make them work together or whether we need to set some restrictions (for example, can StartupPolicy and ExecutionPolicy coexist?). This way, users can avoid making a lot of mistakes when using them.

The current JobSet API is alpha. So I think it would be great to consider how we can make some policies cooperate, and how we can consolidate some fields into one, once we graduate to the beta stage.

This is one of the reasons why we keep it parked at the alpha stage, as we discussed in the previous meeting.

alculquicondor commented:

If the goal is not to lock the GPUs during other stages, it is important to note what your expectations are for the future when queuing in Kueue.
It sounds like we should be queuing the steps separately, meaning that we need separate "suspend" fields or 3 different objects.

This would be great! I agree to not try to create a workflow tool, but allowing this basic dependency structure is something that workflow tools can use. +1 from me.

My concern is where do we stop. You add this feature now, then someone comes and asks "what if we add X", and suddenly we have a workflow tool.

tenzen-y (Member) commented Sep 9, 2024

If the goal is not to lock the GPUs during other stages, it is important to note what your expectations are for the future when queuing in Kueue. It sounds like we should be queuing the steps separately, meaning that we need separate "suspend" fields or 3 different objects.

Good point. Actually, I guess JobSet already has the same problem with StartupPolicy, although that problem is limited compared with executionPolicy. So I'm wondering if we should investigate a solid approach to supporting Jobs with ordering and steps.

My concern is where do we stop. You add this feature now, then someone comes and asks "what if we add X", and suddenly we have a workflow tool.

I have the same concern. So, if we introduce this feature, I think we need to design the API to prevent conditional parameters.
Specifically, the API below has potential extensibility, but I believe we should not introduce such extensibility.

type ExecutionPolicy struct {
	// Order in which Jobs will be created. The default is AnyOrder.
	ExecutionPolicyOrder ExecutionPolicyOption `json:"executionPolicyOrder"`

	// After all replicated Jobs reach this status, the JobSet will create the next replicated Jobs.
	ReplicatedJobsStatus ReplicatedJobsStatusOption `json:"replicatedJobsStatus"`
}

tenzen-y (Member) commented Sep 9, 2024

It sounds like we should be queuing the steps separately, meaning that we need separate "suspend" fields or 3 different objects.

I think that separate "suspend" fields and 3 different objects are hard to maintain and not straightforward.
So I'm leaning toward a JobReadinessGate (current K8s does not support it), since it is a straightforward, intuitive, simple, and generic approach.

danielvegamyhre (Contributor) commented:

My concern is where do we stop. You add this feature now, then someone comes and asks "what if we add X", and suddenly we have a workflow tool.

+1

I think that separate "suspend" fields and 3 different objects are hard to maintain and not straightforward.

Are you referring to suspending each of the 3 jobs ( Initializer job, Trainer job, Post-Processor job) and resuming them one at a time as the prior job's execution completes?

vsoch (Contributor) commented Sep 12, 2024

My concern is where do we stop. You add this feature now, then someone comes and asks "what if we add X", and suddenly we have a workflow tool.

You are starting to sound like me! The answer is that we stop there with a "depends on." If you look at workload managers, it's fairly common to be able to say:

submit job a
submit job b that depends on a

And that's it. There is no further orchestration, suspending, or waiting, etc. It's just a flat, single-level "depends on" that can provide a simple API for actual workflow tools (that represent a DAG) to use.

alculquicondor commented Sep 13, 2024

You are saying that you want to support arbitrary DAGs already, whereas the initial proposal is just to support a sequence of jobs. That's the kind of leap I was rather against.

An arbitrary DAG is already a workflow manager. Why aren't we using a workflow manager as opposed to adding workflow capabilities to JobSet?

vsoch (Contributor) commented Sep 13, 2024

You are saying that you want to support arbitrary DAGs already, whereas the initial proposal is just to support a sequence of jobs. That's the kind of leap I was rather against.

Where did I say that?

alculquicondor commented:

The answer is that we stop there with a "depends on."

Unless I misinterpreted what you were trying to say.

vsoch (Contributor) commented Sep 13, 2024

Depends on (in and of itself) does not create what most would consider substantial enough for a workflow DAG. It provides a minimal API in the workload manager so that workflow tools can more easily create complex logic.

The workload manager has no knowledge of checking state, submission, or custom action logic beyond the very simple "A depends on B." It's not really enough to be called a workflow but it enables workflow tools to better interact with it. I am absolutely not saying that I want to support arbitrary DAGs. I am pointing out a common pattern in the workload manager ecosystem for HPC that has existed for decades, and made it possible for workflow tools to better integrate.

alculquicondor commented:

Do you have a minimal proposal for what jobset could provide? Are you saying that you want to say "jobset X depends on jobset Y"? I ask that because the current proposal is along the lines of "this jobset has multiple steps within it, each of which can be started after the other".

Those are two very different approaches.

vsoch (Contributor) commented Sep 13, 2024

No I don’t think depends on would fall on the level of the jobset, more likely the job.

danielvegamyhre (Contributor) commented:

No I don’t think depends on would fall on the level of the jobset, more likely the job.

I think having the "depends on" relationship operate on the ReplicatedJob level may be better - for example, for something like the "initializer -> trainer -> post-processor" use case described in this issue, I think we may want to have each stage be a separate ReplicatedJob (since for example, the "trainer" stage may be composed of several concurrent jobs running on different groups of infrastructure, but they all depend on the "initializer" stage being completed).

ahg-g (Contributor) commented Sep 13, 2024

No I don’t think depends on would fall on the level of the jobset, more likely the job.

I think having the "depends on" relationship operate on the ReplicatedJob level may be better - for example, for something like the "initializer -> trainer -> post-processor" use case described in this issue, I think we may want to have each stage be a separate ReplicatedJob (since for example, the "trainer" stage may be composed of several concurrent jobs running on different groups of infrastructure, but they all depend on the "initializer" stage being completed).

Exactly, that is what I had in mind when I suggested this in the slack channel, similar to StartupPolicy which is executed at the ReplicatedJob level too. ExecutionPolicy and StartupPolicy are similar, but one is based on readiness status, the other based on completion status.

ahg-g (Contributor) commented Sep 13, 2024

Also, the suggestion here is not to have an API to explicitly define the chain; it is based on the order in the replicatedJobs list, again similar to StartupPolicy.

danielvegamyhre (Contributor) commented:

No I don’t think depends on would fall on the level of the jobset, more likely the job.

I think having the "depends on" relationship operate on the ReplicatedJob level may be better - for example, for something like the "initializer -> trainer -> post-processor" use case described in this issue, I think we may want to have each stage be a separate ReplicatedJob (since for example, the "trainer" stage may be composed of several concurrent jobs running on different groups of infrastructure, but they all depend on the "initializer" stage being completed).

Exactly, that is what I had in mind when I suggested this in the slack channel, similar to StartupPolicy which is executed at the ReplicatedJob level too. ExecutionPolicy and StartupPolicy are similar, but one is based on readiness status, the other based on completion status.

Awesome, well if we are all aligned on this I can implement it.

ahg-g (Contributor) commented Sep 14, 2024

The only thing we probably want to make configurable is how many entries to run sequentially. The default (expressed as nil) is all, but the user could set it to 1, for example, to run the first entry (which does some initialization for the whole workload) while the rest of the entries execute in parallel.
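
A minimal sketch of what that could look like (the field name is hypothetical, not part of the proposal above):

// Hypothetical extension of the proposed ExecutionPolicy. nil means every
// replicated Job in the list runs sequentially; 1 means only the first
// entry runs alone, after which the remaining entries start in parallel.
type ExecutionPolicy struct {
	// +optional
	SequentialEntries *int32 `json:"sequentialEntries,omitempty"`
}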

alculquicondor commented:

How should we calculate quota usage for such a sequence?

alculquicondor commented:

cc @mimowo @mwielgus

andreyvelich (Contributor, Author) commented Sep 24, 2024

Yes, this proposal is only for a Job sequence, and as @vsoch mentioned, this concept has existed in the HPC space for a long time.

Awesome, well if we are all aligned on this I can implement it.

@ahg-g @danielvegamyhre Do we need a small KEP for it, or can we jump directly to the implementation?

How should we calculate quota usage for such a sequence?

@alculquicondor I think in the first iteration we should calculate the quota as for a normal JobSet (e.g. admit the JobSet only if resources for all replicated Jobs are available). However, it would be nice to admit/suspend steps in a Job sequence separately.
But I feel that we should discuss it after we integrate Argo Workflows into Kueue: kubernetes-sigs/kueue#2976
@tenzen-y Any thoughts ?

kannon92 (Contributor) commented Sep 24, 2024

@ahg-g @danielvegamyhre Do we need a small KEP for it, or can we jump directly to the implementation?

We should have a small KEP for this.

alculquicondor commented:

I think in the first iteration we should calculate the quota as for a normal JobSet (e.g. admit the JobSet only if resources for all replicated Jobs are available). However, it would be nice to admit/suspend steps in a Job sequence separately.

I agree that we can leave implementation details for a second iteration. However, there are too many moving pieces here to completely ignore the question until later. We at least need a high-level plan of how it's going to work.
Users will be very surprised that Kueue needs to reserve space for all of the steps of the sequence.

But I feel that we should discuss it after we integrate Argo Workflows into Kueue:

Not really. We need to figure out whether the proposed design for Argo will also work for JobSet. I don't think it currently does, as the proposal relies on the pod-group integration, which wouldn't be ideal for JobSet.

andreyvelich (Contributor, Author) commented:

/assign @andreyvelich
