include first failed job name in event and jobset failed condition
danielvegamyhre committed Mar 26, 2024
1 parent 8e15bae commit 3c293f4
Showing 2 changed files with 136 additions and 31 deletions.
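For a quick sense of the user-visible effect (illustrative only; the Job name below is hypothetical), a JobSet that fails under the default failure policy now carries a Failed condition and Warning event along these lines:

Reason:  FailedJobs
Message: jobset failed due to one or more job failures (first failed job: my-jobset-workers-0-0)

and, when it fails after exhausting MaxRestarts:

Reason:  ReachedMaxRestarts
Message: jobset failed due to reaching max number of restarts (first failed job: my-jobset-workers-0-0)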
99 changes: 68 additions & 31 deletions pkg/controllers/jobset_controller.go
@@ -43,6 +43,18 @@ import (
const (
RestartsKey string = "jobset.sigs.k8s.io/restart-attempt"
maxParallelism int = 50

// Event reasons and messages
ReachedMaxRestartsReason = "ReachedMaxRestarts"
ReachedMaxRestartsMessage = "jobset failed due to reaching max number of restarts"

FailedJobsReason = "FailedJobs"
FailedJobsMessage = "jobset failed due to one or more job failures"

JobCreationFailedReason = "JobCreationFailed"

AllJobsCompletedReason = "AllJobsCompleted"
AllJobsCompletedMessage = "jobset completed successfully"
)

var (
@@ -477,7 +489,7 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow
if allErrs != nil {
// Emit event to propagate the Job creation failures up to be more visible to the user.
// TODO(#422): Investigate ways to validate Job templates at JobSet validation time.
r.Record.Eventf(js, corev1.EventTypeWarning, "JobCreationFailed", allErrs.Error())
r.Record.Eventf(js, corev1.EventTypeWarning, JobCreationFailedReason, allErrs.Error())
return allErrs
}
// Skip emitting a condition for StartupPolicy if JobSet is suspended
@@ -540,8 +552,8 @@ func (r *JobSetReconciler) executeSuccessPolicy(ctx context.Context, js *jobset.
condition: metav1.Condition{
Type: string(jobset.JobSetCompleted),
Status: metav1.ConditionStatus(corev1.ConditionTrue),
Reason: "AllJobsCompleted",
Message: "jobset completed successfully",
Reason: AllJobsCompletedReason,
Message: AllJobsCompletedMessage,
},
}); err != nil {
return false, err
@@ -552,39 +564,25 @@ func (r *JobSetReconciler) executeSuccessPolicy(ctx context.Context, js *jobset.
}

func (r *JobSetReconciler) executeFailurePolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) error {
// If no failure policy is defined, the default failure policy is to mark the JobSet
// as failed if any of its jobs have failed.
// If no failure policy is defined, mark the JobSet as failed.
if js.Spec.FailurePolicy == nil {
return r.failJobSet(ctx, js)
firstFailedJob := findFirstFailedJob(ownedJobs.failed)
return r.failJobSet(ctx, js, FailedJobsReason, messageWithFirstFailedJob(FailedJobsMessage, firstFailedJob.Name))
}
// To reach this point a job must have failed.
return r.executeRestartPolicy(ctx, js, ownedJobs)
}

func (r *JobSetReconciler) executeRestartPolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) error {
if js.Spec.FailurePolicy.MaxRestarts == 0 {
return r.failJobSet(ctx, js)
// If JobSet has reached max restarts, fail the JobSet.
if js.Status.Restarts >= js.Spec.FailurePolicy.MaxRestarts {
firstFailedJob := findFirstFailedJob(ownedJobs.failed)
return r.failJobSet(ctx, js, ReachedMaxRestartsReason, messageWithFirstFailedJob(ReachedMaxRestartsMessage, firstFailedJob.Name))
}
return r.restartPolicyRecreateAll(ctx, js, ownedJobs)

// To reach this point a job must have failed.
return r.failurePolicyRecreateAll(ctx, js, ownedJobs)
}

func (r *JobSetReconciler) restartPolicyRecreateAll(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) error {
func (r *JobSetReconciler) failurePolicyRecreateAll(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) error {
log := ctrl.LoggerFrom(ctx)

// If JobSet has reached max number of restarts, mark it as failed and return.
if js.Status.Restarts >= js.Spec.FailurePolicy.MaxRestarts {
return r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
eventType: corev1.EventTypeWarning,
condition: metav1.Condition{
Type: string(jobset.JobSetFailed),
Status: metav1.ConditionStatus(corev1.ConditionTrue),
Reason: "ReachedMaxRestarts",
Message: "jobset failed due to reaching max number of restarts",
},
})
}

// Increment JobSet restarts. This will trigger reconciliation and result in deletions
// of old jobs not part of the current jobSet run.
js.Status.Restarts += 1
@@ -651,14 +649,14 @@ func (r *JobSetReconciler) ensureCondition(ctx context.Context, opts ensureCondi
return nil
}

func (r *JobSetReconciler) failJobSet(ctx context.Context, js *jobset.JobSet) error {
func (r *JobSetReconciler) failJobSet(ctx context.Context, js *jobset.JobSet, reason, msg string) error {
return r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
condition: metav1.Condition{
Type: string(jobset.JobSetFailed),
Status: metav1.ConditionStatus(corev1.ConditionTrue),
Reason: "FailedJobs",
Message: "jobset failed due to one or more job failures",
Reason: reason,
Message: msg,
},
eventType: corev1.EventTypeWarning,
})
@@ -916,3 +914,42 @@ func findReplicatedStatus(replicatedJobStatus []jobset.ReplicatedJobStatus, repl
}
return jobset.ReplicatedJobStatus{}
}

// messageWithFirstFailedJob appends the name of the first failed job to the original event message in a human-readable way.
func messageWithFirstFailedJob(msg, firstFailedJobName string) string {
return fmt.Sprintf("%s (first failed job: %s)", msg, firstFailedJobName)
}

// findFirstFailedJob accepts a slice of failed Jobs and returns the Job which has a JobFailed condition
// with the oldest transition time.
func findFirstFailedJob(failedJobs []*batchv1.Job) *batchv1.Job {
var (
firstFailedJob *batchv1.Job
firstFailureTime *metav1.Time
)
for _, job := range failedJobs {
failureTime := findJobFailureTime(job)
// If the job has actually failed and its failure is the earliest (or only) one we've seen,
// record it as the first failed job.
if failureTime != nil && (firstFailedJob == nil || failureTime.Before(firstFailureTime)) {
firstFailedJob = job
firstFailureTime = failureTime
}
}
return firstFailedJob
}

// findJobFailureTime is a helper function which extracts the Job failure time from a Job,
// if the JobFailed condition exists and is true.
func findJobFailureTime(job *batchv1.Job) *metav1.Time {
if job == nil {
return nil
}
for _, c := range job.Status.Conditions {
// If the JobFailed condition exists and is true, return its transition time as the failure time.
if c.Type == batchv1.JobFailed && c.Status == corev1.ConditionTrue {
return &c.LastTransitionTime
}
}
return nil
}
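As a rough usage sketch (illustrative only; the Job names are hypothetical, and jobWithFailedCondition is the test helper added in the test file below), findFirstFailedJob and messageWithFirstFailedJob compose like this:

// Two failed child Jobs; the second one failed a minute earlier.
failed := []*batchv1.Job{
	jobWithFailedCondition("my-jobset-workers-0-1", time.Now()),
	jobWithFailedCondition("my-jobset-workers-0-0", time.Now().Add(-time.Minute)),
}
first := findFirstFailedJob(failed) // picks the Job whose JobFailed condition has the earliest transition time
msg := messageWithFirstFailedJob(FailedJobsMessage, first.Name)
// msg: "jobset failed due to one or more job failures (first failed job: my-jobset-workers-0-0)"

Note that findFirstFailedJob returns nil when no Job in the slice carries a true JobFailed condition, so dereferencing first.Name assumes at least one such Job exists.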
68 changes: 68 additions & 0 deletions pkg/controllers/jobset_controller_test.go
@@ -17,9 +17,11 @@ import (
"context"
"strconv"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -1106,6 +1108,71 @@ func TestCalculateReplicatedJobStatuses(t *testing.T) {
}
}

func TestFindFirstFailedJob(t *testing.T) {
testCases := []struct {
name string
failedJobs []*batchv1.Job
expected *batchv1.Job
}{
{
name: "No failed jobs",
failedJobs: []*batchv1.Job{},
expected: nil,
},
{
name: "Single failed job",
failedJobs: []*batchv1.Job{
jobWithFailedCondition("job1", time.Now().Add(-1*time.Hour)),
},
expected: jobWithFailedCondition("job1", time.Now().Add(-1*time.Hour)),
},
{
name: "Multiple failed jobs, earliest first",
failedJobs: []*batchv1.Job{
jobWithFailedCondition("job1", time.Now().Add(-3*time.Hour)),
jobWithFailedCondition("job2", time.Now().Add(-5*time.Hour)),
},
expected: jobWithFailedCondition("job2", time.Now().Add(-5*time.Hour)),
},
{
name: "Jobs without failed condition",
failedJobs: []*batchv1.Job{
{ObjectMeta: metav1.ObjectMeta{Name: "job1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "job2"}},
},
expected: nil,
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
result := findFirstFailedJob(tc.failedJobs)
if result != nil && tc.expected != nil {
assert.Equal(t, tc.expected.Name, result.Name)
} else if (result != nil && tc.expected == nil) || (result == nil && tc.expected != nil) {
t.Errorf("expected: %v, got: %v", tc.expected, result)
}
})
}
}

// Helper function to create a job object with a failed condition
func jobWithFailedCondition(name string, failureTime time.Time) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{Name: name},
Status: batchv1.JobStatus{
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobFailed,
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.NewTime(failureTime),
},
},
},
}
}

type makeJobArgs struct {
jobSetName string
replicatedJobName string
@@ -1118,6 +1185,7 @@ type makeJobArgs struct {
nodeSelectorStrategy bool
}

// Helper function to create a Job for unit testing.
func makeJob(args *makeJobArgs) *testutils.JobWrapper {
labels := map[string]string{
jobset.JobSetNameKey: args.jobSetName,