fix: suspend job status can not turn into Suspended and report error… #2159

Open
wants to merge 1 commit into
base: master
7 changes: 5 additions & 2 deletions pkg/controller.v1/common/job.go
Author

This call attempts the cleanup unconditionally, but the completion time of a suspended job is nil, so the cleanup always fails and the job state stays Running. That causes logic errors in other controllers that rely on the Kubeflow job status. Kueue, for example, schedules jobs according to a priority policy. When a low-priority job is evicted because a high-priority job is enqueued, the evicted job's runPolicy.Suspend is true but its state is still Running. As a result, Kueue cannot reclaim the resources of the evicted job.

Member

Thanks for creating this @xuxianzhang!

Isn't the CleanupJob flow triggered only when TTLSecondsAfterFinished is set?

Why is the state Running after the Job has been suspended? It should convert the Job to the Suspended condition according to this:

commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg)

@alculquicondor @tenzen-y Do we know what the cleanup looks like for MPIJob? It looks like, for MPI, we only set the TTL seconds for the launcher Job:
https://github.com/kubeflow/mpi-operator/blob/52cda2c7e85ac22284ea23e1d905c4e2eaefdc11/pkg/controller/mpi_job_controller.go#L1478C51-L1478C74

Author

When the job's runPolicy.Suspend becomes true, IsJobSuspended returns true and triggers CleanUpResources, but that call returns an error because the job isn't finished, so execution never reaches L159.

Author

if trainutil.IsJobSuspended(runPolicy) {
	if err = jc.CleanUpResources(runPolicy, runtimeObject, metaObject, jobStatus, pods); err != nil {
		return err
	}
	for rType := range jobStatus.ReplicaStatuses {
		jobStatus.ReplicaStatuses[rType].Active = 0
	}
	msg := fmt.Sprintf("%s %s is suspended.", jobKind, jobName)
	if commonutil.IsRunning(jobStatus) {
		commonutil.UpdateJobConditions(&jobStatus, apiv1.JobRunning, corev1.ConditionFalse, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg)
	}
	// We add the suspended condition to the job only when the job doesn't have a suspended condition.
	if !commonutil.IsSuspended(jobStatus) {
		commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg)
	}
	jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg)
	if !reflect.DeepEqual(*oldStatus, jobStatus) {
		return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
	}
	return nil
}
if commonutil.IsSuspended(jobStatus) {
	msg := fmt.Sprintf("%s %s is resumed.", jobKind, jobName)
	commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionFalse, commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg)
	now := metav1.Now()
	jobStatus.StartTime = &now
	jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg)
}

Author

So we need CleanUpResources to return nil when the job is suspended, so that UpdateJobConditions is triggered.

Author

The precondition for all of this is that TTLSecondsAfterFinished is set.

Member

> but it will return an error because job isn't finished

Does it return this error?

if jobStatus.CompletionTime == nil {
	return fmt.Errorf("job completion time is nil, cannot cleanup")
}

Author

yes

@@ -383,8 +383,11 @@ func (jc *JobController) CleanUpResources(
 			jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", metaObject.GetName())
 		}
 	}
-	if err := jc.CleanupJob(runPolicy, jobStatus, runtimeObject); err != nil {
-		return err
+
+	if !trainutil.IsJobSuspended(runPolicy) {
+		if err := jc.CleanupJob(runPolicy, jobStatus, runtimeObject); err != nil {
+			return err
+		}
 	}
 	return nil
 }