Skip to content

Commit

Permalink
pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kannon92 committed Feb 12, 2024
1 parent 99fb556 commit e3539bb
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 58 deletions.
48 changes: 20 additions & 28 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,25 +359,23 @@ func (r *JobSetReconciler) resumeJobSetIfNecessary(ctx context.Context, js *jobs
}
}
if inOrderStartupPolicy(startupPolicy) {
condOpts := ensureConditionOpts{
return r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
eventType: corev1.EventTypeNormal,
forceFalseUpdate: true,
condition: generateStartupPolicyCondition(startupPolicyInProgress),
}
return r.ensureCondition(ctx, condOpts)
condition: generateStartupPolicyCondition(metav1.ConditionFalse),
})
}
}
if inOrderStartupPolicy(startupPolicy) {
condOpts := ensureConditionOpts{
return r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
eventType: corev1.EventTypeNormal,
forceFalseUpdate: false,
condition: generateStartupPolicyCondition(startupPolicyCompleted),
}
return r.ensureCondition(ctx, condOpts)
condition: generateStartupPolicyCondition(metav1.ConditionTrue),
})
}
condOpts := ensureConditionOpts{
return r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
eventType: corev1.EventTypeNormal,
forceFalseUpdate: false,
Expand All @@ -388,8 +386,7 @@ func (r *JobSetReconciler) resumeJobSetIfNecessary(ctx context.Context, js *jobs
Reason: "ResumeJobs",
Message: "jobset is resumed",
},
}
return r.ensureCondition(ctx, condOpts)
})
}

func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, nodeAffinities map[string]map[string]string) error {
Expand Down Expand Up @@ -466,13 +463,12 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow
// This updates the StartupPolicy condition and notifies that we are waiting
// for this replicated job to finish.
if !jobSetSuspended(js) && inOrderStartupPolicy(startupPolicy) {
conditionOps := ensureConditionOpts{
return r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
eventType: corev1.EventTypeNormal,
forceFalseUpdate: true,
condition: generateStartupPolicyCondition(startupPolicyInProgress),
}
return r.ensureCondition(ctx, conditionOps)
condition: generateStartupPolicyCondition(metav1.ConditionFalse),
})
}
}
allErrs := errors.Join(finalErrs...)
Expand All @@ -481,13 +477,12 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow
}
// Skip emitting a condition for StartupPolicy if JobSet is suspended
if !jobSetSuspended(js) && inOrderStartupPolicy(startupPolicy) {
conditionOps := ensureConditionOpts{
return r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
eventType: corev1.EventTypeNormal,
forceFalseUpdate: false,
condition: generateStartupPolicyCondition(startupPolicyCompleted),
}
return r.ensureCondition(ctx, conditionOps)
condition: generateStartupPolicyCondition(metav1.ConditionTrue),
})
}
return allErrs
}
Expand Down Expand Up @@ -535,7 +530,7 @@ func (r *JobSetReconciler) createHeadlessSvcIfNotExist(ctx context.Context, js *
// Returns a boolean value indicating if the jobset was completed or not.
func (r *JobSetReconciler) executeSuccessPolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) (bool, error) {
if numJobsMatchingSuccessPolicy(js, ownedJobs.successful) >= numJobsExpectedToSucceed(js) {
conditionOpts := ensureConditionOpts{
if err := r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
eventType: corev1.EventTypeNormal,
forceFalseUpdate: false,
Expand All @@ -545,8 +540,7 @@ func (r *JobSetReconciler) executeSuccessPolicy(ctx context.Context, js *jobset.
Reason: "AllJobsCompleted",
Message: "jobset completed successfully",
},
}
if err := r.ensureCondition(ctx, conditionOpts); err != nil {
}); err != nil {
return false, err
}
return true, nil
Expand Down Expand Up @@ -576,7 +570,7 @@ func (r *JobSetReconciler) restartPolicyRecreateAll(ctx context.Context, js *job

// If JobSet has reached max number of restarts, mark it as failed and return.
if js.Status.Restarts >= js.Spec.FailurePolicy.MaxRestarts {
conditionOpts := ensureConditionOpts{
return r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
eventType: corev1.EventTypeWarning,
forceFalseUpdate: false,
Expand All @@ -586,8 +580,7 @@ func (r *JobSetReconciler) restartPolicyRecreateAll(ctx context.Context, js *job
Reason: "ReachedMaxRestarts",
Message: "jobset failed due to reaching max number of restarts",
},
}
return r.ensureCondition(ctx, conditionOpts)
})
}

// Increment JobSet restarts. This will trigger reconciliation and result in deletions
Expand Down Expand Up @@ -657,7 +650,7 @@ func (r *JobSetReconciler) ensureCondition(ctx context.Context, opts ensureCondi
}

func (r *JobSetReconciler) failJobSet(ctx context.Context, js *jobset.JobSet) error {
ensureConditionOpts := ensureConditionOpts{
return r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
condition: metav1.Condition{
Type: string(jobset.JobSetFailed),
Expand All @@ -667,8 +660,7 @@ func (r *JobSetReconciler) failJobSet(ctx context.Context, js *jobset.JobSet) er
},
eventType: corev1.EventTypeWarning,
forceFalseUpdate: false,
}
return r.ensureCondition(ctx, ensureConditionOpts)
})
}

func updateCondition(js *jobset.JobSet, condition metav1.Condition, forceFalseUpdate bool) bool {
Expand Down
9 changes: 3 additions & 6 deletions pkg/controllers/startup_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,10 @@ func inOrderStartupPolicy(sp *jobset.StartupPolicy) bool {
}

// generateStartupPolicyCondition generates the StartupPolicyCondition
// based on policy
// We use startupPolicyCompleted or startupPolicyInProgress
func generateStartupPolicyCondition(policy startupPolicyCondition) metav1.Condition {
condition := metav1.ConditionFalse
// based on the condition
func generateStartupPolicyCondition(condition metav1.ConditionStatus) metav1.Condition {
message := "startup policy in order starting"
if policy == startupPolicyCompleted {
condition = metav1.ConditionTrue
if condition == metav1.ConditionTrue {
message = "all replicated jobs have started"
}
return metav1.Condition{
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/startup_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ func TestReplicatedJobStarted(t *testing.T) {
func TestGenerateStartupPolicyCondition(t *testing.T) {
tests := []struct {
name string
policyComplete startupPolicyCondition
policyComplete metav1.ConditionStatus
expectedCondition metav1.Condition
}{
{
name: "in progress startup condition on a",
policyComplete: startupPolicyInProgress,
policyComplete: metav1.ConditionFalse,
expectedCondition: metav1.Condition{
Type: string(jobset.JobSetStartupPolicyCompleted),
Status: metav1.ConditionFalse,
Expand All @@ -136,7 +136,7 @@ func TestGenerateStartupPolicyCondition(t *testing.T) {
},
{
name: "startup policy complete",
policyComplete: startupPolicyCompleted,
policyComplete: metav1.ConditionTrue,
expectedCondition: metav1.Condition{
Type: string(jobset.JobSetStartupPolicyCompleted),
Status: metav1.ConditionTrue,
Expand Down
21 changes: 0 additions & 21 deletions pkg/controllers/types.go

This file was deleted.

0 comments on commit e3539bb

Please sign in to comment.