Skip to content

Commit

Permalink
Implement suspend semantics (#1859)
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
  • Loading branch information
tenzen-y authored Jul 20, 2023
1 parent 72f2512 commit 64e39f2
Show file tree
Hide file tree
Showing 51 changed files with 919 additions and 158 deletions.
2 changes: 2 additions & 0 deletions docs/api/kubeflow.org_v1_generated.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,8 @@ RunPolicy encapsulates various runtime policies of the distributed training job,
| *`activeDeadlineSeconds`* __integer__ | Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer.
| *`backoffLimit`* __integer__ | Optional number of retries before marking this job failed.
| *`schedulingPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-schedulingpolicy[$$SchedulingPolicy$$]__ | SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling
| *`suspend`* __boolean__ | suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods and PodGroups associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job.
Defaults to false.
|===


Expand Down
4 changes: 4 additions & 0 deletions hack/python-sdk/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,10 @@
"description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling",
"$ref": "#/definitions/kubeflow.org.v1.SchedulingPolicy"
},
"suspend": {
"description": "suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods and PodGroups associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job.\n\nDefaults to false.",
"type": "boolean"
},
"ttlSecondsAfterFinished": {
"description": "TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite.",
"type": "integer",
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_mpijobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7702,6 +7702,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_mxjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7701,6 +7701,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_paddlejobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8212,6 +8212,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_pytorchjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8247,6 +8247,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_tfjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_xgboostjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
16 changes: 16 additions & 0 deletions pkg/apis/kubeflow.org/v1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ const (
// The training is complete without error.
JobSucceeded JobConditionType = "Succeeded"

// JobSuspended means the job has been suspended.
JobSuspended JobConditionType = "Suspended"

// JobFailed means one or more sub-resources (e.g. services/pods) of this job
// reached phase failed with no restarting.
// The training has failed its execution.
Expand Down Expand Up @@ -205,6 +208,19 @@ type RunPolicy struct {
// SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling
// +optional
SchedulingPolicy *SchedulingPolicy `json:"schedulingPolicy,omitempty"`

// suspend specifies whether the Job controller should create Pods or not.
// If a Job is created with suspend set to true, no Pods are created by
// the Job controller. If a Job is suspended after creation (i.e. the
// flag goes from false to true), the Job controller will delete all
// active Pods and PodGroups associated with this Job.
// Users must design their workload to gracefully handle this.
// Suspending a Job will reset the StartTime field of the Job.
//
// Defaults to false.
// +kubebuilder:default:=false
// +optional
Suspend *bool `json:"suspend,omitempty"`
}

// SchedulingPolicy encapsulates various scheduling policies of the distributed training
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/kubeflow.org/v1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

103 changes: 78 additions & 25 deletions pkg/controller.v1/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/kubeflow/training-operator/pkg/core"
commonutil "github.com/kubeflow/training-operator/pkg/util"
"github.com/kubeflow/training-operator/pkg/util/k8sutil"
trainutil "github.com/kubeflow/training-operator/pkg/util/train"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
Expand All @@ -38,28 +39,30 @@ import (
volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
)

func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, job interface{}, pods []*corev1.Pod) error {
// DeletePodsAndServices deletes pods and services considering cleanPodPolicy.
// However, if the job doesn't have Succeeded or Failed condition, it ignores cleanPodPolicy.
func (jc *JobController) DeletePodsAndServices(runtimeObject runtime.Object, runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus, pods []*corev1.Pod) error {
if len(pods) == 0 {
return nil
}

// Delete nothing when the cleanPodPolicy is None.
if *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyNone {
// Delete nothing when the cleanPodPolicy is None and the job has Succeeded or Failed condition.
if commonutil.IsFinished(jobStatus) && *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyNone {
return nil
}

for _, pod := range pods {
// Note that pending pod will turn into running once schedulable,
// not cleaning it may leave orphan running pod in the future,
// we should treat it equivalent to running phase here.
if *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyRunning && pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending {
if commonutil.IsFinished(jobStatus) && *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyRunning && pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending {
continue
}
if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil {
if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, runtimeObject); err != nil {
return err
}
// Pod and service have the same name, thus the service could be deleted using pod's name.
if err := jc.ServiceControl.DeleteService(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil {
if err := jc.ServiceControl.DeleteService(pod.Namespace, pod.Name, runtimeObject); err != nil {
return err
}
}
Expand Down Expand Up @@ -117,23 +120,9 @@ func (jc *JobController) ReconcileJobs(
}

oldStatus := jobStatus.DeepCopy()
if commonutil.IsSucceeded(jobStatus) || commonutil.IsFailed(jobStatus) {
// If the Job is succeed or failed, delete all pods and services.
if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil {
return err
}

if jc.Config.EnableGangScheduling() {
jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, "JobTerminated", "Job has been terminated. Deleting PodGroup")
if err := jc.DeletePodGroup(metaObject); err != nil {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
return err
} else {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", jobName)
}
}

if err := jc.CleanupJob(runPolicy, jobStatus, job); err != nil {
if commonutil.IsFinished(jobStatus) {
// If the Job is succeed or failed, delete all pods, services, and podGroup.
if err = jc.CleanUpResources(runPolicy, runtimeObject, metaObject, jobStatus, pods); err != nil {
return err
}

Expand All @@ -155,6 +144,44 @@ func (jc *JobController) ReconcileJobs(
return nil
}

if trainutil.IsJobSuspended(runPolicy) {
if err = jc.CleanUpResources(runPolicy, runtimeObject, metaObject, jobStatus, pods); err != nil {
return err
}
for rType := range jobStatus.ReplicaStatuses {
jobStatus.ReplicaStatuses[rType].Active = 0
}
msg := fmt.Sprintf("%s %s is suspended.", jobKind, jobName)
if commonutil.IsRunning(jobStatus) {
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobRunning, corev1.ConditionFalse,
commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg); err != nil {
return err
}
}
// We add the suspended condition to the job only when the job doesn't have a suspended condition.
if !commonutil.IsSuspended(jobStatus) {
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionTrue,
commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg); err != nil {
return err
}
}
jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg)
if !reflect.DeepEqual(*oldStatus, jobStatus) {
return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
}
return nil
}
if commonutil.IsSuspended(jobStatus) {
msg := fmt.Sprintf("%s %s is resumed.", jobKind, jobName)
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionFalse,
commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg); err != nil {
return err
}
now := metav1.Now()
jobStatus.StartTime = &now
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg)
}

// retrieve the previous number of retry
previousRetry := jc.WorkQueue.NumRequeues(jobKey)

Expand Down Expand Up @@ -205,7 +232,7 @@ func (jc *JobController) ReconcileJobs(

// If the Job exceeds backoff limit or is past active deadline
// delete all pods and services, then set the status to failed
if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil {
if err := jc.DeletePodsAndServices(runtimeObject, runPolicy, jobStatus, pods); err != nil {
return err
}

Expand All @@ -225,7 +252,7 @@ func (jc *JobController) ReconcileJobs(

jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)

if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil {
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil {
log.Infof("Append job condition error: %v", err)
return err
}
Expand Down Expand Up @@ -344,6 +371,32 @@ func (jc *JobController) ReconcileJobs(
return nil
}

func (jc *JobController) CleanUpResources(
runPolicy *apiv1.RunPolicy,
runtimeObject runtime.Object,
metaObject metav1.Object,
jobStatus apiv1.JobStatus,
pods []*v1.Pod,
) error {
if err := jc.DeletePodsAndServices(runtimeObject, runPolicy, jobStatus, pods); err != nil {
return err
}
if jc.Config.EnableGangScheduling() {

jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, "JobTerminated", "Job has been terminated. Deleting PodGroup")
if err := jc.DeletePodGroup(metaObject); err != nil {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
return err
} else {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", metaObject.GetName())
}
}
if err := jc.CleanupJob(runPolicy, jobStatus, runtimeObject); err != nil {

This comment has been minimized.

Copy link
@alculquicondor

This comment has been minimized.

Copy link
@tenzen-y

tenzen-y Aug 28, 2024

Author Member

@alculquicondor I guess that the only way to update the completion time is here:

err = jc.Controller.UpdateJobStatus(job, replicas, &jobStatus)

->

if ContainsMasterSpec(replicas) {
if rtype == kubeflowv1.PyTorchJobReplicaTypeMaster {
if running > 0 {
msg := fmt.Sprintf("PyTorchJob %s is running.", pytorchjob.Name)
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRunningReason), msg)
}
// when master is succeed, the job is finished.
if expected == 0 {
msg := fmt.Sprintf("PyTorchJob %s is successfully completed.", pytorchjob.Name)
logrus.Info(msg)
r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSucceededReason), msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSucceededReason), msg)
trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
return nil
}
}
} else {
if rtype == kubeflowv1.PyTorchJobReplicaTypeWorker {
// TODO(gaocegege): Support SuccessPolicy
// Leave a succeeded condition for the following two cases:
// 1. If all workers are succeeded.
// 2. If `ElasticPolicy` is not nil and any worker has completed.
if expected == 0 || (pytorchjob.Spec.ElasticPolicy != nil && succeeded > 0) {
msg := fmt.Sprintf("PyTorchJob %s/%s successfully completed.",
pytorchjob.Namespace, pytorchjob.Name)
r.recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSucceededReason), msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSucceededReason), msg)
trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
} else if running > 0 {
// Some workers are still running, leave a running condition.
msg := fmt.Sprintf("PyTorchJob %s/%s is running.",
pytorchjob.Namespace, pytorchjob.Name)
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRunningReason), msg)
}
}
}
if failed > 0 && (specReplicas > succeeded+running) {
if spec.RestartPolicy != kubeflowv1.RestartPolicyNever {
msg := fmt.Sprintf("PyTorchJob %s is restarting because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype)
r.Recorder.Event(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRestartingReason), msg)
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRestartingReason), msg)
trainingoperatorcommon.RestartedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
} else {
msg := fmt.Sprintf("PyTorchJob %s is failed because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype)
r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobFailedReason), msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobFailedReason), msg)
trainingoperatorcommon.FailedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
}
}
}

But, when the Job is suspended and completionTime is nil, the training-operator doesn't seem to be able to remove owned resources.

This comment has been minimized.

Copy link
@tenzen-y

tenzen-y Aug 28, 2024

Author Member

But, these checks are passed every time 🧐

By("Updating the PyTorchJob with suspend=true")
Eventually(func() error {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
created.Spec.RunPolicy.Suspend = ptr.To(true)
return testK8sClient.Update(ctx, created)
}, testutil.Timeout, testutil.Interval).Should(Succeed())
By("Checking if the pods and services are removed")
Eventually(func() bool {
errMaster := testK8sClient.Get(ctx, masterKey, masterPod)
errWorker := testK8sClient.Get(ctx, worker0Key, workerPod)
return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker)
}, testutil.Timeout, testutil.Interval).Should(BeTrue())
Eventually(func() bool {
errMaster := testK8sClient.Get(ctx, masterKey, masterSvc)
errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc)
return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker)
}, testutil.Timeout, testutil.Interval).Should(BeTrue())
Consistently(func() bool {
errMasterPod := testK8sClient.Get(ctx, masterKey, masterPod)
errWorkerPod := testK8sClient.Get(ctx, worker0Key, workerPod)
errMasterSvc := testK8sClient.Get(ctx, masterKey, masterSvc)
errWorkerSvc := testK8sClient.Get(ctx, worker0Key, workerSvc)
return errors.IsNotFound(errMasterPod) && errors.IsNotFound(errWorkerPod) &&
errors.IsNotFound(errMasterSvc) && errors.IsNotFound(errWorkerSvc)
}, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue())

return err
}
return nil
}

// ResetExpectations reset the expectation for creates and deletes of pod/service to zero.
func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error {
var allErrs error
Expand Down
Loading

0 comments on commit 64e39f2

Please sign in to comment.