Include first failed job name in event emitted when JobSet fails, as well as the JobSet failure condition #477

Merged
5 commits merged on Mar 27, 2024
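For illustration, a hedged sketch of the failure message this change produces, based on the messageWithFirstFailedJob helper and the FailedJobsMessage constant added in this diff; the Job name is hypothetical.

// Minimal sketch (not part of the diff): the default failure message with the
// name of the first failed child Job appended, assuming a failed Job named "myjobset-workers-0".
msg := messageWithFirstFailedJob(constants.FailedJobsMessage, "myjobset-workers-0")
// msg == "jobset failed due to one or more job failures (first failed job: myjobset-workers-0)"

Per the PR title, this message is used both for the warning event emitted when the JobSet fails and for the JobSetFailed condition.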
4 changes: 1 addition & 3 deletions Dockerfile
@@ -17,9 +17,7 @@ RUN go mod download
# Copy the go source
COPY main.go main.go
COPY api/ api/
COPY pkg/controllers/ pkg/controllers/
COPY pkg/util/ pkg/util/
COPY pkg/webhooks pkg/webhooks
COPY pkg/ pkg/

# Build
# the GOARCH has no default value, to allow the binary to be built according to the host where the command
56 changes: 56 additions & 0 deletions pkg/constants/constants.go
@@ -0,0 +1,56 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package constants

const (
// JobOwnerKey is the field used to build the JobSet index, which enables looking up Jobs
// by the owner JobSet quickly.
JobOwnerKey = ".metadata.controller"

// RestartsKey is an annotation and label key which defines the restart attempt number
// the JobSet is currently on.
RestartsKey = "jobset.sigs.k8s.io/restart-attempt"

// MaxParallelism defines the maximum number of parallel Job creations/deletions that
// the JobSet controller can perform.
MaxParallelism = 50

// Event reason and message for when a JobSet fails due to reaching max restarts
// defined in its failure policy.
ReachedMaxRestartsReason = "ReachedMaxRestarts"
ReachedMaxRestartsMessage = "jobset failed due to reaching max number of restarts"

// Event reason and message for when a JobSet fails due to any Job failing, when
// no failure policy is defined.
// This is the default failure handling behavior.
FailedJobsReason = "FailedJobs"
FailedJobsMessage = "jobset failed due to one or more job failures"

// Event reason and message for when a JobSet completes successfully.
AllJobsCompletedReason = "AllJobsCompleted"
AllJobsCompletedMessage = "jobset completed successfully"

// Event reason used when a Job creation fails.
// The event uses the error(s) as the message.
JobCreationFailedReason = "JobCreationFailed"

// Event reason and message for when the pod controller detects a violation
// of the JobSet exclusive placement policy (i.e., follower pods not colocated in
// the same topology domain as the leader pod for that Job).
ExclusivePlacementViolationReason = "ExclusivePlacementViolation"
ExclusivePlacementViolationMessage = "Pod violated JobSet exclusive placement policy"
)
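As a hedged illustration of how these constants are consumed (mirroring the failJobSet helper in the controller changes below), the resulting JobSet failure condition would look roughly like this; the Job name is hypothetical.

// Sketch only, not part of the diff: the condition recorded when a JobSet with no
// failure policy fails, with the first failed child Job included in the message.
condition := metav1.Condition{
	Type:    string(jobset.JobSetFailed),
	Status:  metav1.ConditionStatus(corev1.ConditionTrue),
	Reason:  constants.FailedJobsReason,
	Message: messageWithFirstFailedJob(constants.FailedJobsMessage, "myjobset-workers-0"),
}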
153 changes: 69 additions & 84 deletions pkg/controllers/jobset_controller.go
@@ -36,19 +36,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
"sigs.k8s.io/jobset/pkg/constants"
"sigs.k8s.io/jobset/pkg/util/collections"
"sigs.k8s.io/jobset/pkg/util/placement"
)

const (
RestartsKey string = "jobset.sigs.k8s.io/restart-attempt"
maxParallelism int = 50
)

var (
jobOwnerKey = ".metadata.controller"
apiGVStr = jobset.GroupVersion.String()
)
var apiGVStr = jobset.GroupVersion.String()

// JobSetReconciler reconciles a JobSet object
type JobSetReconciler struct {
@@ -184,7 +177,7 @@ func (r *JobSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

func SetupJobSetIndexes(ctx context.Context, indexer client.FieldIndexer) error {
return indexer.IndexField(ctx, &batchv1.Job{}, jobOwnerKey, func(obj client.Object) []string {
return indexer.IndexField(ctx, &batchv1.Job{}, constants.JobOwnerKey, func(obj client.Object) []string {
o := obj.(*batchv1.Job)
owner := metav1.GetControllerOf(o)
if owner == nil {
@@ -205,7 +198,7 @@ func (r *JobSetReconciler) getChildJobs(ctx context.Context, js *jobset.JobSet)

// Get all active jobs owned by JobSet.
var childJobList batchv1.JobList
if err := r.List(ctx, &childJobList, client.InNamespace(js.Namespace), client.MatchingFields{jobOwnerKey: js.Name}); err != nil {
if err := r.List(ctx, &childJobList, client.InNamespace(js.Namespace), client.MatchingFields{constants.JobOwnerKey: js.Name}); err != nil {
return nil, err
}

@@ -214,9 +207,9 @@ func (r *JobSetReconciler) getChildJobs(ctx context.Context, js *jobset.JobSet)
for i, job := range childJobList.Items {
// Jobs with jobset.sigs.k8s.io/restart-attempt < jobset.status.restarts are marked for
// deletion, as they were part of the previous JobSet run.
jobRestarts, err := strconv.Atoi(job.Labels[RestartsKey])
jobRestarts, err := strconv.Atoi(job.Labels[constants.RestartsKey])
if err != nil {
log.Error(err, fmt.Sprintf("invalid value for label %s, must be integer", RestartsKey))
log.Error(err, fmt.Sprintf("invalid value for label %s, must be integer", constants.RestartsKey))
ownedJobs.delete = append(ownedJobs.delete, &childJobList.Items[i])
return nil, err
}
@@ -435,12 +428,12 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow

status := findReplicatedStatus(replicatedJobStatus, replicatedJob.Name)
rjJobStarted := replicatedJobsStarted(replicatedJob.Replicas, status)
// For startup policy, if the job is started we can skip this loop
// Jobs have been created
// For startup policy, if the job is started we can skip this loop.
// Jobs have been created.
if !jobSetSuspended(js) && inOrderStartupPolicy(startupPolicy) && rjJobStarted {
continue
}
workqueue.ParallelizeUntil(ctx, maxParallelism, len(jobs), func(i int) {
workqueue.ParallelizeUntil(ctx, constants.MaxParallelism, len(jobs), func(i int) {
job := jobs[i]

// Set the JobSet controller as owner of the job for garbage collection and reconciliation.
@@ -477,7 +470,7 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow
if allErrs != nil {
// Emit event to propagate the Job creation failures up to be more visible to the user.
// TODO(#422): Investigate ways to validate Job templates at JobSet validation time.
r.Record.Eventf(js, corev1.EventTypeWarning, "JobCreationFailed", allErrs.Error())
r.Record.Eventf(js, corev1.EventTypeWarning, constants.JobCreationFailedReason, allErrs.Error())
return allErrs
}
// Skip emitting a condition for StartupPolicy if JobSet is suspended
@@ -540,8 +533,8 @@ func (r *JobSetReconciler) executeSuccessPolicy(ctx context.Context, js *jobset.
condition: metav1.Condition{
Type: string(jobset.JobSetCompleted),
Status: metav1.ConditionStatus(corev1.ConditionTrue),
Reason: "AllJobsCompleted",
Message: "jobset completed successfully",
Reason: constants.AllJobsCompletedReason,
Message: constants.AllJobsCompletedMessage,
},
}); err != nil {
return false, err
@@ -552,39 +545,25 @@ func (r *JobSetReconciler) executeSuccessPolicy(ctx context.Context, js *jobset.
}

func (r *JobSetReconciler) executeFailurePolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) error {
// If no failure policy is defined, the default failure policy is to mark the JobSet
// as failed if any of its jobs have failed.
// If no failure policy is defined, mark the JobSet as failed.
if js.Spec.FailurePolicy == nil {
return r.failJobSet(ctx, js)
firstFailedJob := findFirstFailedJob(ownedJobs.failed)
return r.failJobSet(ctx, js, constants.FailedJobsReason, messageWithFirstFailedJob(constants.FailedJobsMessage, firstFailedJob.Name))
}
// To reach this point a job must have failed.
return r.executeRestartPolicy(ctx, js, ownedJobs)
}

func (r *JobSetReconciler) executeRestartPolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) error {
if js.Spec.FailurePolicy.MaxRestarts == 0 {
return r.failJobSet(ctx, js)
// If JobSet has reached max restarts, fail the JobSet.
if js.Status.Restarts >= js.Spec.FailurePolicy.MaxRestarts {
firstFailedJob := findFirstFailedJob(ownedJobs.failed)
return r.failJobSet(ctx, js, constants.ReachedMaxRestartsReason, messageWithFirstFailedJob(constants.ReachedMaxRestartsMessage, firstFailedJob.Name))
}
return r.restartPolicyRecreateAll(ctx, js, ownedJobs)

// To reach this point a job must have failed.
return r.failurePolicyRecreateAll(ctx, js, ownedJobs)
}

func (r *JobSetReconciler) restartPolicyRecreateAll(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) error {
func (r *JobSetReconciler) failurePolicyRecreateAll(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) error {
log := ctrl.LoggerFrom(ctx)

// If JobSet has reached max number of restarts, mark it as failed and return.
if js.Status.Restarts >= js.Spec.FailurePolicy.MaxRestarts {
return r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
eventType: corev1.EventTypeWarning,
condition: metav1.Condition{
Type: string(jobset.JobSetFailed),
Status: metav1.ConditionStatus(corev1.ConditionTrue),
Reason: "ReachedMaxRestarts",
Message: "jobset failed due to reaching max number of restarts",
},
})
}

// Increment JobSet restarts. This will trigger reconciliation and result in deletions
// of old jobs not part of the current jobSet run.
js.Status.Restarts += 1
@@ -599,7 +578,7 @@ func (r *JobSetReconciler) deleteJobs(ctx context.Context, jobsForDeletion []*ba
log := ctrl.LoggerFrom(ctx)
lock := &sync.Mutex{}
var finalErrs []error
workqueue.ParallelizeUntil(ctx, maxParallelism, len(jobsForDeletion), func(i int) {
workqueue.ParallelizeUntil(ctx, constants.MaxParallelism, len(jobsForDeletion), func(i int) {
targetJob := jobsForDeletion[i]
// Skip deleting jobs with deletion timestamp already set.
if targetJob.DeletionTimestamp != nil {
@@ -615,7 +594,7 @@ func (r *JobSetReconciler) deleteJobs(ctx context.Context, jobsForDeletion []*ba
finalErrs = append(finalErrs, err)
return
}
log.V(2).Info("successfully deleted job", "job", klog.KObj(targetJob), "restart attempt", targetJob.Labels[targetJob.Labels[RestartsKey]])
log.V(2).Info("successfully deleted job", "job", klog.KObj(targetJob), "restart attempt", targetJob.Labels[targetJob.Labels[constants.RestartsKey]])
})
return errors.Join(finalErrs...)
}
@@ -651,14 +630,14 @@ func (r *JobSetReconciler) ensureCondition(ctx context.Context, opts ensureCondi
return nil
}

func (r *JobSetReconciler) failJobSet(ctx context.Context, js *jobset.JobSet) error {
func (r *JobSetReconciler) failJobSet(ctx context.Context, js *jobset.JobSet, reason, msg string) error {
return r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
condition: metav1.Condition{
Type: string(jobset.JobSetFailed),
Status: metav1.ConditionStatus(corev1.ConditionTrue),
Reason: "FailedJobs",
Message: "jobset failed due to one or more job failures",
Reason: reason,
Message: msg,
},
eventType: corev1.EventTypeWarning,
})
@@ -775,15 +754,15 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R
labels := collections.CloneMap(obj.GetLabels())
labels[jobset.JobSetNameKey] = js.Name
labels[jobset.ReplicatedJobNameKey] = rjob.Name
labels[RestartsKey] = strconv.Itoa(int(js.Status.Restarts))
labels[constants.RestartsKey] = strconv.Itoa(int(js.Status.Restarts))
labels[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas))
labels[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
labels[jobset.JobKey] = jobHashKey(js.Namespace, jobName)

annotations := collections.CloneMap(obj.GetAnnotations())
annotations[jobset.JobSetNameKey] = js.Name
annotations[jobset.ReplicatedJobNameKey] = rjob.Name
annotations[RestartsKey] = strconv.Itoa(int(js.Status.Restarts))
annotations[constants.RestartsKey] = strconv.Itoa(int(js.Status.Restarts))
annotations[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas))
annotations[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
annotations[jobset.JobKey] = jobHashKey(js.Namespace, jobName)
@@ -867,39 +846,6 @@ func dnsHostnamesEnabled(js *jobset.JobSet) bool {
return js.Spec.Network.EnableDNSHostnames != nil && *js.Spec.Network.EnableDNSHostnames
}

func jobMatchesSuccessPolicy(js *jobset.JobSet, job *batchv1.Job) bool {
return len(js.Spec.SuccessPolicy.TargetReplicatedJobs) == 0 || collections.Contains(js.Spec.SuccessPolicy.TargetReplicatedJobs, job.ObjectMeta.Labels[jobset.ReplicatedJobNameKey])
}

func replicatedJobMatchesSuccessPolicy(js *jobset.JobSet, rjob *jobset.ReplicatedJob) bool {
return len(js.Spec.SuccessPolicy.TargetReplicatedJobs) == 0 || collections.Contains(js.Spec.SuccessPolicy.TargetReplicatedJobs, rjob.Name)
}

func numJobsMatchingSuccessPolicy(js *jobset.JobSet, jobs []*batchv1.Job) int {
total := 0
for _, job := range jobs {
if jobMatchesSuccessPolicy(js, job) {
total += 1
}
}
return total
}

func numJobsExpectedToSucceed(js *jobset.JobSet) int {
total := 0
switch js.Spec.SuccessPolicy.Operator {
case jobset.OperatorAny:
total = 1
case jobset.OperatorAll:
for _, rjob := range js.Spec.ReplicatedJobs {
if replicatedJobMatchesSuccessPolicy(js, &rjob) {
total += int(rjob.Replicas)
}
}
}
return total
}

func jobSetSuspended(js *jobset.JobSet) bool {
return ptr.Deref(js.Spec.Suspend, false)
}
@@ -916,3 +862,42 @@ func findReplicatedStatus(replicatedJobStatus []jobset.ReplicatedJobStatus, repl
}
return jobset.ReplicatedJobStatus{}
}

// messageWithFirstFailedJob appends the first failed job to the original event message in a human-readable way.
Contributor

Could we move these out of this file?

They don't require the controller, so it may be useful to move them to a separate file.

Contributor Author
@danielvegamyhre Mar 27, 2024

Yeah, jobset_controller.go is overdue for refactoring. To start, I think we can refactor some of the many helper functions into separate files based on the feature (similar to what you did with startup policy).

I did some refactoring in this PR (e.g., moving some functions into success_policy.go, adding a constants package, etc.).

However, for these particular functions, I'm not sure of the best place to put them yet. They are about finding the first failed job for a JobSet and generating an event message for it, which doesn't fit into any existing (or new) logical grouping.

I think for now we should leave these 3 functions here, and maybe in a separate PR we can refactor some more; I don't want to go overboard splitting things up.

func messageWithFirstFailedJob(msg, firstFailedJobName string) string {
return fmt.Sprintf("%s (first failed job: %s)", msg, firstFailedJobName)
}

// findFirstFailedJob accepts a slice of failed Jobs and returns the Job which has a JobFailed condition
// with the oldest transition time.
func findFirstFailedJob(failedJobs []*batchv1.Job) *batchv1.Job {
var (
firstFailedJob *batchv1.Job
firstFailureTime *metav1.Time
)
for _, job := range failedJobs {
failureTime := findJobFailureTime(job)
// If job has actually failed and it is the first (or only) failure we've seen,
// store the job for output.
if failureTime != nil && (firstFailedJob == nil || failureTime.Before(firstFailureTime)) {
firstFailedJob = job
firstFailureTime = failureTime
}
}
return firstFailedJob
}

// findJobFailureTime is a helper function which extracts the Job failure time from a Job,
// if the JobFailed condition exists and is true.
func findJobFailureTime(job *batchv1.Job) *metav1.Time {
if job == nil {
return nil
}
for _, c := range job.Status.Conditions {
// If the JobFailed condition exists and is true, return its last transition time.
if c.Type == batchv1.JobFailed && c.Status == corev1.ConditionTrue {
return &c.LastTransitionTime
}
}
return nil
}