Skip to content

Commit

Permalink
Do not clean up a job if the job is suspended.
Browse files: browse the repository at this point in the history
  • Loading branch information
mszadkow committed Aug 29, 2024
1 parent e9766d1 commit 9b3b976
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
4 changes: 2 additions & 2 deletions pkg/controller.v1/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ func (jc *JobController) CleanupJob(runPolicy *apiv1.RunPolicy, jobStatus apiv1.
currentTime := time.Now()
metaObject, _ := job.(metav1.Object)
ttl := runPolicy.TTLSecondsAfterFinished
if ttl == nil {
return nil
if ttl == nil || commonutil.IsSuspended(jobStatus) {
return nil
}
duration := time.Second * time.Duration(*ttl)
if jobStatus.CompletionTime == nil {
Expand Down
23 changes: 17 additions & 6 deletions pkg/controller.v1/tensorflow/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,12 +531,12 @@ var _ = Describe("TFJob controller", func() {
testCases := []testCase{
{
description: "succeeded job with TTL 3s",
tfJob: tftestutil.NewTFJobWithCleanupJobDelay(0, 1, 0, ptr.To[int32](3)),
tfJob: tftestutil.NewTFJobWithCleanupJobDelay(0, 1, 0, ptr.To[int32](3), ptr.To(false)),
phase: corev1.PodSucceeded,
},
{
description: "failed job with TTL 3s",
tfJob: tftestutil.NewTFJobWithCleanupJobDelay(0, 1, 0, ptr.To[int32](3)),
tfJob: tftestutil.NewTFJobWithCleanupJobDelay(0, 1, 0, ptr.To[int32](3), ptr.To(false)),
phase: corev1.PodFailed,
},
}
Expand Down Expand Up @@ -655,16 +655,27 @@ var _ = Describe("Test for controller.v1/common", func() {
}
},
Entry("TFJob shouldn't be removed since TTL is nil", &cleanUpCases{
tfJob: tftestutil.NewTFJobWithCleanupJobDelay(1, 2, 0, nil),
tfJob: tftestutil.NewTFJobWithCleanupJobDelay(1, 2, 0, nil, ptr.To(false)),
runPolicy: &kubeflowv1.RunPolicy{
TTLSecondsAfterFinished: nil,
},
jobStatus: kubeflowv1.JobStatus{},
wantTFJobIsRemoved: false,
wantErr: false,
}),
Entry("No error with completionTime is nil if suspended", &cleanUpCases{
tfJob: tftestutil.NewTFJobWithCleanupJobDelay(1, 2, 0, nil, ptr.To(true)),
runPolicy: &kubeflowv1.RunPolicy{
TTLSecondsAfterFinished: nil,
},
jobStatus: kubeflowv1.JobStatus{
CompletionTime: nil,
},
wantTFJobIsRemoved: false,
wantErr: false,
}),
Entry("Error is occurred since completionTime is nil", &cleanUpCases{
tfJob: tftestutil.NewTFJobWithCleanupJobDelay(1, 2, 0, ptr.To[int32](10)),
tfJob: tftestutil.NewTFJobWithCleanupJobDelay(1, 2, 0, ptr.To[int32](10), ptr.To(false)),
runPolicy: &kubeflowv1.RunPolicy{
TTLSecondsAfterFinished: ptr.To[int32](10),
},
Expand All @@ -675,7 +686,7 @@ var _ = Describe("Test for controller.v1/common", func() {
wantErr: true,
}),
Entry("TFJob is removed since exceeded TTL (TTL is 180s)", &cleanUpCases{
tfJob: tftestutil.NewTFJobWithCleanupJobDelay(1, 2, 0, ptr.To[int32](180)),
tfJob: tftestutil.NewTFJobWithCleanupJobDelay(1, 2, 0, ptr.To[int32](180), ptr.To(false)),
runPolicy: &kubeflowv1.RunPolicy{
TTLSecondsAfterFinished: ptr.To[int32](180),
},
Expand All @@ -688,7 +699,7 @@ var _ = Describe("Test for controller.v1/common", func() {
wantErr: false,
}),
Entry("TFJob is removed since (TTL is 0s)", &cleanUpCases{
tfJob: tftestutil.NewTFJobWithCleanupJobDelay(1, 2, 0, ptr.To[int32](0)),
tfJob: tftestutil.NewTFJobWithCleanupJobDelay(1, 2, 0, ptr.To[int32](0), ptr.To(false)),
runPolicy: &kubeflowv1.RunPolicy{
TTLSecondsAfterFinished: ptr.To[int32](0),
},
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller.v1/tensorflow/testutil/tfjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ func NewTFJobWithCleanPolicy(chief, worker, ps int, policy kubeflowv1.CleanPodPo
return tfJob
}

func NewTFJobWithCleanupJobDelay(chief, worker, ps int, ttl *int32) *kubeflowv1.TFJob {
func NewTFJobWithCleanupJobDelay(chief, worker, ps int, ttl *int32, suspended *bool) *kubeflowv1.TFJob {
if chief == 1 {
tfJob := NewTFJobWithChief(worker, ps)
tfJob.Spec.RunPolicy.Suspend = suspended
tfJob.Spec.RunPolicy.TTLSecondsAfterFinished = ttl
tfJob.Spec.RunPolicy.CleanPodPolicy = kubeflowv1.CleanPodPolicyPointer(kubeflowv1.CleanPodPolicyNone)
return tfJob
}
tfJob := NewTFJob(worker, ps)
tfJob.Spec.RunPolicy.Suspend = suspended
tfJob.Spec.RunPolicy.TTLSecondsAfterFinished = ttl
tfJob.Spec.RunPolicy.CleanPodPolicy = kubeflowv1.CleanPodPolicyPointer(kubeflowv1.CleanPodPolicyNone)
return tfJob
Expand Down

0 comments on commit 9b3b976

Please sign in to comment.