Skip to content

Commit

Permalink
refactor controller to perform 1 status update per reconciliation
Browse files Browse the repository at this point in the history
attempt
  • Loading branch information
danielvegamyhre committed Apr 10, 2024
1 parent 547c8eb commit e6940bf
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 233 deletions.
11 changes: 11 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,15 @@ const (
InOrderStartupPolicyReason = "StartupPolicyInOrder"
InOrderStartupPolicyExecutingMessage = "in order startup policy is executing"
InOrderStartupPolicyCompletedMessage = "in order startup policy has completed"

// Event reason and messages related to JobSet restarts.
JobSetRestartReason = "Restarting"

// Event reason and messages related to suspending a JobSet.
JobSetSuspendedReason = "SuspendedJobs"
JobSetSuspendedMessage = "jobset is suspended"

// Event reason and message related to resuming a JobSet.
JobSetResumedReason = "ResumeJobs"
JobSetResumedMessage = "jobset is resumed"
)
419 changes: 240 additions & 179 deletions pkg/controllers/jobset_controller.go

Large diffs are not rendered by default.

83 changes: 35 additions & 48 deletions pkg/controllers/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,103 +661,90 @@ func TestUpdateConditions(t *testing.T) {
replicatedJobName = "replicated-job"
jobName = "test-job"
ns = "default"
now = metav1.Now()
)

tests := []struct {
name string
js *jobset.JobSet
conditions []metav1.Condition
newCondition metav1.Condition
forceUpdate bool
opts *conditionOpts
expectedUpdate bool
}{
{
name: "no condition",
name: "no existing conditions, not adding conditions",
js: testutils.MakeJobSet(jobSetName, ns).
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(1).
Obj()).Obj(),
newCondition: metav1.Condition{},
conditions: []metav1.Condition{},
opts: &conditionOpts{},
expectedUpdate: false,
},
{
name: "do not update if false",
name: "no existing conditions, add a condition",
js: testutils.MakeJobSet(jobSetName, ns).
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(1).
Obj()).Obj(),
newCondition: metav1.Condition{Status: metav1.ConditionFalse, Type: string(jobset.JobSetSuspended), Reason: "JobsResumed"},
conditions: []metav1.Condition{},
expectedUpdate: false,
},
{
name: "force update if false",
js: testutils.MakeJobSet(jobSetName, ns).
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(1).
Obj()).Obj(),
newCondition: metav1.Condition{Status: metav1.ConditionFalse, Type: string(jobset.JobSetStartupPolicyCompleted), Reason: "StartupPolicy"},
conditions: []metav1.Condition{},
opts: completedConditionsOpts,
expectedUpdate: true,
forceUpdate: true,
},
{
name: "update if condition is true",
js: testutils.MakeJobSet(jobSetName, ns).
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(1).
Obj()).Obj(),
newCondition: metav1.Condition{Status: metav1.ConditionTrue, Type: string(jobset.JobSetSuspended), Reason: "JobsResumed"},
conditions: []metav1.Condition{},
expectedUpdate: true,
},

{
name: "suspended",
js: testutils.MakeJobSet(jobSetName, ns).
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(1).
Obj()).Obj(),
newCondition: metav1.Condition{Status: metav1.ConditionTrue, Type: string(jobset.JobSetSuspended), Reason: "JobsSuspended"},
conditions: []metav1.Condition{},
opts: makeSuspendedConditionOpts(now),
expectedUpdate: true,
},
{
name: "resumed",
name: "resume (update suspended condition type in-place)",
js: testutils.MakeJobSet(jobSetName, ns).
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(1).
Obj()).Obj(),
newCondition: metav1.Condition{Type: string(jobset.JobSetSuspended), Reason: "JobsResumed", Status: metav1.ConditionStatus(corev1.ConditionFalse)},
conditions: []metav1.Condition{{Type: string(jobset.JobSetSuspended), Reason: "JobsSuspended", Status: metav1.ConditionStatus(corev1.ConditionTrue)}},
Obj()).
Conditions([]metav1.Condition{
// JobSet is currrently suspended.
{
Type: string(jobset.JobSetSuspended),
Reason: constants.JobSetSuspendedReason,
Message: constants.JobSetSuspendedMessage,
Status: metav1.ConditionStatus(corev1.ConditionTrue),
},
}).
Obj(),
opts: makeResumedConditionOpts(now),
expectedUpdate: true,
},
{
name: "duplicateComplete",
name: "existing conditions, attempt to add duplicate",
js: testutils.MakeJobSet(jobSetName, ns).
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(1).
Obj()).Obj(),
newCondition: metav1.Condition{Type: string(jobset.JobSetCompleted), Message: "Jobs completed", Reason: "JobsCompleted", Status: metav1.ConditionTrue},
conditions: []metav1.Condition{{Type: string(jobset.JobSetCompleted), Message: "Jobs completed", Reason: "JobsCompleted", Status: metav1.ConditionTrue}},
Obj()).
Conditions([]metav1.Condition{
// JobSet is completed..
{
Type: string(jobset.JobSetCompleted),
Reason: constants.AllJobsCompletedReason,
Message: constants.AllJobsCompletedMessage,
Status: metav1.ConditionStatus(corev1.ConditionTrue),
},
}).Obj(),
opts: completedConditionsOpts,
expectedUpdate: false,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
jsWithConditions := tc.js
jsWithConditions.Status.Conditions = tc.conditions
gotUpdate := updateCondition(jsWithConditions, tc.newCondition, tc.forceUpdate)
gotUpdate := updateCondition(tc.js, tc.opts)
if gotUpdate != tc.expectedUpdate {
t.Errorf("updateCondition return mismatch")
t.Errorf("updateCondition return mismatch (want: %v, got %v)", tc.expectedUpdate, gotUpdate)
}
})
}
Expand Down Expand Up @@ -1099,7 +1086,7 @@ func TestCalculateReplicatedJobStatuses(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
r := JobSetReconciler{Client: (fake.NewClientBuilder()).Build()}
statuses := r.calculateReplicatedJobStatuses(context.TODO(), tc.js, &tc.jobs)
var less interface{} = func(a, b jobset.ReplicatedJobStatus) bool {
less := func(a, b jobset.ReplicatedJobStatus) bool {
return a.Name < b.Name
}
if diff := cmp.Diff(tc.expected, statuses, cmpopts.SortSlices(less)); diff != "" {
Expand Down
28 changes: 24 additions & 4 deletions pkg/controllers/startup_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package controllers

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
Expand All @@ -33,8 +34,8 @@ func inOrderStartupPolicy(sp *jobset.StartupPolicy) bool {
return sp != nil && sp.StartupPolicyOrder == jobset.InOrder
}

func inOrderStartupPolicyExecutingCondition() metav1.Condition {
return metav1.Condition{
func inOrderStartupPolicyInProgressCondition() *metav1.Condition {
return &metav1.Condition{
Type: string(jobset.JobSetStartupPolicyCompleted),
// Status is True when in order startup policy is completed.
// Otherwise it is set as False to indicate it is still executing.
Expand All @@ -44,8 +45,8 @@ func inOrderStartupPolicyExecutingCondition() metav1.Condition {
}
}

func inOrderStartupPolicyCompletedCondition() metav1.Condition {
return metav1.Condition{
func inOrderStartupPolicyCompletedCondition() *metav1.Condition {
return &metav1.Condition{
Type: string(jobset.JobSetStartupPolicyCompleted),
// Status is True when in order startup policy is completed.
// Otherwise it is set as False to indicate it is still executing.
Expand All @@ -54,3 +55,22 @@ func inOrderStartupPolicyCompletedCondition() metav1.Condition {
Message: constants.InOrderStartupPolicyCompletedMessage,
}
}

// setInOrderStartupPolicyInProgress sets a condition on the JobSet status indicating it is
// currently executing an in-order startup policy.
func setInOrderStartupPolicyInProgress(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) {
// Add a condition to the JobSet indicating the in order startup policy is executing.
setCondition(js, &conditionOpts{
eventType: corev1.EventTypeNormal,
condition: inOrderStartupPolicyInProgressCondition(),
}, updateStatusOpts)
}

// setInOrderStartupPolicyCompleted sets a condition on the JobSet status indicating it has finished
// running an in-order startup policy to completion.
func setInOrderStartupPolicyCompleted(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) {
setCondition(js, &conditionOpts{
eventType: corev1.EventTypeNormal,
condition: inOrderStartupPolicyCompletedCondition(),
}, updateStatusOpts)
}
6 changes: 6 additions & 0 deletions pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ func MakeJobSet(name, ns string) *JobSetWrapper {
}
}

// Conditions sets the value of jobSet.status.conditions
func (j *JobSetWrapper) Conditions(conditions []metav1.Condition) *JobSetWrapper {
j.Status.Conditions = conditions
return j
}

// ManagedBy sets the value of jobSet.spec.managedBy
func (j *JobSetWrapper) ManagedBy(managedBy string) *JobSetWrapper {
j.Spec.ManagedBy = ptr.To(managedBy)
Expand Down
5 changes: 5 additions & 0 deletions test/integration/controller/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,7 @@ var _ = ginkgo.Describe("JobSet controller", func() {
})
},
updates: []*update{
// Ensure replicated job statuses report all child jobs are suspended.
{
checkJobSetState: func(js *jobset.JobSet) {
matchJobSetReplicatedStatus(js, []jobset.ReplicatedJobStatus{
Expand All @@ -839,6 +840,8 @@ var _ = ginkgo.Describe("JobSet controller", func() {
})
},
},
// Resume jobset. Only first replicated job should be unsuspended due to in-order
// startup policy.
{
jobSetUpdateFn: func(js *jobset.JobSet) {
suspendJobSet(js, false)
Expand All @@ -858,6 +861,8 @@ var _ = ginkgo.Describe("JobSet controller", func() {
})
},
},
// Update first replicatedJob so all its child jobs are ready. This will allow
// the next replicatedJob to proceed.
{
jobUpdateFn: func(jobList *batchv1.JobList) {
readyReplicatedJob(jobList, "replicated-job-a")
Expand Down
4 changes: 2 additions & 2 deletions test/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func JobSetResumed(ctx context.Context, k8sClient client.Client, js *jobset.JobS
}

func JobSetStartupPolicyComplete(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) {
ginkgo.By(fmt.Sprintf("checking jobset status is: %s", jobset.JobSetStartupPolicyCompleted))
ginkgo.By(fmt.Sprintf("checking jobset condition %q status is %q", jobset.JobSetStartupPolicyCompleted, metav1.ConditionTrue))
conditions := []metav1.Condition{
{
Type: string(jobset.JobSetStartupPolicyCompleted),
Expand All @@ -107,7 +107,7 @@ func JobSetStartupPolicyComplete(ctx context.Context, k8sClient client.Client, j
}

func JobSetStartupPolicyNotFinished(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) {
ginkgo.By(fmt.Sprintf("checking jobset status is: %s", jobset.JobSetStartupPolicyCompleted))
ginkgo.By(fmt.Sprintf("checking jobset condition %q status is %q", jobset.JobSetStartupPolicyCompleted, metav1.ConditionFalse))
conditions := []metav1.Condition{
{
Type: string(jobset.JobSetStartupPolicyCompleted),
Expand Down

0 comments on commit e6940bf

Please sign in to comment.