address comments and refactor
danielvegamyhre committed Mar 27, 2024
1 parent 3c293f4 commit 4d1bc06
Showing 6 changed files with 133 additions and 76 deletions.
40 changes: 40 additions & 0 deletions pkg/controllers/constants/constants.go
@@ -0,0 +1,40 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package constants

const (
// JobOwnerKey is the field used to index child Jobs by their owning JobSet.
JobOwnerKey = ".metadata.controller"
// RestartsKey is the label and annotation key recording which restart attempt a child Job belongs to.
RestartsKey string = "jobset.sigs.k8s.io/restart-attempt"
// MaxParallelism is the maximum number of workers used when creating or deleting Jobs in parallel.
MaxParallelism int = 50

// Event reasons and messages.
ReachedMaxRestartsReason = "ReachedMaxRestarts"
ReachedMaxRestartsMessage = "jobset failed due to reaching max number of restarts"

FailedJobsReason = "FailedJobs"
FailedJobsMessage = "jobset failed due to one or more job failures"

AllJobsCompletedReason = "AllJobsCompleted"
AllJobsCompletedMessage = "jobset completed successfully"

// The JobCreationFailed event uses the error(s) as the event message.
JobCreationFailedReason = "JobCreationFailed"

ExclusivePlacementViolationReason = "ExclusivePlacementViolation"
ExclusivePlacementViolationMessage = "Pod violated JobSet exclusive placement policy"
)
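Aside, not part of the diff: a minimal sketch of how these shared constants are intended to be consumed, mirroring the bounded parallelism and restart-attempt label handling in the controllers below. The helper printRestartAttempts and the job name are illustrative only, not from the repository.

package main

import (
	"context"
	"fmt"
	"strconv"

	batchv1 "k8s.io/api/batch/v1"
	"k8s.io/client-go/util/workqueue"

	"sigs.k8s.io/jobset/pkg/controllers/constants"
)

// printRestartAttempts is a hypothetical helper: it fans out over the given
// jobs with the same worker bound the controllers use (MaxParallelism) and
// reads each job's restart attempt from the RestartsKey label.
func printRestartAttempts(ctx context.Context, jobs []*batchv1.Job) {
	workqueue.ParallelizeUntil(ctx, constants.MaxParallelism, len(jobs), func(i int) {
		attempt, err := strconv.Atoi(jobs[i].Labels[constants.RestartsKey])
		if err != nil {
			fmt.Printf("job %q: invalid value for label %s, must be integer\n", jobs[i].Name, constants.RestartsKey)
			return
		}
		fmt.Printf("job %q is on restart attempt %d\n", jobs[i].Name, attempt)
	})
}

func main() {
	job := &batchv1.Job{}
	job.Name = "example-job"
	job.Labels = map[string]string{constants.RestartsKey: "2"}
	printRestartAttempts(context.Background(), []*batchv1.Job{job})
}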
88 changes: 18 additions & 70 deletions pkg/controllers/jobset_controller.go
@@ -36,31 +36,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
"sigs.k8s.io/jobset/pkg/controllers/constants"
"sigs.k8s.io/jobset/pkg/util/collections"
"sigs.k8s.io/jobset/pkg/util/placement"
)

const (
RestartsKey string = "jobset.sigs.k8s.io/restart-attempt"
maxParallelism int = 50

// Event reasons and messages
ReachedMaxRestartsReason = "ReachedMaxRestarts"
ReachedMaxRestartsMessage = "jobset failed due to reaching max number of restarts"

FailedJobsReason = "FailedJobs"
FailedJobsMessage = "jobset failed due to one or more job failures"

JobCreationFailedReason = "JobCreationFailed"

AllJobsCompletedReason = "AllJobsCompleted"
AllJobsCompletedMessage = "jobset completed successfully"
)

var (
jobOwnerKey = ".metadata.controller"
apiGVStr = jobset.GroupVersion.String()
)
var apiGVStr = jobset.GroupVersion.String()

// JobSetReconciler reconciles a JobSet object
type JobSetReconciler struct {
@@ -196,7 +177,7 @@ func (r *JobSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

func SetupJobSetIndexes(ctx context.Context, indexer client.FieldIndexer) error {
return indexer.IndexField(ctx, &batchv1.Job{}, jobOwnerKey, func(obj client.Object) []string {
return indexer.IndexField(ctx, &batchv1.Job{}, constants.JobOwnerKey, func(obj client.Object) []string {
o := obj.(*batchv1.Job)
owner := metav1.GetControllerOf(o)
if owner == nil {
@@ -217,7 +198,7 @@ func (r *JobSetReconciler) getChildJobs(ctx context.Context, js *jobset.JobSet)

// Get all active jobs owned by JobSet.
var childJobList batchv1.JobList
if err := r.List(ctx, &childJobList, client.InNamespace(js.Namespace), client.MatchingFields{jobOwnerKey: js.Name}); err != nil {
if err := r.List(ctx, &childJobList, client.InNamespace(js.Namespace), client.MatchingFields{constants.JobOwnerKey: js.Name}); err != nil {
return nil, err
}

@@ -226,9 +207,9 @@
for i, job := range childJobList.Items {
// Jobs with jobset.sigs.k8s.io/restart-attempt < jobset.status.restarts are marked for
// deletion, as they were part of the previous JobSet run.
jobRestarts, err := strconv.Atoi(job.Labels[RestartsKey])
jobRestarts, err := strconv.Atoi(job.Labels[constants.RestartsKey])
if err != nil {
log.Error(err, fmt.Sprintf("invalid value for label %s, must be integer", RestartsKey))
log.Error(err, fmt.Sprintf("invalid value for label %s, must be integer", constants.RestartsKey))
ownedJobs.delete = append(ownedJobs.delete, &childJobList.Items[i])
return nil, err
}
@@ -447,12 +428,12 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow

status := findReplicatedStatus(replicatedJobStatus, replicatedJob.Name)
rjJobStarted := replicatedJobsStarted(replicatedJob.Replicas, status)
// For startup policy, if the job is started we can skip this loop
// Jobs have been created
// For startup policy, if the job is started we can skip this loop.
// Jobs have been created.
if !jobSetSuspended(js) && inOrderStartupPolicy(startupPolicy) && rjJobStarted {
continue
}
workqueue.ParallelizeUntil(ctx, maxParallelism, len(jobs), func(i int) {
workqueue.ParallelizeUntil(ctx, constants.MaxParallelism, len(jobs), func(i int) {
job := jobs[i]

// Set jobset controller as owner of the job for garbage collection and reconciliation.
@@ -489,7 +470,7 @@
if allErrs != nil {
// Emit event to propagate the Job creation failures up to be more visible to the user.
// TODO(#422): Investigate ways to validate Job templates at JobSet validation time.
r.Record.Eventf(js, corev1.EventTypeWarning, JobCreationFailedReason, allErrs.Error())
r.Record.Eventf(js, corev1.EventTypeWarning, constants.JobCreationFailedReason, allErrs.Error())
return allErrs
}
// Skip emitting a condition for StartupPolicy if JobSet is suspended
Expand Down Expand Up @@ -552,8 +533,8 @@ func (r *JobSetReconciler) executeSuccessPolicy(ctx context.Context, js *jobset.
condition: metav1.Condition{
Type: string(jobset.JobSetCompleted),
Status: metav1.ConditionStatus(corev1.ConditionTrue),
Reason: AllJobsCompletedReason,
Message: AllJobsCompletedMessage,
Reason: constants.AllJobsCompletedReason,
Message: constants.AllJobsCompletedMessage,
},
}); err != nil {
return false, err
@@ -567,13 +548,13 @@ func (r *JobSetReconciler) executeFailurePolicy(ctx context.Context, js *jobset.
// If no failure policy is defined, mark the JobSet as failed.
if js.Spec.FailurePolicy == nil {
firstFailedJob := findFirstFailedJob(ownedJobs.failed)
return r.failJobSet(ctx, js, FailedJobsReason, messageWithFirstFailedJob(FailedJobsMessage, firstFailedJob.Name))
return r.failJobSet(ctx, js, constants.FailedJobsReason, messageWithFirstFailedJob(constants.FailedJobsMessage, firstFailedJob.Name))
}

// If JobSet has reached max restarts, fail the JobSet.
if js.Status.Restarts >= js.Spec.FailurePolicy.MaxRestarts {
firstFailedJob := findFirstFailedJob(ownedJobs.failed)
return r.failJobSet(ctx, js, ReachedMaxRestartsReason, messageWithFirstFailedJob(ReachedMaxRestartsMessage, firstFailedJob.Name))
return r.failJobSet(ctx, js, constants.ReachedMaxRestartsReason, messageWithFirstFailedJob(constants.ReachedMaxRestartsMessage, firstFailedJob.Name))
}

// To reach this point a job must have failed.
@@ -597,7 +578,7 @@ func (r *JobSetReconciler) deleteJobs(ctx context.Context, jobsForDeletion []*ba
log := ctrl.LoggerFrom(ctx)
lock := &sync.Mutex{}
var finalErrs []error
workqueue.ParallelizeUntil(ctx, maxParallelism, len(jobsForDeletion), func(i int) {
workqueue.ParallelizeUntil(ctx, constants.MaxParallelism, len(jobsForDeletion), func(i int) {
targetJob := jobsForDeletion[i]
// Skip deleting jobs with deletion timestamp already set.
if targetJob.DeletionTimestamp != nil {
@@ -613,7 +594,7 @@
finalErrs = append(finalErrs, err)
return
}
log.V(2).Info("successfully deleted job", "job", klog.KObj(targetJob), "restart attempt", targetJob.Labels[targetJob.Labels[RestartsKey]])
log.V(2).Info("successfully deleted job", "job", klog.KObj(targetJob), "restart attempt", targetJob.Labels[constants.RestartsKey])
})
return errors.Join(finalErrs...)
}
@@ -773,15 +754,15 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R
labels := collections.CloneMap(obj.GetLabels())
labels[jobset.JobSetNameKey] = js.Name
labels[jobset.ReplicatedJobNameKey] = rjob.Name
labels[RestartsKey] = strconv.Itoa(int(js.Status.Restarts))
labels[constants.RestartsKey] = strconv.Itoa(int(js.Status.Restarts))
labels[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas))
labels[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
labels[jobset.JobKey] = jobHashKey(js.Namespace, jobName)

annotations := collections.CloneMap(obj.GetAnnotations())
annotations[jobset.JobSetNameKey] = js.Name
annotations[jobset.ReplicatedJobNameKey] = rjob.Name
annotations[RestartsKey] = strconv.Itoa(int(js.Status.Restarts))
annotations[constants.RestartsKey] = strconv.Itoa(int(js.Status.Restarts))
annotations[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas))
annotations[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
annotations[jobset.JobKey] = jobHashKey(js.Namespace, jobName)
@@ -865,39 +846,6 @@ func dnsHostnamesEnabled(js *jobset.JobSet) bool {
return js.Spec.Network.EnableDNSHostnames != nil && *js.Spec.Network.EnableDNSHostnames
}

func jobMatchesSuccessPolicy(js *jobset.JobSet, job *batchv1.Job) bool {
return len(js.Spec.SuccessPolicy.TargetReplicatedJobs) == 0 || collections.Contains(js.Spec.SuccessPolicy.TargetReplicatedJobs, job.ObjectMeta.Labels[jobset.ReplicatedJobNameKey])
}

func replicatedJobMatchesSuccessPolicy(js *jobset.JobSet, rjob *jobset.ReplicatedJob) bool {
return len(js.Spec.SuccessPolicy.TargetReplicatedJobs) == 0 || collections.Contains(js.Spec.SuccessPolicy.TargetReplicatedJobs, rjob.Name)
}

func numJobsMatchingSuccessPolicy(js *jobset.JobSet, jobs []*batchv1.Job) int {
total := 0
for _, job := range jobs {
if jobMatchesSuccessPolicy(js, job) {
total += 1
}
}
return total
}

func numJobsExpectedToSucceed(js *jobset.JobSet) int {
total := 0
switch js.Spec.SuccessPolicy.Operator {
case jobset.OperatorAny:
total = 1
case jobset.OperatorAll:
for _, rjob := range js.Spec.ReplicatedJobs {
if replicatedJobMatchesSuccessPolicy(js, &rjob) {
total += int(rjob.Replicas)
}
}
}
return total
}

func jobSetSuspended(js *jobset.JobSet) bool {
return ptr.Deref(js.Spec.Suspend, false)
}
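Aside, not part of the diff: the MatchingFields lookup in getChildJobs above only works because SetupJobSetIndexes registers JobOwnerKey as a field index on Jobs. A hedged sketch of a caller built on that pairing; the helper name listChildJobs is illustrative, not from the repository.

package controllers

import (
	"context"

	batchv1 "k8s.io/api/batch/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
	"sigs.k8s.io/jobset/pkg/controllers/constants"
)

// listChildJobs is a hypothetical helper: it lists only the Jobs whose
// controller owner is the given JobSet, using the JobOwnerKey field index
// instead of filtering every Job in the namespace client-side.
func listChildJobs(ctx context.Context, c client.Client, js *jobset.JobSet) ([]batchv1.Job, error) {
	var jobs batchv1.JobList
	if err := c.List(ctx, &jobs,
		client.InNamespace(js.Namespace),
		client.MatchingFields{constants.JobOwnerKey: js.Name},
	); err != nil {
		return nil, err
	}
	return jobs.Items, nil
}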
5 changes: 3 additions & 2 deletions pkg/controllers/jobset_controller_test.go
@@ -28,6 +28,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
"sigs.k8s.io/jobset/pkg/controllers/constants"
testutils "sigs.k8s.io/jobset/pkg/util/testing"
)

@@ -1192,15 +1193,15 @@ func makeJob(args *makeJobArgs) *testutils.JobWrapper {
jobset.ReplicatedJobNameKey: args.replicatedJobName,
jobset.ReplicatedJobReplicas: strconv.Itoa(args.replicas),
jobset.JobIndexKey: strconv.Itoa(args.jobIdx),
RestartsKey: strconv.Itoa(args.restarts),
constants.RestartsKey: strconv.Itoa(args.restarts),
jobset.JobKey: jobHashKey(args.ns, args.jobName),
}
annotations := map[string]string{
jobset.JobSetNameKey: args.jobSetName,
jobset.ReplicatedJobNameKey: args.replicatedJobName,
jobset.ReplicatedJobReplicas: strconv.Itoa(args.replicas),
jobset.JobIndexKey: strconv.Itoa(args.jobIdx),
RestartsKey: strconv.Itoa(args.restarts),
constants.RestartsKey: strconv.Itoa(args.restarts),
jobset.JobKey: jobHashKey(args.ns, args.jobName),
}
// Only set exclusive key if we are using exclusive placement per topology.
7 changes: 4 additions & 3 deletions pkg/controllers/pod_controller.go
@@ -32,6 +32,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"sigs.k8s.io/jobset/pkg/controllers/constants"
"sigs.k8s.io/jobset/pkg/util/placement"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
@@ -198,7 +199,7 @@ func (r *PodReconciler) deleteFollowerPods(ctx context.Context, pods []corev1.Po
lock := &sync.Mutex{}
var finalErrs []error

workqueue.ParallelizeUntil(ctx, maxParallelism, len(pods), func(i int) {
workqueue.ParallelizeUntil(ctx, constants.MaxParallelism, len(pods), func(i int) {
pod := pods[i]
// Do not delete leader pod.
if placement.IsLeaderPod(&pod) {
@@ -210,8 +211,8 @@ func (r *PodReconciler) deleteFollowerPods(ctx context.Context, pods []corev1.Po
condition := corev1.PodCondition{
Type: corev1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: "ExclusivePlacementViolation",
Message: "Pod violated JobSet exclusive placement policy",
Reason: constants.ExclusivePlacementViolationReason,
Message: constants.ExclusivePlacementViolationMessage,
}

// If pod status already has this condition, we don't need to send the update again.
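Aside, not part of the diff: the condition built in deleteFollowerPods is only sent when the pod does not already carry it. A minimal sketch of that check-then-append pattern; the helper name markDisruptionTarget is hypothetical.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"

	"sigs.k8s.io/jobset/pkg/controllers/constants"
)

// markDisruptionTarget is a hypothetical helper: it appends the
// DisruptionTarget condition with the exclusive-placement reason and message,
// and reports false when an identical condition is already present so the
// caller can skip the status update.
func markDisruptionTarget(pod *corev1.Pod) bool {
	for _, c := range pod.Status.Conditions {
		if c.Type == corev1.DisruptionTarget && c.Reason == constants.ExclusivePlacementViolationReason {
			return false // already marked; no update needed
		}
	}
	pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{
		Type:    corev1.DisruptionTarget,
		Status:  corev1.ConditionTrue,
		Reason:  constants.ExclusivePlacementViolationReason,
		Message: constants.ExclusivePlacementViolationMessage,
	})
	return true
}

func main() {
	pod := &corev1.Pod{}
	fmt.Println("first call adds the condition:", markDisruptionTarget(pod))
	fmt.Println("second call is a no-op:", markDisruptionTarget(pod))
}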
66 changes: 66 additions & 0 deletions pkg/controllers/success_policy.go
@@ -0,0 +1,66 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
batchv1 "k8s.io/api/batch/v1"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"

"sigs.k8s.io/jobset/pkg/util/collections"
)

// TODO: add unit tests for the functions in this file.

// jobMatchesSuccessPolicy returns a boolean value indicating if the Job is part of a
// ReplicatedJob that matches the JobSet's success policy.
func jobMatchesSuccessPolicy(js *jobset.JobSet, job *batchv1.Job) bool {
return len(js.Spec.SuccessPolicy.TargetReplicatedJobs) == 0 || collections.Contains(js.Spec.SuccessPolicy.TargetReplicatedJobs, job.ObjectMeta.Labels[jobset.ReplicatedJobNameKey])
}

// replicatedJobMatchesSuccessPolicy returns a boolean value indicating if the ReplicatedJob
// matches the JobSet's success policy.
func replicatedJobMatchesSuccessPolicy(js *jobset.JobSet, rjob *jobset.ReplicatedJob) bool {
return len(js.Spec.SuccessPolicy.TargetReplicatedJobs) == 0 || collections.Contains(js.Spec.SuccessPolicy.TargetReplicatedJobs, rjob.Name)
}

// numJobsMatchingSuccessPolicy returns the number of jobs in the given slice `jobs`
// which match the JobSet's success policy.
func numJobsMatchingSuccessPolicy(js *jobset.JobSet, jobs []*batchv1.Job) int {
total := 0
for _, job := range jobs {
if jobMatchesSuccessPolicy(js, job) {
total += 1
}
}
return total
}

// numJobsExpectedToSucceed returns the number of jobs that must complete successfully
// in order to satisfy the JobSet's success policy and mark the JobSet complete.
// This is determined based on the JobSet spec.
func numJobsExpectedToSucceed(js *jobset.JobSet) int {
total := 0
switch js.Spec.SuccessPolicy.Operator {
case jobset.OperatorAny:
total = 1
case jobset.OperatorAll:
for _, rjob := range js.Spec.ReplicatedJobs {
if replicatedJobMatchesSuccessPolicy(js, &rjob) {
total += int(rjob.Replicas)
}
}
}
return total
}
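Aside, not part of the diff: the TODO at the top of this file asks for unit tests for these helpers. A hedged sketch of one such test for numJobsExpectedToSucceed, assuming it lives alongside this file in the controllers package; the ReplicatedJob names and replica counts are illustrative, and the spec fields follow the v1alpha2 types used throughout this commit.

package controllers

import (
	"testing"

	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

// TestNumJobsExpectedToSucceed checks that, with the All operator and no
// TargetReplicatedJobs filter, every replica of every ReplicatedJob counts
// toward the number of jobs expected to succeed.
func TestNumJobsExpectedToSucceed(t *testing.T) {
	js := &jobset.JobSet{
		Spec: jobset.JobSetSpec{
			SuccessPolicy: &jobset.SuccessPolicy{Operator: jobset.OperatorAll},
			ReplicatedJobs: []jobset.ReplicatedJob{
				{Name: "workers", Replicas: 3},
				{Name: "driver", Replicas: 1},
			},
		},
	}
	if got, want := numJobsExpectedToSucceed(js), 4; got != want {
		t.Errorf("numJobsExpectedToSucceed() = %d, want %d", got, want)
	}
}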
3 changes: 2 additions & 1 deletion test/integration/controller/jobset_controller_test.go
@@ -35,6 +35,7 @@

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
"sigs.k8s.io/jobset/pkg/controllers"
"sigs.k8s.io/jobset/pkg/controllers/constants"
"sigs.k8s.io/jobset/pkg/util/collections"
"sigs.k8s.io/jobset/pkg/util/testing"
testutil "sigs.k8s.io/jobset/test/util"
@@ -1461,7 +1462,7 @@ func checkJobsRecreated(js *jobset.JobSet, expectedRestarts int) (bool, error) {
}
// Check that the restart counter of every job has been incremented.
for _, job := range jobList.Items {
if job.Labels[controllers.RestartsKey] != strconv.Itoa(expectedRestarts) {
if job.Labels[constants.RestartsKey] != strconv.Itoa(expectedRestarts) {
return false, nil
}
}
