Implement suspend semantics to PyTorchJob
tenzen-y committed Jul 9, 2023
1 parent ca85e2b commit 6cfeed8
Showing 24 changed files with 660 additions and 135 deletions.
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_mpijobs.yaml
@@ -7702,6 +7702,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_mxjobs.yaml
@@ -7701,6 +7701,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_paddlejobs.yaml
@@ -8212,6 +8212,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_pytorchjobs.yaml
@@ -8241,6 +8241,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
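The suspend field added to the CRDs above is surfaced through the shared RunPolicy type. As a hedged illustration that is not part of this commit's diff, a client built against the training-operator Go API could create a PyTorchJob that starts out suspended roughly as below; the type and field names follow the kubeflowv1 package used elsewhere in this diff, while the job name, namespace, and replica count are made-up placeholders.

```go
package example

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"

	kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

// suspendedPyTorchJob builds a PyTorchJob whose runPolicy.suspend is true, so
// the controller should record a Suspended condition and create no Pods until
// the field is flipped back to false.
func suspendedPyTorchJob() *kubeflowv1.PyTorchJob {
	return &kubeflowv1.PyTorchJob{
		ObjectMeta: metav1.ObjectMeta{Name: "pytorch-dist-mnist", Namespace: "default"},
		Spec: kubeflowv1.PyTorchJobSpec{
			RunPolicy: kubeflowv1.RunPolicy{
				Suspend: pointer.Bool(true), // maps to spec.runPolicy.suspend in the CRD
			},
			PyTorchReplicaSpecs: map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec{
				kubeflowv1.PyTorchJobReplicaTypeWorker: {
					Replicas: pointer.Int32(2),
					Template: corev1.PodTemplateSpec{
						// Training container spec omitted for brevity.
					},
				},
			},
		},
	}
}
```

Setting suspend back to false on a live object is what drives the resume path added to ReconcileJobs below.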
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_tfjobs.yaml
@@ -87,6 +87,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_xgboostjobs.yaml
@@ -83,6 +83,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
86 changes: 65 additions & 21 deletions pkg/controller.v1/common/job.go
@@ -26,6 +26,7 @@ import (
"github.com/kubeflow/training-operator/pkg/core"
commonutil "github.com/kubeflow/training-operator/pkg/util"
"github.com/kubeflow/training-operator/pkg/util/k8sutil"
trainutil "github.com/kubeflow/training-operator/pkg/util/train"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
@@ -38,7 +39,7 @@ import (
volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
)

func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, job interface{}, pods []*corev1.Pod) error {
func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, runtimeObject runtime.Object, pods []*corev1.Pod) error {
if len(pods) == 0 {
return nil
}
@@ -55,11 +56,11 @@ func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, job i
if *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyRunning && pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending {
continue
}
if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil {
if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, runtimeObject); err != nil {
return err
}
// Pod and service have the same name, thus the service could be deleted using pod's name.
if err := jc.ServiceControl.DeleteService(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil {
if err := jc.ServiceControl.DeleteService(pod.Namespace, pod.Name, runtimeObject); err != nil {
return err
}
}
@@ -117,22 +118,8 @@ func (jc *JobController) ReconcileJobs(

oldStatus := jobStatus.DeepCopy()
if commonutil.IsSucceeded(jobStatus) || commonutil.IsFailed(jobStatus) {
// If the Job is succeed or failed, delete all pods and services.
if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil {
return err
}

if jc.Config.EnableGangScheduling() {
jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, "JobTerminated", "Job has been terminated. Deleting PodGroup")
if err := jc.DeletePodGroup(metaObject); err != nil {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
return err
} else {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", jobName)
}
}

if err := jc.CleanupJob(runPolicy, jobStatus, job); err != nil {
// If the Job is succeed or failed, delete all pods, services, and podGroup.
if err = jc.CleanUpResources(runPolicy, runtimeObject, metaObject, jobStatus, pods); err != nil {
return err
}

@@ -154,6 +141,37 @@ func (jc *JobController) ReconcileJobs(
return nil
}

if trainutil.IsJobSuspended(runPolicy) {
if err = jc.CleanUpResources(runPolicy, runtimeObject, metaObject, jobStatus, pods); err != nil {
return err
}
for rType := range jobStatus.ReplicaStatuses {
jobStatus.ReplicaStatuses[rType].Active = 0
}
jobStatus.StartTime = nil
msg := fmt.Sprintf("%s %s is suspended.", jc.Controller.GetAPIGroupVersionKind().Kind, jobName)
if commonutil.IsRunning(jobStatus) {
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobRunning, corev1.ConditionFalse, commonutil.JobSuspendedReason, msg); err != nil {
return err
}
}
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionTrue, commonutil.JobSuspendedReason, msg); err != nil {
return err
}
jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.JobSuspendedReason, msg)
if !reflect.DeepEqual(*oldStatus, jobStatus) {
return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
}
return nil
}
if !trainutil.IsJobSuspended(runPolicy) && commonutil.IsSuspend(jobStatus) {
msg := fmt.Sprintf("%s %s is resumed.", jc.Controller.GetAPIGroupVersionKind().Kind, jobName)
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionFalse, commonutil.JobResumedReason, msg); err != nil {
return err
}
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, commonutil.JobResumedReason, msg)
}

// retrieve the previous number of retry
previousRetry := jc.WorkQueue.NumRequeues(jobKey)

@@ -204,7 +222,7 @@ func (jc *JobController) ReconcileJobs(

// If the Job exceeds backoff limit or is past active deadline
// delete all pods and services, then set the status to failed
if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil {
if err := jc.DeletePodsAndServices(runPolicy, runtimeObject, pods); err != nil {
return err
}

@@ -224,7 +242,7 @@ func (jc *JobController) ReconcileJobs(

jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.JobFailedReason, failureMessage)

if err := commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, commonutil.JobFailedReason, failureMessage); err != nil {
if err := commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.JobFailedReason, failureMessage); err != nil {
log.Infof("Append job condition error: %v", err)
return err
}
@@ -343,6 +361,32 @@ func (jc *JobController) ReconcileJobs(
return nil
}

func (jc *JobController) CleanUpResources(
runPolicy *apiv1.RunPolicy,
runtimeObject runtime.Object,
metaObject metav1.Object,
jobStatus apiv1.JobStatus,
pods []*v1.Pod,
) error {
if err := jc.DeletePodsAndServices(runPolicy, runtimeObject, pods); err != nil {
return err
}
if jc.Config.EnableGangScheduling() {

jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, "JobTerminated", "Job has been terminated. Deleting PodGroup")
if err := jc.DeletePodGroup(metaObject); err != nil {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
return err
} else {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", metaObject.GetName())
}
}
if err := jc.CleanupJob(runPolicy, jobStatus, runtimeObject); err != nil {
return err
}
return nil
}

// ResetExpectations reset the expectation for creates and deletes of pod/service to zero.
func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error {
var allErrs error
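The suspended branch added to ReconcileJobs above is gated on trainutil.IsJobSuspended, which lives in one of the changed files not shown in this excerpt. A minimal sketch, assuming the helper simply dereferences the optional RunPolicy.Suspend field and that RunPolicy lives in the kubeflow.org/v1 API package as the rest of the diff suggests:

```go
package train

import (
	apiv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

// IsJobSuspended reports whether the RunPolicy asks the controller to keep
// (or put) the job in the suspended state.
func IsJobSuspended(runPolicy *apiv1.RunPolicy) bool {
	return runPolicy.Suspend != nil && *runPolicy.Suspend
}
```

This matches how the field is used above: CleanUpResources tears down Pods, Services, and the PodGroup, the replica counters and start time are reset, and the JobSuspended condition is set to True until the field is cleared again.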
2 changes: 1 addition & 1 deletion pkg/controller.v1/common/pod.go
@@ -360,7 +360,7 @@ func (jc *JobController) ReconcilePods(
msg := fmt.Sprintf("job %s is restarting because %s replica(s) failed.",
metaObject.GetName(), rType)
jc.Recorder.Event(runtimeObject, v1.EventTypeWarning, "JobRestarting", msg)
if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting, "JobRestarting", msg); err != nil {
if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting, v1.ConditionTrue, "JobRestarting", msg); err != nil {
commonutil.LoggerForJob(metaObject).Infof("Append job condition error: %v", err)
return err
}
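The other change repeated across the controllers is that commonutil.UpdateJobConditions now takes an explicit corev1.ConditionStatus instead of implicitly marking every condition True; that is what lets the suspend path set JobRunning to False and later set JobSuspended to False on resume. The helper itself is outside this excerpt, so the following is an inferred, self-contained sketch of the assumed new signature rather than the actual implementation; the same goes for the small IsSuspend predicate used in ReconcileJobs.

```go
package util

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	apiv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

// UpdateJobConditions sets the condition of the given type to the supplied
// status, reason, and message, replacing an existing condition of that type
// or appending a new one.
func UpdateJobConditions(jobStatus *apiv1.JobStatus, conditionType apiv1.JobConditionType,
	status corev1.ConditionStatus, reason, message string) error {
	now := metav1.Now()
	cond := apiv1.JobCondition{
		Type:               conditionType,
		Status:             status,
		Reason:             reason,
		Message:            message,
		LastUpdateTime:     now,
		LastTransitionTime: now,
	}
	for i := range jobStatus.Conditions {
		if jobStatus.Conditions[i].Type == conditionType {
			if jobStatus.Conditions[i].Status == status {
				// Keep the original transition time when the status did not change.
				cond.LastTransitionTime = jobStatus.Conditions[i].LastTransitionTime
			}
			jobStatus.Conditions[i] = cond
			return nil
		}
	}
	jobStatus.Conditions = append(jobStatus.Conditions, cond)
	return nil
}

// IsSuspend reports whether the job currently carries a JobSuspended
// condition with status True.
func IsSuspend(status apiv1.JobStatus) bool {
	for _, c := range status.Conditions {
		if c.Type == apiv1.JobSuspended && c.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}
```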
10 changes: 5 additions & 5 deletions pkg/controller.v1/mpi/mpijob_controller.go
@@ -319,7 +319,7 @@ func (jc *MPIJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, e.Object.GetName())
logrus.Info(msg)
trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, mpiJobCreatedReason, msg); err != nil {
if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, msg); err != nil {
log.Log.Error(err, "append job condition error")
return false
}
@@ -581,7 +581,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
if rtype == kubeflowv1.MPIJobReplicaTypeLauncher {
if running > 0 {
msg := fmt.Sprintf("MPIJob %s is running.", mpiJob.Name)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.JobRunningReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.JobRunningReason, msg)
if err != nil {
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
return err
@@ -596,7 +596,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
now := metav1.Now()
jobStatus.CompletionTime = &now
}
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.JobSucceededReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.JobSucceededReason, msg)
if err != nil {
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
return err
@@ -609,7 +609,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode {
msg := fmt.Sprintf("MPIJob %s is restarting because %d %s replica(s) failed.", mpiJob.Name, failed, rtype)
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, commonutil.JobRestartingReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.JobRestartingReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.JobRestartingReason, msg)
if err != nil {
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
return err
@@ -622,7 +622,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
now := metav1.Now()
jobStatus.CompletionTime = &now
}
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.JobFailedReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.JobFailedReason, msg)
if err != nil {
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
return err
10 changes: 5 additions & 5 deletions pkg/controller.v1/mxnet/mxjob_controller.go
@@ -379,7 +379,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
if rtype == kubeflowv1.MXJobReplicaTypeScheduler || singleTraining {
if running > 0 {
msg := fmt.Sprintf("MXJob %s is running.", mxjob.Name)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, mxJobRunningReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, mxJobRunningReason, msg)
if err != nil {
logrus.Infof("Append mxjob condition error: %v", err)
return err
@@ -393,7 +393,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
now := metav1.Now()
jobStatus.CompletionTime = &now
}
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, mxJobSucceededReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, mxJobSucceededReason, msg)
if err != nil {
logrus.Infof("Append mxjob condition error: %v", err)
return err
@@ -406,7 +406,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode {
msg := fmt.Sprintf("mxjob %s is restarting because %d %s replica(s) failed.", mxjob.Name, failed, rtype)
r.Recorder.Event(mxjob, corev1.EventTypeWarning, mxJobRestartingReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, mxJobRestartingReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, mxJobRestartingReason, msg)
if err != nil {
logrus.Infof("Append job condition error: %v", err)
return err
@@ -419,7 +419,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
now := metav1.Now()
jobStatus.CompletionTime = &now
}
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, mxJobFailedReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, mxJobFailedReason, msg)
if err != nil {
logrus.Infof("Append job condition error: %v", err)
return err
@@ -486,7 +486,7 @@ func (r *MXJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
msg := fmt.Sprintf("MXJob %s is created.", e.Object.GetName())
logrus.Info(msg)
trainingoperatorcommon.CreatedJobsCounterInc(mxJob.Namespace, r.GetFrameworkName())
if err := commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, "MXJobCreated", msg); err != nil {
if err := commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, "MXJobCreated", msg); err != nil {
logrus.Error(err, "append job condition error")
return false
}
