Skip to content

Commit

Permalink
refactor in order startup policy condition into two mutually exclusiv…
Browse files Browse the repository at this point in the history
…e conditions
  • Loading branch information
danielvegamyhre committed Apr 13, 2024
1 parent f5e5364 commit e346ab3
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 86 deletions.
6 changes: 4 additions & 2 deletions api/jobset/v1alpha2/jobset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ const (
JobSetCompleted JobSetConditionType = "Completed"
// JobSetFailed means the job has failed its execution.
JobSetFailed JobSetConditionType = "Failed"
// JobSetSuspended means the job is suspended
// JobSetSuspended means the job is suspended.
JobSetSuspended JobSetConditionType = "Suspended"
// JobSetStartupPolicyCompleted means the StartupPolicy was complete
// JobSetStartupPolicyInProgress means the StartupPolicy is in progress.
JobSetStartupPolicyInProgress JobSetConditionType = "StartupPolicyInProgress"
// JobSetStartupPolicyCompleted means the StartupPolicy has completed.
JobSetStartupPolicyCompleted JobSetConditionType = "StartupPolicyCompleted"
)

Expand Down
6 changes: 4 additions & 2 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ const (
ExclusivePlacementViolationMessage = "Pod violated JobSet exclusive placement policy"

// Event reason and messages related to startup policy.
InOrderStartupPolicyReason = "StartupPolicyInOrder"
InOrderStartupPolicyExecutingMessage = "in order startup policy is executing"
InOrderStartupPolicyInProgressReason = "InOrderStartupPolicyInProgress"
InOrderStartupPolicyInProgressMessage = "in order startup policy is in progress"

InOrderStartupPolicyCompletedReason = "InOrderStartupPolicyCompleted"
InOrderStartupPolicyCompletedMessage = "in order startup policy has completed"

// Event reason and messages related to JobSet restarts.
Expand Down
126 changes: 75 additions & 51 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,12 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd
return ctrl.Result{}, err
}
} else {
if err := r.resumeJobsIfNecessary(ctx, js, ownedJobs.active, rjobStatuses, updateStatusOpts); err != nil {
requeue, err := r.resumeJobsIfNecessary(ctx, js, ownedJobs.active, rjobStatuses, updateStatusOpts)
if err != nil {
log.Error(err, "resuming jobset")
return ctrl.Result{}, err
}
return ctrl.Result{Requeue: requeue}, nil
}
return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -379,7 +381,11 @@ func (r *JobSetReconciler) suspendJobs(ctx context.Context, js *jobset.JobSet, a
return nil
}

func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job, replicatedJobStatuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) error {
// resumeJobsIfNecessary iterates through each replicatedJob, resuming any suspended jobs if the JobSet
// is not suspended. Returns a boolean value indicating if the JobSet should be requeued for reconciliation.
// This is so in-order startup policy can be respected, where after resuming one replicatedJob, we must
// wait for it to become ready before resuming the next.
func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job, replicatedJobStatuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) (bool, error) {
// Store node selector for each replicatedJob template.
nodeAffinities := map[string]map[string]string{}
for _, replicatedJob := range js.Spec.ReplicatedJobs {
Expand All @@ -394,7 +400,6 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset
}

startupPolicy := js.Spec.StartupPolicy
numJobsResumed := 0
// If the JobSet is unsuspended, ensure all active child Jobs are also
// unsuspended and update the suspended condition to false.
for _, replicatedJob := range js.Spec.ReplicatedJobs {
Expand All @@ -409,31 +414,21 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset
continue
}
if err := r.resumeJob(ctx, job, nodeAffinities); err != nil {
return err
return false, err
}
numJobsResumed += 1
}
// If using in order startup policy, return early and wait for the replicated job to be ready.
if numJobsResumed > 0 && inOrderStartupPolicy(startupPolicy) {
// If in order startup policy, we need to return early and allow for
// this replicatedJob to become ready before resuming the next.
if inOrderStartupPolicy(startupPolicy) {
setInOrderStartupPolicyInProgressCondition(js, updateStatusOpts)
return nil
return true, nil
}
}

// If no jobs were resumed / no action was taken, there's nothing more to do here.
if numJobsResumed == 0 {
return nil
}
// At this point all replicated jobs have had their jobs resumed.
// If using an in order startup policy. add a condition to the JobSet indicating the
// in order startup policy has completed.
if inOrderStartupPolicy(startupPolicy) {
setInOrderStartupPolicyCompletedCondition(js, updateStatusOpts)
}
// Finally, set the suspended condition on the JobSet to false to indicate
// the JobSet is no longer suspended.
setJobSetResumedCondition(js, updateStatusOpts)
return nil
return false, nil
}

func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, nodeAffinities map[string]map[string]string) error {
Expand Down Expand Up @@ -471,7 +466,7 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow

status := findReplicatedJobStatus(replicatedJobStatus, replicatedJob.Name)

// For startup policy, if the job is started we can skip this loop.
// For startup policy, if the replicatedJob is started we can skip this loop.
// Jobs have been created.
if !jobSetSuspended(js) && inOrderStartupPolicy(startupPolicy) && allReplicasStarted(replicatedJob.Replicas, status) {
continue
Expand All @@ -490,7 +485,7 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow

// Create the job.
// TODO(#18): Deal with the case where the job exists but is not owned by the jobset.
if err := r.Create(ctx, job); err != nil {
if err := r.Create(ctx, job); client.IgnoreAlreadyExists(err) != nil {
lock.Lock()
defer lock.Unlock()
finalErrs = append(finalErrs, fmt.Errorf("job %q creation failed with error: %v", job.Name, err))
Expand All @@ -501,7 +496,7 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow

// If we are using inOrder StartupPolicy, then we return to wait for jobs to be ready.
// This updates the StartupPolicy condition and notifies that we are waiting
// for this replicated job to finish.
// for this replicated job to start up before moving onto the next one.
if !jobSetSuspended(js) && inOrderStartupPolicy(startupPolicy) {
setInOrderStartupPolicyInProgressCondition(js, updateStatusOpts)
return nil
Expand Down Expand Up @@ -905,7 +900,7 @@ type conditionOpts struct {
// setCondition will add a new condition to the JobSet status (or update an existing one),
// and enqueue an event for emission if the status update succeeds at the end of the reconcile.
func setCondition(js *jobset.JobSet, condOpts *conditionOpts, updateStatusOpts *statusUpdateOpts) {
// Return early if this condition is already set.
// Return early if no status update is required for this condition.
if !updateCondition(js, condOpts) {
return
}
Expand Down Expand Up @@ -934,27 +929,46 @@ func updateCondition(js *jobset.JobSet, opts *conditionOpts) bool {
return false
}

condition := *opts.condition
condition.LastTransitionTime = metav1.Now()
for i, val := range js.Status.Conditions {
if condition.Type == val.Type && condition.Status != val.Status {
js.Status.Conditions[i] = condition
// Condition found but different status so we should update.
return true
} else if condition.Type == val.Type && condition.Status == val.Status && condition.Reason == val.Reason && condition.Message == val.Message {
// Duplicate condition so no update.
return false
found := false
shouldUpdate := false
newCond := *opts.condition
newCond.LastTransitionTime = metav1.Now()

for i, currCond := range js.Status.Conditions {
// If condition type has a status change, update it.
if newCond.Type == currCond.Type {
// Status change of an existing condition. Update status call will be required.
if newCond.Status != currCond.Status {
js.Status.Conditions[i] = newCond
shouldUpdate = true
}

// If both are true or both are false, this is a duplicate condition, do nothing.
found = true
} else {
// If conditions are of different types, only perform an update if they are both true
// and they are mutually exclusive. If so, then set the existing condition status to
// false before adding the new condition.
if exclusiveConditions(currCond, newCond) &&
currCond.Status == metav1.ConditionTrue &&
newCond.Status == metav1.ConditionTrue {
js.Status.Conditions[i].Status = metav1.ConditionFalse
shouldUpdate = true
}
}
}

// Condition doesn't exist, add it.
js.Status.Conditions = append(js.Status.Conditions, condition)
return true
// Condition doesn't exist, add it if condition status is true.
if !found && newCond.Status == metav1.ConditionTrue {
js.Status.Conditions = append(js.Status.Conditions, newCond)
shouldUpdate = true
}
return shouldUpdate
}

// setJobSetCompletedCondition sets a condition on the JobSet status indicating it has completed.
func setJobSetCompletedCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) {
setCondition(js, completedConditionsOpts, updateStatusOpts)
setCondition(js, makeCompletedConditionsOpts(), updateStatusOpts)
}

// setJobSetFailedCondition sets a condition on the JobSet status indicating it has failed.
Expand All @@ -964,24 +978,26 @@ func setJobSetFailedCondition(ctx context.Context, js *jobset.JobSet, reason, ms

// setJobSetSuspendedCondition sets a condition on the JobSet status indicating it is currently suspended.
func setJobSetSuspendedCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) {
setCondition(js, makeSuspendedConditionOpts(metav1.Now()), updateStatusOpts)
setCondition(js, makeSuspendedConditionOpts(), updateStatusOpts)
}

// setJobSetResumedCondition sets a condition on the JobSet status indicating it has been resumed.
// This updates the "suspended" condition type from "true" to "false."
func setJobSetResumedCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) {
setCondition(js, makeResumedConditionOpts(metav1.Now()), updateStatusOpts)
setCondition(js, makeResumedConditionOpts(), updateStatusOpts)
}

// completedConditionsOpts contains the options we use to generate the JobSet completed condition.
var completedConditionsOpts = &conditionOpts{
eventType: corev1.EventTypeNormal,
condition: &metav1.Condition{
Type: string(jobset.JobSetCompleted),
Status: metav1.ConditionStatus(corev1.ConditionTrue),
Reason: constants.AllJobsCompletedReason,
Message: constants.AllJobsCompletedMessage,
},
// makeCompletedConditionsOpts returns the options we use to generate the JobSet completed condition.
// Each call builds a fresh conditionOpts with a Normal event type and a
// "Completed" condition whose status is true.
func makeCompletedConditionsOpts() *conditionOpts {
	completed := metav1.Condition{
		Type:    string(jobset.JobSetCompleted),
		Status:  metav1.ConditionTrue,
		Reason:  constants.AllJobsCompletedReason,
		Message: constants.AllJobsCompletedMessage,
	}
	return &conditionOpts{
		eventType: corev1.EventTypeNormal,
		condition: &completed,
	}
}

// makeFailedConditionOpts returns the options we use to generate the JobSet failed condition.
Expand All @@ -998,27 +1014,27 @@ func makeFailedConditionOpts(reason, msg string) *conditionOpts {
}

// makeSuspendedConditionOpts returns the options we use to generate the JobSet suspended condition.
func makeSuspendedConditionOpts(now metav1.Time) *conditionOpts {
func makeSuspendedConditionOpts() *conditionOpts {
return &conditionOpts{
eventType: corev1.EventTypeNormal,
condition: &metav1.Condition{
Type: string(jobset.JobSetSuspended),
Status: metav1.ConditionStatus(corev1.ConditionTrue),
LastTransitionTime: now,
LastTransitionTime: metav1.Now(),
Reason: constants.JobSetSuspendedReason,
Message: constants.JobSetSuspendedMessage,
},
}
}

// makeResumedConditionOpts returns the options we use to generate the JobSet resumed condition.
func makeResumedConditionOpts(now metav1.Time) *conditionOpts {
func makeResumedConditionOpts() *conditionOpts {
return &conditionOpts{
eventType: corev1.EventTypeNormal,
condition: &metav1.Condition{
Type: string(jobset.JobSetSuspended),
Status: metav1.ConditionStatus(corev1.ConditionFalse),
LastTransitionTime: now,
LastTransitionTime: metav1.Now(),
Reason: constants.JobSetResumedReason,
Message: constants.JobSetResumedMessage,
},
Expand All @@ -1037,3 +1053,11 @@ func replicatedJobStatusesEqual(oldStatuses, newStatuses []jobset.ReplicatedJobS
})
return apiequality.Semantic.DeepEqual(oldStatuses, newStatuses)
}

// exclusiveConditions reports whether the two given conditions have mutually
// exclusive types. The only such pair is StartupPolicyInProgress and
// StartupPolicyCompleted; the order of the arguments does not matter.
// Only the Type field of each condition is inspected.
func exclusiveConditions(cond1, cond2 metav1.Condition) bool {
	inProgress := string(jobset.JobSetStartupPolicyInProgress)
	completed := string(jobset.JobSetStartupPolicyCompleted)

	switch cond1.Type {
	case inProgress:
		return cond2.Type == completed
	case completed:
		return cond2.Type == inProgress
	default:
		return false
	}
}
9 changes: 4 additions & 5 deletions pkg/controllers/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,6 @@ func TestUpdateConditions(t *testing.T) {
replicatedJobName = "replicated-job"
jobName = "test-job"
ns = "default"
now = metav1.Now()
)

tests := []struct {
Expand All @@ -687,7 +686,7 @@ func TestUpdateConditions(t *testing.T) {
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(1).
Obj()).Obj(),
opts: completedConditionsOpts,
opts: makeCompletedConditionsOpts(),
expectedUpdate: true,
},
{
Expand All @@ -697,7 +696,7 @@ func TestUpdateConditions(t *testing.T) {
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(1).
Obj()).Obj(),
opts: makeSuspendedConditionOpts(now),
opts: makeSuspendedConditionOpts(),
expectedUpdate: true,
},
{
Expand All @@ -717,7 +716,7 @@ func TestUpdateConditions(t *testing.T) {
},
}).
Obj(),
opts: makeResumedConditionOpts(now),
opts: makeResumedConditionOpts(),
expectedUpdate: true,
},
{
Expand All @@ -736,7 +735,7 @@ func TestUpdateConditions(t *testing.T) {
Status: metav1.ConditionStatus(corev1.ConditionTrue),
},
}).Obj(),
opts: completedConditionsOpts,
opts: makeCompletedConditionsOpts(),
expectedUpdate: false,
},
}
Expand Down
36 changes: 12 additions & 24 deletions pkg/controllers/startup_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,35 +34,18 @@ func inOrderStartupPolicy(sp *jobset.StartupPolicy) bool {
return sp != nil && sp.StartupPolicyOrder == jobset.InOrder
}

func inOrderStartupPolicyInProgressCondition() *metav1.Condition {
return &metav1.Condition{
Type: string(jobset.JobSetStartupPolicyCompleted),
// Status is True when in order startup policy is completed.
// Otherwise it is set as False to indicate it is still executing.
Status: metav1.ConditionFalse,
Reason: constants.InOrderStartupPolicyReason,
Message: constants.InOrderStartupPolicyExecutingMessage,
}
}

func inOrderStartupPolicyCompletedCondition() *metav1.Condition {
return &metav1.Condition{
Type: string(jobset.JobSetStartupPolicyCompleted),
// Status is True when in order startup policy is completed.
// Otherwise it is set as False to indicate it is still executing.
Status: metav1.ConditionTrue,
Reason: constants.InOrderStartupPolicyReason,
Message: constants.InOrderStartupPolicyCompletedMessage,
}
}

// setInOrderStartupPolicyInProgressCondition sets a condition on the JobSet status indicating it is
// currently executing an in-order startup policy.
func setInOrderStartupPolicyInProgressCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) {
// Add a condition to the JobSet indicating the in order startup policy is executing.
setCondition(js, &conditionOpts{
eventType: corev1.EventTypeNormal,
condition: inOrderStartupPolicyInProgressCondition(),
condition: &metav1.Condition{
Type: string(jobset.JobSetStartupPolicyInProgress),
Status: metav1.ConditionTrue,
Reason: constants.InOrderStartupPolicyInProgressReason,
Message: constants.InOrderStartupPolicyInProgressMessage,
},
}, updateStatusOpts)
}

Expand All @@ -71,6 +54,11 @@ func setInOrderStartupPolicyInProgressCondition(js *jobset.JobSet, updateStatusO
func setInOrderStartupPolicyCompletedCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) {
setCondition(js, &conditionOpts{
eventType: corev1.EventTypeNormal,
condition: inOrderStartupPolicyCompletedCondition(),
condition: &metav1.Condition{
Type: string(jobset.JobSetStartupPolicyCompleted),
Status: metav1.ConditionTrue,
Reason: constants.InOrderStartupPolicyCompletedReason,
Message: constants.InOrderStartupPolicyCompletedMessage,
},
}, updateStatusOpts)
}
4 changes: 2 additions & 2 deletions test/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func JobSetStartupPolicyNotFinished(ctx context.Context, k8sClient client.Client
ginkgo.By(fmt.Sprintf("checking jobset condition %q status is %q", jobset.JobSetStartupPolicyCompleted, metav1.ConditionFalse))
conditions := []metav1.Condition{
{
Type: string(jobset.JobSetStartupPolicyCompleted),
Status: metav1.ConditionFalse,
Type: string(jobset.JobSetStartupPolicyInProgress),
Status: metav1.ConditionTrue,
},
}
gomega.Eventually(checkJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, conditions).Should(gomega.Equal(true))
Expand Down

0 comments on commit e346ab3

Please sign in to comment.