diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index e15b9ccc3..2bebb7634 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -43,6 +43,18 @@ import ( const ( RestartsKey string = "jobset.sigs.k8s.io/restart-attempt" maxParallelism int = 50 + + // Event reasons and messages + ReachedMaxRestartsReason = "ReachedMaxRestarts" + ReachedMaxRestartsMessage = "jobset failed due to reaching max number of restarts" + + FailedJobsReason = "FailedJobs" + FailedJobsMessage = "jobset failed due to one or more job failures" + + JobCreationFailedReason = "JobCreationFailed" + + AllJobsCompletedReason = "AllJobsCompleted" + AllJobsCompletedMessage = "jobset completed successfully" ) var ( @@ -477,7 +489,7 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow if allErrs != nil { // Emit event to propagate the Job creation failures up to be more visible to the user. // TODO(#422): Investigate ways to validate Job templates at JobSet validation time. - r.Record.Eventf(js, corev1.EventTypeWarning, "JobCreationFailed", allErrs.Error()) + r.Record.Eventf(js, corev1.EventTypeWarning, JobCreationFailedReason, allErrs.Error()) return allErrs } // Skip emitting a condition for StartupPolicy if JobSet is suspended @@ -540,8 +552,8 @@ func (r *JobSetReconciler) executeSuccessPolicy(ctx context.Context, js *jobset. condition: metav1.Condition{ Type: string(jobset.JobSetCompleted), Status: metav1.ConditionStatus(corev1.ConditionTrue), - Reason: "AllJobsCompleted", - Message: "jobset completed successfully", + Reason: AllJobsCompletedReason, + Message: AllJobsCompletedMessage, }, }); err != nil { return false, err @@ -552,39 +564,25 @@ func (r *JobSetReconciler) executeSuccessPolicy(ctx context.Context, js *jobset. } func (r *JobSetReconciler) executeFailurePolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) error { - // If no failure policy is defined, the default failure policy is to mark the JobSet - // as failed if any of its jobs have failed. + // If no failure policy is defined, mark the JobSet as failed. if js.Spec.FailurePolicy == nil { - return r.failJobSet(ctx, js) + firstFailedJob := findFirstFailedJob(ownedJobs.failed) + return r.failJobSet(ctx, js, FailedJobsReason, messageWithFirstFailedJob(FailedJobsMessage, firstFailedJob.Name)) } - // To reach this point a job must have failed. - return r.executeRestartPolicy(ctx, js, ownedJobs) -} -func (r *JobSetReconciler) executeRestartPolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) error { - if js.Spec.FailurePolicy.MaxRestarts == 0 { - return r.failJobSet(ctx, js) + // If JobSet has reached max restarts, fail the JobSet. + if js.Status.Restarts >= js.Spec.FailurePolicy.MaxRestarts { + firstFailedJob := findFirstFailedJob(ownedJobs.failed) + return r.failJobSet(ctx, js, ReachedMaxRestartsReason, messageWithFirstFailedJob(ReachedMaxRestartsMessage, firstFailedJob.Name)) } - return r.restartPolicyRecreateAll(ctx, js, ownedJobs) + + // To reach this point a job must have failed. + return r.failurePolicyRecreateAll(ctx, js, ownedJobs) } -func (r *JobSetReconciler) restartPolicyRecreateAll(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) error { +func (r *JobSetReconciler) failurePolicyRecreateAll(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) error { log := ctrl.LoggerFrom(ctx) - // If JobSet has reached max number of restarts, mark it as failed and return. - if js.Status.Restarts >= js.Spec.FailurePolicy.MaxRestarts { - return r.ensureCondition(ctx, ensureConditionOpts{ - jobset: js, - eventType: corev1.EventTypeWarning, - condition: metav1.Condition{ - Type: string(jobset.JobSetFailed), - Status: metav1.ConditionStatus(corev1.ConditionTrue), - Reason: "ReachedMaxRestarts", - Message: "jobset failed due to reaching max number of restarts", - }, - }) - } - // Increment JobSet restarts. This will trigger reconciliation and result in deletions // of old jobs not part of the current jobSet run. js.Status.Restarts += 1 @@ -651,14 +649,14 @@ func (r *JobSetReconciler) ensureCondition(ctx context.Context, opts ensureCondi return nil } -func (r *JobSetReconciler) failJobSet(ctx context.Context, js *jobset.JobSet) error { +func (r *JobSetReconciler) failJobSet(ctx context.Context, js *jobset.JobSet, reason, msg string) error { return r.ensureCondition(ctx, ensureConditionOpts{ jobset: js, condition: metav1.Condition{ Type: string(jobset.JobSetFailed), Status: metav1.ConditionStatus(corev1.ConditionTrue), - Reason: "FailedJobs", - Message: "jobset failed due to one or more job failures", + Reason: reason, + Message: msg, }, eventType: corev1.EventTypeWarning, }) @@ -916,3 +914,42 @@ func findReplicatedStatus(replicatedJobStatus []jobset.ReplicatedJobStatus, repl } return jobset.ReplicatedJobStatus{} } + +// messageWithFirstFailedJob appends the first failed job to the original event message in human readable way. +func messageWithFirstFailedJob(msg, firstFailedJobName string) string { + return fmt.Sprintf("%s (first failed job: %s)", msg, firstFailedJobName) +} + +// findFirstFailedJob accepts a slice of failed Jobs and returns the Job which has a JobFailed condition +// with the oldest transition time. +func findFirstFailedJob(failedJobs []*batchv1.Job) *batchv1.Job { + var ( + firstFailedJob *batchv1.Job + firstFailureTime *metav1.Time + ) + for _, job := range failedJobs { + failureTime := findJobFailureTime(job) + // If job has actually failed and it is the first (or only) failure we've seen, + // store the job for output. + if failureTime != nil && (firstFailedJob == nil || failureTime.Before(firstFailureTime)) { + firstFailedJob = job + firstFailureTime = failureTime + } + } + return firstFailedJob +} + +// findJobFailureTime is a helper function which extracts the Job failure time from a Job, +// if the JobFailed condition exists and is true. +func findJobFailureTime(job *batchv1.Job) *metav1.Time { + if job == nil { + return nil + } + for _, c := range job.Status.Conditions { + // If this Job failed before the oldest known Job failiure, update the first failed job. + if c.Type == batchv1.JobFailed && c.Status == corev1.ConditionTrue { + return &c.LastTransitionTime + } + } + return nil +} diff --git a/pkg/controllers/jobset_controller_test.go b/pkg/controllers/jobset_controller_test.go index 4922e21d0..5ceea3bbf 100644 --- a/pkg/controllers/jobset_controller_test.go +++ b/pkg/controllers/jobset_controller_test.go @@ -17,9 +17,11 @@ import ( "context" "strconv" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/assert" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -1106,6 +1108,71 @@ func TestCalculateReplicatedJobStatuses(t *testing.T) { } } +func TestFindFirstFailedJob(t *testing.T) { + testCases := []struct { + name string + failedJobs []*batchv1.Job + expected *batchv1.Job + }{ + { + name: "No failed jobs", + failedJobs: []*batchv1.Job{}, + expected: nil, + }, + { + name: "Single failed job", + failedJobs: []*batchv1.Job{ + jobWithFailedCondition("job1", time.Now().Add(-1*time.Hour)), + }, + expected: jobWithFailedCondition("job1", time.Now().Add(-1*time.Hour)), + }, + { + name: "Multiple failed jobs, earliest first", + failedJobs: []*batchv1.Job{ + jobWithFailedCondition("job1", time.Now().Add(-3*time.Hour)), + jobWithFailedCondition("job2", time.Now().Add(-5*time.Hour)), + }, + expected: jobWithFailedCondition("job2", time.Now().Add(-5*time.Hour)), + }, + { + name: "Jobs without failed condition", + failedJobs: []*batchv1.Job{ + {ObjectMeta: metav1.ObjectMeta{Name: "job1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "job2"}}, + }, + expected: nil, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + result := findFirstFailedJob(tc.failedJobs) + if result != nil && tc.expected != nil { + assert.Equal(t, result.Name, tc.expected.Name) + } else if result != nil && tc.expected == nil || result == nil && tc.expected != nil { + t.Errorf("Expected: %v, got: %v)", result, tc.expected) + } + }) + } +} + +// Helper function to create a job object with a failed condition +func jobWithFailedCondition(name string, failureTime time.Time) *batchv1.Job { + return &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobFailed, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.NewTime(failureTime), + }, + }, + }, + } +} + type makeJobArgs struct { jobSetName string replicatedJobName string @@ -1118,6 +1185,7 @@ type makeJobArgs struct { nodeSelectorStrategy bool } +// Helper function to create a Job for unit testing. func makeJob(args *makeJobArgs) *testutils.JobWrapper { labels := map[string]string{ jobset.JobSetNameKey: args.jobSetName,