diff --git a/pkg/controllers/constants/constants.go b/pkg/controllers/constants/constants.go
new file mode 100644
index 000000000..5c2567dc0
--- /dev/null
+++ b/pkg/controllers/constants/constants.go
@@ -0,0 +1,40 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package constants
+
+const (
+	// JobOwnerKey is the field used to build the Job index, which enables looking up Jobs by their owning JobSet.
+	JobOwnerKey           = ".metadata.controller"
+	RestartsKey    string = "jobset.sigs.k8s.io/restart-attempt"
+	MaxParallelism int    = 50
+
+	// Event reasons and messages.
+	ReachedMaxRestartsReason  = "ReachedMaxRestarts"
+	ReachedMaxRestartsMessage = "jobset failed due to reaching max number of restarts"
+
+	FailedJobsReason  = "FailedJobs"
+	FailedJobsMessage = "jobset failed due to one or more job failures"
+
+	AllJobsCompletedReason  = "AllJobsCompleted"
+	AllJobsCompletedMessage = "jobset completed successfully"
+
+	// JobCreationFailed events use the Job creation error(s) as the event message.
+	JobCreationFailedReason = "JobCreationFailed"
+
+	ExclusivePlacementViolationReason  = "ExclusivePlacementViolation"
+	ExclusivePlacementViolationMessage = "Pod violated JobSet exclusive placement policy"
+)
diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go
index 2bebb7634..e5203c0ad 100644
--- a/pkg/controllers/jobset_controller.go
+++ b/pkg/controllers/jobset_controller.go
@@ -36,31 +36,12 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
+	"sigs.k8s.io/jobset/pkg/controllers/constants"
 	"sigs.k8s.io/jobset/pkg/util/collections"
 	"sigs.k8s.io/jobset/pkg/util/placement"
 )
 
-const (
-	RestartsKey    string = "jobset.sigs.k8s.io/restart-attempt"
-	maxParallelism int    = 50
-
-	// Event reasons and messages
-	ReachedMaxRestartsReason  = "ReachedMaxRestarts"
-	ReachedMaxRestartsMessage = "jobset failed due to reaching max number of restarts"
-
-	FailedJobsReason  = "FailedJobs"
-	FailedJobsMessage = "jobset failed due to one or more job failures"
-
-	JobCreationFailedReason = "JobCreationFailed"
-
-	AllJobsCompletedReason  = "AllJobsCompleted"
-	AllJobsCompletedMessage = "jobset completed successfully"
-)
-
-var (
-	jobOwnerKey = ".metadata.controller"
-	apiGVStr    = jobset.GroupVersion.String()
-)
+var apiGVStr = jobset.GroupVersion.String()
 
 // JobSetReconciler reconciles a JobSet object
 type JobSetReconciler struct {
@@ -196,7 +177,7 @@ func (r *JobSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
 }
 
 func SetupJobSetIndexes(ctx context.Context, indexer client.FieldIndexer) error {
-	return indexer.IndexField(ctx, &batchv1.Job{}, jobOwnerKey, func(obj client.Object) []string {
+	return indexer.IndexField(ctx, &batchv1.Job{}, constants.JobOwnerKey, func(obj client.Object) []string {
 		o := obj.(*batchv1.Job)
 		owner := metav1.GetControllerOf(o)
 		if owner == nil {
@@ -217,7 +198,7 @@ func (r *JobSetReconciler) getChildJobs(ctx context.Context, js *jobset.JobSet)
 
 	// Get all active jobs owned by JobSet.
 	var childJobList batchv1.JobList
-	if err := r.List(ctx, &childJobList, client.InNamespace(js.Namespace), client.MatchingFields{jobOwnerKey: js.Name}); err != nil {
+	if err := r.List(ctx, &childJobList, client.InNamespace(js.Namespace), client.MatchingFields{constants.JobOwnerKey: js.Name}); err != nil {
 		return nil, err
 	}
 
@@ -226,9 +207,9 @@ func (r *JobSetReconciler) getChildJobs(ctx context.Context, js *jobset.JobSet)
 	for i, job := range childJobList.Items {
 		// Jobs with jobset.sigs.k8s.io/restart-attempt < jobset.status.restarts are marked for
 		// deletion, as they were part of the previous JobSet run.
-		jobRestarts, err := strconv.Atoi(job.Labels[RestartsKey])
+		jobRestarts, err := strconv.Atoi(job.Labels[constants.RestartsKey])
 		if err != nil {
-			log.Error(err, fmt.Sprintf("invalid value for label %s, must be integer", RestartsKey))
+			log.Error(err, fmt.Sprintf("invalid value for label %s, must be integer", constants.RestartsKey))
 			ownedJobs.delete = append(ownedJobs.delete, &childJobList.Items[i])
 			return nil, err
 		}
@@ -447,12 +428,12 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow
 		status := findReplicatedStatus(replicatedJobStatus, replicatedJob.Name)
 		rjJobStarted := replicatedJobsStarted(replicatedJob.Replicas, status)
-		// For startup policy, if the job is started we can skip this loop
-		// Jobs have been created
+		// For startup policy, if the job is started we can skip this loop.
+		// Jobs have been created.
 		if !jobSetSuspended(js) && inOrderStartupPolicy(startupPolicy) && rjJobStarted {
 			continue
 		}
 
-		workqueue.ParallelizeUntil(ctx, maxParallelism, len(jobs), func(i int) {
+		workqueue.ParallelizeUntil(ctx, constants.MaxParallelism, len(jobs), func(i int) {
 			job := jobs[i]
 
 			// Set jobset controller as owner of the job for garbage collection and reconciliation.
@@ -489,7 +470,7 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow
 	if allErrs != nil {
 		// Emit event to propagate the Job creation failures up to be more visible to the user.
 		// TODO(#422): Investigate ways to validate Job templates at JobSet validation time.
-		r.Record.Eventf(js, corev1.EventTypeWarning, JobCreationFailedReason, allErrs.Error())
+		r.Record.Eventf(js, corev1.EventTypeWarning, constants.JobCreationFailedReason, allErrs.Error())
 		return allErrs
 	}
 	// Skip emitting a condition for StartupPolicy if JobSet is suspended
@@ -552,8 +533,8 @@ func (r *JobSetReconciler) executeSuccessPolicy(ctx context.Context, js *jobset.
 			condition: metav1.Condition{
 				Type:    string(jobset.JobSetCompleted),
 				Status:  metav1.ConditionStatus(corev1.ConditionTrue),
-				Reason:  AllJobsCompletedReason,
-				Message: AllJobsCompletedMessage,
+				Reason:  constants.AllJobsCompletedReason,
+				Message: constants.AllJobsCompletedMessage,
 			},
 		}); err != nil {
 			return false, err
@@ -567,13 +548,13 @@ func (r *JobSetReconciler) executeFailurePolicy(ctx context.Context, js *jobset.
 	// If no failure policy is defined, mark the JobSet as failed.
 	if js.Spec.FailurePolicy == nil {
 		firstFailedJob := findFirstFailedJob(ownedJobs.failed)
-		return r.failJobSet(ctx, js, FailedJobsReason, messageWithFirstFailedJob(FailedJobsMessage, firstFailedJob.Name))
+		return r.failJobSet(ctx, js, constants.FailedJobsReason, messageWithFirstFailedJob(constants.FailedJobsMessage, firstFailedJob.Name))
 	}
 
 	// If JobSet has reached max restarts, fail the JobSet.
 	if js.Status.Restarts >= js.Spec.FailurePolicy.MaxRestarts {
 		firstFailedJob := findFirstFailedJob(ownedJobs.failed)
-		return r.failJobSet(ctx, js, ReachedMaxRestartsReason, messageWithFirstFailedJob(ReachedMaxRestartsMessage, firstFailedJob.Name))
+		return r.failJobSet(ctx, js, constants.ReachedMaxRestartsReason, messageWithFirstFailedJob(constants.ReachedMaxRestartsMessage, firstFailedJob.Name))
 	}
 
 	// To reach this point a job must have failed.
@@ -597,7 +578,7 @@ func (r *JobSetReconciler) deleteJobs(ctx context.Context, jobsForDeletion []*ba
 	log := ctrl.LoggerFrom(ctx)
 	lock := &sync.Mutex{}
 	var finalErrs []error
-	workqueue.ParallelizeUntil(ctx, maxParallelism, len(jobsForDeletion), func(i int) {
+	workqueue.ParallelizeUntil(ctx, constants.MaxParallelism, len(jobsForDeletion), func(i int) {
 		targetJob := jobsForDeletion[i]
 		// Skip deleting jobs with deletion timestamp already set.
 		if targetJob.DeletionTimestamp != nil {
@@ -613,7 +594,7 @@ func (r *JobSetReconciler) deleteJobs(ctx context.Context, jobsForDeletion []*ba
 			finalErrs = append(finalErrs, err)
 			return
 		}
-		log.V(2).Info("successfully deleted job", "job", klog.KObj(targetJob), "restart attempt", targetJob.Labels[targetJob.Labels[RestartsKey]])
+		log.V(2).Info("successfully deleted job", "job", klog.KObj(targetJob), "restart attempt", targetJob.Labels[constants.RestartsKey])
 	})
 	return errors.Join(finalErrs...)
 }
@@ -773,7 +754,7 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R
 	labels := collections.CloneMap(obj.GetLabels())
 	labels[jobset.JobSetNameKey] = js.Name
 	labels[jobset.ReplicatedJobNameKey] = rjob.Name
-	labels[RestartsKey] = strconv.Itoa(int(js.Status.Restarts))
+	labels[constants.RestartsKey] = strconv.Itoa(int(js.Status.Restarts))
 	labels[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas))
 	labels[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
 	labels[jobset.JobKey] = jobHashKey(js.Namespace, jobName)
@@ -781,7 +762,7 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R
 	annotations := collections.CloneMap(obj.GetAnnotations())
 	annotations[jobset.JobSetNameKey] = js.Name
 	annotations[jobset.ReplicatedJobNameKey] = rjob.Name
-	annotations[RestartsKey] = strconv.Itoa(int(js.Status.Restarts))
+	annotations[constants.RestartsKey] = strconv.Itoa(int(js.Status.Restarts))
 	annotations[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas))
 	annotations[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
 	annotations[jobset.JobKey] = jobHashKey(js.Namespace, jobName)
@@ -865,39 +846,6 @@ func dnsHostnamesEnabled(js *jobset.JobSet) bool {
 	return js.Spec.Network.EnableDNSHostnames != nil && *js.Spec.Network.EnableDNSHostnames
 }
 
-func jobMatchesSuccessPolicy(js *jobset.JobSet, job *batchv1.Job) bool {
-	return len(js.Spec.SuccessPolicy.TargetReplicatedJobs) == 0 || collections.Contains(js.Spec.SuccessPolicy.TargetReplicatedJobs, job.ObjectMeta.Labels[jobset.ReplicatedJobNameKey])
-}
-
-func replicatedJobMatchesSuccessPolicy(js *jobset.JobSet, rjob *jobset.ReplicatedJob) bool {
-	return len(js.Spec.SuccessPolicy.TargetReplicatedJobs) == 0 || collections.Contains(js.Spec.SuccessPolicy.TargetReplicatedJobs, rjob.Name)
-}
-
-func numJobsMatchingSuccessPolicy(js *jobset.JobSet, jobs []*batchv1.Job) int {
-	total := 0
-	for _, job := range jobs {
-		if jobMatchesSuccessPolicy(js, job) {
-			total += 1
-		}
-	}
-	return total
-}
-
-func numJobsExpectedToSucceed(js *jobset.JobSet) int {
-	total := 0
-	switch js.Spec.SuccessPolicy.Operator {
-	case jobset.OperatorAny:
-		total = 1
-	case jobset.OperatorAll:
-		for _, rjob := range js.Spec.ReplicatedJobs {
-			if replicatedJobMatchesSuccessPolicy(js, &rjob) {
-				total += int(rjob.Replicas)
-			}
-		}
-	}
-	return total
-}
-
 func jobSetSuspended(js *jobset.JobSet) bool {
 	return ptr.Deref(js.Spec.Suspend, false)
 }
diff --git a/pkg/controllers/jobset_controller_test.go b/pkg/controllers/jobset_controller_test.go
index 5ceea3bbf..fcc0d292a 100644
--- a/pkg/controllers/jobset_controller_test.go
+++ b/pkg/controllers/jobset_controller_test.go
@@ -28,6 +28,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
 
 	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
+	"sigs.k8s.io/jobset/pkg/controllers/constants"
 	testutils "sigs.k8s.io/jobset/pkg/util/testing"
 )
 
@@ -1192,7 +1193,7 @@ func makeJob(args *makeJobArgs) *testutils.JobWrapper {
 		jobset.ReplicatedJobNameKey:  args.replicatedJobName,
 		jobset.ReplicatedJobReplicas: strconv.Itoa(args.replicas),
 		jobset.JobIndexKey:           strconv.Itoa(args.jobIdx),
-		RestartsKey:                  strconv.Itoa(args.restarts),
+		constants.RestartsKey:        strconv.Itoa(args.restarts),
 		jobset.JobKey:                jobHashKey(args.ns, args.jobName),
 	}
 	annotations := map[string]string{
@@ -1200,7 +1201,7 @@ func makeJob(args *makeJobArgs) *testutils.JobWrapper {
 		jobset.ReplicatedJobNameKey:  args.replicatedJobName,
 		jobset.ReplicatedJobReplicas: strconv.Itoa(args.replicas),
 		jobset.JobIndexKey:           strconv.Itoa(args.jobIdx),
-		RestartsKey:                  strconv.Itoa(args.restarts),
+		constants.RestartsKey:        strconv.Itoa(args.restarts),
 		jobset.JobKey:                jobHashKey(args.ns, args.jobName),
 	}
 	// Only set exclusive key if we are using exclusive placement per topology.
diff --git a/pkg/controllers/pod_controller.go b/pkg/controllers/pod_controller.go
index 97a4e1b50..9367679ee 100644
--- a/pkg/controllers/pod_controller.go
+++ b/pkg/controllers/pod_controller.go
@@ -32,6 +32,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
 
+	"sigs.k8s.io/jobset/pkg/controllers/constants"
 	"sigs.k8s.io/jobset/pkg/util/placement"
 
 	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
@@ -198,7 +199,7 @@ func (r *PodReconciler) deleteFollowerPods(ctx context.Context, pods []corev1.Po
 	lock := &sync.Mutex{}
 	var finalErrs []error
 
-	workqueue.ParallelizeUntil(ctx, maxParallelism, len(pods), func(i int) {
+	workqueue.ParallelizeUntil(ctx, constants.MaxParallelism, len(pods), func(i int) {
 		pod := pods[i]
 		// Do not delete leader pod.
 		if placement.IsLeaderPod(&pod) {
@@ -210,8 +211,8 @@ func (r *PodReconciler) deleteFollowerPods(ctx context.Context, pods []corev1.Po
 		condition := corev1.PodCondition{
 			Type:    corev1.DisruptionTarget,
 			Status:  v1.ConditionTrue,
-			Reason:  "ExclusivePlacementViolation",
-			Message: "Pod violated JobSet exclusive placement policy",
+			Reason:  constants.ExclusivePlacementViolationReason,
+			Message: constants.ExclusivePlacementViolationMessage,
 		}
 
 		// If pod status already has this condition, we don't need to send the update again.
diff --git a/pkg/controllers/success_policy.go b/pkg/controllers/success_policy.go
new file mode 100644
index 000000000..72f9fc1db
--- /dev/null
+++ b/pkg/controllers/success_policy.go
@@ -0,0 +1,66 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controllers
+
+import (
+	batchv1 "k8s.io/api/batch/v1"
+
+	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
+
+	"sigs.k8s.io/jobset/pkg/util/collections"
+)
+
+// TODO: add unit tests for the functions in this file.
+
+// jobMatchesSuccessPolicy returns a boolean value indicating if the Job is part of a
+// ReplicatedJob that matches the JobSet's success policy.
+func jobMatchesSuccessPolicy(js *jobset.JobSet, job *batchv1.Job) bool {
+	return len(js.Spec.SuccessPolicy.TargetReplicatedJobs) == 0 || collections.Contains(js.Spec.SuccessPolicy.TargetReplicatedJobs, job.ObjectMeta.Labels[jobset.ReplicatedJobNameKey])
+}
+
+// replicatedJobMatchesSuccessPolicy returns a boolean value indicating if the ReplicatedJob
+// matches the JobSet's success policy.
+func replicatedJobMatchesSuccessPolicy(js *jobset.JobSet, rjob *jobset.ReplicatedJob) bool {
+	return len(js.Spec.SuccessPolicy.TargetReplicatedJobs) == 0 || collections.Contains(js.Spec.SuccessPolicy.TargetReplicatedJobs, rjob.Name)
+}
+
+// numJobsMatchingSuccessPolicy returns the number of jobs in the given slice `jobs`
+// which match the JobSet's success policy.
+func numJobsMatchingSuccessPolicy(js *jobset.JobSet, jobs []*batchv1.Job) int {
+	total := 0
+	for _, job := range jobs {
+		if jobMatchesSuccessPolicy(js, job) {
+			total += 1
+		}
+	}
+	return total
+}
+
+// numJobsExpectedToSucceed returns the number of jobs that must complete successfully
+// in order to satisfy the JobSet's success policy and mark the JobSet complete.
+// This is determined based on the JobSet spec.
+func numJobsExpectedToSucceed(js *jobset.JobSet) int {
+	total := 0
+	switch js.Spec.SuccessPolicy.Operator {
+	case jobset.OperatorAny:
+		total = 1
+	case jobset.OperatorAll:
+		for _, rjob := range js.Spec.ReplicatedJobs {
+			if replicatedJobMatchesSuccessPolicy(js, &rjob) {
+				total += int(rjob.Replicas)
+			}
+		}
+	}
+	return total
+}
diff --git a/test/integration/controller/jobset_controller_test.go b/test/integration/controller/jobset_controller_test.go
index 242846678..444355f26 100644
--- a/test/integration/controller/jobset_controller_test.go
+++ b/test/integration/controller/jobset_controller_test.go
@@ -35,6 +35,7 @@ import (
 	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
 
 	"sigs.k8s.io/jobset/pkg/controllers"
+	"sigs.k8s.io/jobset/pkg/controllers/constants"
 	"sigs.k8s.io/jobset/pkg/util/collections"
 	"sigs.k8s.io/jobset/pkg/util/testing"
 	testutil "sigs.k8s.io/jobset/test/util"
@@ -1461,7 +1462,7 @@ func checkJobsRecreated(js *jobset.JobSet, expectedRestarts int) (bool, error) {
 	}
 	// Check all the jobs restart counter has been incremented.
 	for _, job := range jobList.Items {
-		if job.Labels[controllers.RestartsKey] != strconv.Itoa(expectedRestarts) {
+		if job.Labels[constants.RestartsKey] != strconv.Itoa(expectedRestarts) {
 			return false, nil
 		}
 	}
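
The new success_policy.go flags its own follow-up with the TODO about unit tests. Below is a minimal sketch of what such a test could look like; it assumes the test lives in package controllers next to success_policy.go, that JobSetSpec.SuccessPolicy is a pointer field, and that ReplicatedJob.Replicas accepts an integer literal, as the v1alpha2 usage elsewhere in this diff suggests. Treat the names and placement as illustrative, not as part of this change.

package controllers

import (
	"testing"

	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

// TestNumJobsExpectedToSucceed sketches the coverage requested by the TODO in
// success_policy.go. Field names follow the v1alpha2 API as used in this diff.
func TestNumJobsExpectedToSucceed(t *testing.T) {
	// A JobSet with two ReplicatedJobs where only "workers" is targeted by the success policy.
	js := &jobset.JobSet{
		Spec: jobset.JobSetSpec{
			SuccessPolicy: &jobset.SuccessPolicy{
				Operator:             jobset.OperatorAll,
				TargetReplicatedJobs: []string{"workers"},
			},
			ReplicatedJobs: []jobset.ReplicatedJob{
				{Name: "workers", Replicas: 3},
				{Name: "driver", Replicas: 1},
			},
		},
	}

	// OperatorAll: every replica of every targeted ReplicatedJob must succeed.
	if got, want := numJobsExpectedToSucceed(js), 3; got != want {
		t.Errorf("numJobsExpectedToSucceed() with OperatorAll = %d, want %d", got, want)
	}

	// OperatorAny: a single matching Job completion satisfies the policy.
	js.Spec.SuccessPolicy.Operator = jobset.OperatorAny
	if got, want := numJobsExpectedToSucceed(js), 1; got != want {
		t.Errorf("numJobsExpectedToSucceed() with OperatorAny = %d, want %d", got, want)
	}
}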