Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement suspend semantics #1859

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/api/kubeflow.org_v1_generated.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,8 @@ RunPolicy encapsulates various runtime policies of the distributed training job,
| *`activeDeadlineSeconds`* __integer__ | Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer.
| *`backoffLimit`* __integer__ | Optional number of retries before marking this job failed.
| *`schedulingPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-schedulingpolicy[$$SchedulingPolicy$$]__ | SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling
| *`suspend`* __boolean__ | suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods and PodGroups associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job.
Defaults to false.
|===


Expand Down
4 changes: 4 additions & 0 deletions hack/python-sdk/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,10 @@
"description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling",
"$ref": "#/definitions/kubeflow.org.v1.SchedulingPolicy"
},
"suspend": {
"description": "suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods and PodGroups associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job.\n\nDefaults to false.",
"type": "boolean"
},
"ttlSecondsAfterFinished": {
"description": "TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite.",
"type": "integer",
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_mpijobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7702,6 +7702,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_mxjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7701,6 +7701,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_paddlejobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8212,6 +8212,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_pytorchjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8247,6 +8247,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_tfjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_xgboostjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
16 changes: 16 additions & 0 deletions pkg/apis/kubeflow.org/v1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ const (
// The training is complete without error.
JobSucceeded JobConditionType = "Succeeded"

// JobSuspended means the job has been suspended.
JobSuspended JobConditionType = "Suspended"

// JobFailed means one or more sub-resources (e.g. services/pods) of this job
// reached phase failed with no restarting.
// The training has failed its execution.
Expand Down Expand Up @@ -205,6 +208,19 @@ type RunPolicy struct {
// SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling
// +optional
SchedulingPolicy *SchedulingPolicy `json:"schedulingPolicy,omitempty"`

// suspend specifies whether the Job controller should create Pods or not.
// If a Job is created with suspend set to true, no Pods are created by
// the Job controller. If a Job is suspended after creation (i.e. the
// flag goes from false to true), the Job controller will delete all
// active Pods and PodGroups associated with this Job.
// Users must design their workload to gracefully handle this.
// Suspending a Job will reset the StartTime field of the Job.
//
// Defaults to false.
// +kubebuilder:default:=false
// +optional
Suspend *bool `json:"suspend,omitempty"`
}

// SchedulingPolicy encapsulates various scheduling policies of the distributed training
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/kubeflow.org/v1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

103 changes: 78 additions & 25 deletions pkg/controller.v1/common/job.go
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the core logic for suspend semantics.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/kubeflow/training-operator/pkg/core"
commonutil "github.com/kubeflow/training-operator/pkg/util"
"github.com/kubeflow/training-operator/pkg/util/k8sutil"
trainutil "github.com/kubeflow/training-operator/pkg/util/train"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
Expand All @@ -38,28 +39,30 @@ import (
volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
)

func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, job interface{}, pods []*corev1.Pod) error {
// DeletePodsAndServices deletes pods and services considering cleanPodPolicy.
// However, if the job doesn't have Succeeded or Failed condition, it ignores cleanPodPolicy.
func (jc *JobController) DeletePodsAndServices(runtimeObject runtime.Object, runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus, pods []*corev1.Pod) error {
if len(pods) == 0 {
return nil
}

// Delete nothing when the cleanPodPolicy is None.
if *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyNone {
// Delete nothing when the cleanPodPolicy is None and the job has Succeeded or Failed condition.
if commonutil.IsFinished(jobStatus) && *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyNone {
return nil
}

for _, pod := range pods {
// Note that pending pod will turn into running once schedulable,
// not cleaning it may leave orphan running pod in the future,
// we should treat it equivalent to running phase here.
if *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyRunning && pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending {
if commonutil.IsFinished(jobStatus) && *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyRunning && pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending {
continue
}
if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil {
if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, runtimeObject); err != nil {
return err
}
// Pod and service have the same name, thus the service could be deleted using pod's name.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side question: why is there a service per pod? That sounds like unnecessary load.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, ml framework configs need different FQDN for each pod.

For example, tensorflow ClusterSpec: https://www.tensorflow.org/api_docs/python/tf/train/ClusterSpec

Copy link

@alculquicondor alculquicondor Jul 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't a single headless Service allow that? similar to this https://kubernetes.io/docs/tasks/job/job-with-pod-to-pod-communication/

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right.
Actually, using a single headless service is planning, although it is closed :(

#1030

if err := jc.ServiceControl.DeleteService(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil {
if err := jc.ServiceControl.DeleteService(pod.Namespace, pod.Name, runtimeObject); err != nil {
return err
}
}
Expand Down Expand Up @@ -117,23 +120,9 @@ func (jc *JobController) ReconcileJobs(
}

oldStatus := jobStatus.DeepCopy()
if commonutil.IsSucceeded(jobStatus) || commonutil.IsFailed(jobStatus) {
// If the Job is succeed or failed, delete all pods and services.
if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil {
return err
}

if jc.Config.EnableGangScheduling() {
jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, "JobTerminated", "Job has been terminated. Deleting PodGroup")
if err := jc.DeletePodGroup(metaObject); err != nil {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
return err
} else {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", jobName)
}
}

if err := jc.CleanupJob(runPolicy, jobStatus, job); err != nil {
if commonutil.IsFinished(jobStatus) {
// If the Job is succeed or failed, delete all pods, services, and podGroup.
if err = jc.CleanUpResources(runPolicy, runtimeObject, metaObject, jobStatus, pods); err != nil {
return err
}

Expand All @@ -155,6 +144,44 @@ func (jc *JobController) ReconcileJobs(
return nil
}

if trainutil.IsJobSuspended(runPolicy) {
if err = jc.CleanUpResources(runPolicy, runtimeObject, metaObject, jobStatus, pods); err != nil {
return err
}
for rType := range jobStatus.ReplicaStatuses {
jobStatus.ReplicaStatuses[rType].Active = 0
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this would be set anyway by the other code once the replica pods are cleaned up. This is the approach we take in MPIJob and Job. I would like if we could apply here the same approach

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have any other codes to reset the Active field, and the replicaStatus[*].Active is reset only by

if commonutil.IsSucceeded(jobStatus) {
for rtype := range jobStatus.ReplicaStatuses {
jobStatus.ReplicaStatuses[rtype].Succeeded += jobStatus.ReplicaStatuses[rtype].Active
jobStatus.ReplicaStatuses[rtype].Active = 0
}
}
.

So we need to reset the Active field here if the job is suspended.

However, since I think we should reset the Active field when cleaning up replica pods, I would do the refactoring in the follow-ups.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any code that sets Active to non zero?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any code that sets Active to non zero?

Yes, here:

updateJobReplicaStatuses(jobStatus, rType, pod)
->
func updateJobReplicaStatuses(jobStatus *apiv1.JobStatus, rtype apiv1.ReplicaType, pod *corev1.Pod) {
->
func UpdateJobReplicaStatuses(jobStatus *apiv1.JobStatus, rtype apiv1.ReplicaType, pod *corev1.Pod) {
switch pod.Status.Phase {
case corev1.PodRunning:
if pod.DeletionTimestamp != nil {
// when node is not ready, the pod will be in terminating state.
// Count deleted Pods as failures to account for orphan Pods that
// never have a chance to reach the Failed phase.
jobStatus.ReplicaStatuses[rtype].Failed++
} else {
jobStatus.ReplicaStatuses[rtype].Active++
}
case corev1.PodSucceeded:
jobStatus.ReplicaStatuses[rtype].Succeeded++
case corev1.PodFailed:
jobStatus.ReplicaStatuses[rtype].Failed++
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so those functions wouldn't be called in the next reconcile, essentially resetting the number of active pods?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so those functions wouldn't be called in the next reconcile

Yes. If the job is suspended, JobController never calls ReconcilePods(): https://github.com/tenzen-y/training-operator/blob/ce7259ecfaacbd529b6b1095dd6b632517dac0d0/pkg/controller.v1/common/job.go#L147-L173

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of setting counts manually, why can't it be derived from the status of all pods? Since we have already cleaned up, active pods will be zero. We can do this refactoring separately as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have functions to decrease Active count, only have functions to reset the count. So I guess we should refactor ReconcileJob(). However, the refactor will affect Succeeded and Failed conditions, too. So I would like to work on another PR.

}
msg := fmt.Sprintf("%s %s is suspended.", jobKind, jobName)
if commonutil.IsRunning(jobStatus) {
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobRunning, corev1.ConditionFalse,
commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg); err != nil {
return err
}
}
// We add the suspended condition to the job only when the job doesn't have a suspended condition.
if !commonutil.IsSuspended(jobStatus) {
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionTrue,
commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg); err != nil {
return err
}
}
jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg)
if !reflect.DeepEqual(*oldStatus, jobStatus) {
return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to let the reconcile function continue, so that other status fields are updated, such as Active (mentioned above)? I think it is preferable not to update here but let other fields be updated too, so that we can update as much as possible in a single reconciliation run.

Copy link
Member Author

@tenzen-y tenzen-y Jul 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is preferable not to update here but let other fields be updated too, so that we can update as much as possible in a single reconciliation run.

That makes sense. As I say above, we need to refactor the ReconcilePods:

func (jc *JobController) ReconcilePods(
.

So I would do your suggestion in follow-ups.

}
return nil
}
if commonutil.IsSuspended(jobStatus) {
tenzen-y marked this conversation as resolved.
Show resolved Hide resolved
msg := fmt.Sprintf("%s %s is resumed.", jobKind, jobName)
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionFalse,
commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg); err != nil {
return err
}
now := metav1.Now()
jobStatus.StartTime = &now
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg)
}

// retrieve the previous number of retry
previousRetry := jc.WorkQueue.NumRequeues(jobKey)

Expand Down Expand Up @@ -205,7 +232,7 @@ func (jc *JobController) ReconcileJobs(

// If the Job exceeds backoff limit or is past active deadline
// delete all pods and services, then set the status to failed
if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil {
if err := jc.DeletePodsAndServices(runtimeObject, runPolicy, jobStatus, pods); err != nil {
return err
}

Expand All @@ -225,7 +252,7 @@ func (jc *JobController) ReconcileJobs(

jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)

if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil {
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil {
log.Infof("Append job condition error: %v", err)
return err
}
Expand Down Expand Up @@ -344,6 +371,32 @@ func (jc *JobController) ReconcileJobs(
return nil
}

func (jc *JobController) CleanUpResources(
runPolicy *apiv1.RunPolicy,
runtimeObject runtime.Object,
metaObject metav1.Object,
jobStatus apiv1.JobStatus,
pods []*v1.Pod,
) error {
if err := jc.DeletePodsAndServices(runtimeObject, runPolicy, jobStatus, pods); err != nil {
return err
}
if jc.Config.EnableGangScheduling() {

jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, "JobTerminated", "Job has been terminated. Deleting PodGroup")
if err := jc.DeletePodGroup(metaObject); err != nil {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
return err
} else {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", metaObject.GetName())
}
}
if err := jc.CleanupJob(runPolicy, jobStatus, runtimeObject); err != nil {
return err
}
return nil
}

// ResetExpectations reset the expectation for creates and deletes of pod/service to zero.
func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error {
var allErrs error
Expand Down
Loading