Skip to content

Commit

Permalink
Set ObservedGeneration in conditions and update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vladikkuzn committed Apr 3, 2024
1 parent 3761abf commit a139a91
Show file tree
Hide file tree
Showing 36 changed files with 449 additions and 389 deletions.
9 changes: 5 additions & 4 deletions pkg/controller/admissionchecks/multikueue/admissioncheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
log.V(2).Info("Reconcile AdmissionCheck")

newCondition := metav1.Condition{
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionTrue,
Reason: "Active",
Message: "The admission check is active",
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionTrue,
Reason: "Active",
Message: "The admission check is active",
ObservedGeneration: ac.Generation,
}

if cfg, err := a.helper.ConfigFromRef(ctx, ac.Spec.Parameters); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,10 +454,11 @@ func (c *clustersReconciler) getKubeConfigFromPath(path string) ([]byte, bool, e

func (c *clustersReconciler) updateStatus(ctx context.Context, cluster *kueuealpha.MultiKueueCluster, active bool, reason, message string) error {
newCondition := metav1.Condition{
Type: kueuealpha.MultiKueueClusterActive,
Status: metav1.ConditionFalse,
Reason: reason,
Message: message,
Type: kueuealpha.MultiKueueClusterActive,
Status: metav1.ConditionFalse,
Reason: reason,
Message: message,
ObservedGeneration: cluster.Generation,
}
if active {
newCondition.Status = metav1.ConditionTrue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ func (a *acReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re

currentCondition := ptr.Deref(apimeta.FindStatusCondition(ac.Status.Conditions, kueue.AdmissionCheckActive), metav1.Condition{})
newCondition := metav1.Condition{
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionTrue,
Reason: "Active",
Message: "The admission check is active",
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionTrue,
Reason: "Active",
Message: "The admission check is active",
ObservedGeneration: ac.Generation,
}

if _, err := a.helper.ConfigFromRef(ctx, ac.Spec.Parameters); err != nil {
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/core/clusterqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,10 +650,11 @@ func (r *ClusterQueueReconciler) updateCqStatusIfChanged(
cq.Status.PendingWorkloads = int32(pendingWorkloads)
cq.Status.PendingWorkloadsStatus = r.getWorkloadsStatus(cq)
meta.SetStatusCondition(&cq.Status.Conditions, metav1.Condition{
Type: kueue.ClusterQueueActive,
Status: conditionStatus,
Reason: reason,
Message: msg,
Type: kueue.ClusterQueueActive,
Status: conditionStatus,
Reason: reason,
Message: msg,
ObservedGeneration: cq.Generation,
})
if !equality.Semantic.DeepEqual(cq.Status, oldStatus) {
return r.client.Status().Update(ctx, cq)
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/core/localqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,11 @@ func (r *LocalQueueReconciler) UpdateStatusIfChanged(
queue.Status.FlavorUsage = stats.AdmittedResources
if len(conditionStatus) != 0 && len(reason) != 0 && len(msg) != 0 {
meta.SetStatusCondition(&queue.Status.Conditions, metav1.Condition{
Type: kueue.LocalQueueActive,
Status: conditionStatus,
Reason: reason,
Message: msg,
Type: kueue.LocalQueueActive,
Status: conditionStatus,
Reason: reason,
Message: msg,
ObservedGeneration: queue.Generation,
})
}
if !equality.Semantic.DeepEqual(oldStatus, queue.Status) {
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,10 +959,11 @@ func generatePodsReadyCondition(job GenericJob, wl *kueue.Workload) metav1.Condi
message = "All pods were ready or succeeded since the workload admission"
}
return metav1.Condition{
Type: kueue.WorkloadPodsReady,
Status: conditionStatus,
Reason: "PodsReady",
Message: message,
Type: kueue.WorkloadPodsReady,
Status: conditionStatus,
Reason: "PodsReady",
Message: message,
ObservedGeneration: wl.Generation,
}
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/jobs/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,11 @@ func (j *Job) Finished() (metav1.Condition, bool) {
}

condition := metav1.Condition{
Type: kueue.WorkloadFinished,
Status: metav1.ConditionTrue,
Reason: "JobFinished",
Message: "Job finished successfully",
Type: kueue.WorkloadFinished,
Status: metav1.ConditionTrue,
Reason: "JobFinished",
Message: "Job finished successfully",
ObservedGeneration: j.Generation,
}
if conditionType == batchv1.JobFailed {
condition.Message = "Job failed"
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/jobs/mpijob/mpijob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,11 @@ func (j *MPIJob) Finished() (metav1.Condition, bool) {
message = "Job failed"
}
condition := metav1.Condition{
Type: kueue.WorkloadFinished,
Status: metav1.ConditionTrue,
Reason: "JobFinished",
Message: message,
Type: kueue.WorkloadFinished,
Status: metav1.ConditionTrue,
Reason: "JobFinished",
Message: message,
ObservedGeneration: j.Generation,
}
return condition, finished
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/jobs/rayjob/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,11 @@ func (j *RayJob) RestorePodSetsInfo(podSetsInfo []podset.PodSetInfo) bool {

func (j *RayJob) Finished() (metav1.Condition, bool) {
condition := metav1.Condition{
Type: kueue.WorkloadFinished,
Status: metav1.ConditionTrue,
Reason: string(j.Status.JobStatus),
Message: j.Status.Message,
Type: kueue.WorkloadFinished,
Status: metav1.ConditionTrue,
Reason: string(j.Status.JobStatus),
Message: j.Status.Message,
ObservedGeneration: j.Generation,
}

return condition, j.Status.JobStatus == rayjobapi.JobStatusFailed || j.Status.JobStatus == rayjobapi.JobStatusSucceeded
Expand Down
5 changes: 2 additions & 3 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"reflect"
"sigs.k8s.io/kueue/test/util"
"sort"
"sync"
"testing"
Expand Down Expand Up @@ -2051,8 +2052,6 @@ func TestLastSchedulingContext(t *testing.T) {
}
}

var ignoreConditionTimestamps = cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")

func TestRequeueAndUpdate(t *testing.T) {
cq := utiltesting.MakeClusterQueue("cq").Obj()
q1 := utiltesting.MakeLocalQueue("q1", "ns1").ClusterQueue(cq.Name).Obj()
Expand Down Expand Up @@ -2181,7 +2180,7 @@ func TestRequeueAndUpdate(t *testing.T) {
if err := cl.Get(ctx, client.ObjectKeyFromObject(w1), &updatedWl); err != nil {
t.Fatalf("Failed obtaining updated object: %v", err)
}
if diff := cmp.Diff(tc.wantStatus, updatedWl.Status, ignoreConditionTimestamps); diff != "" {
if diff := cmp.Diff(tc.wantStatus, updatedWl.Status, util.IgnoreConditionTimestamps); diff != "" {
t.Errorf("Unexpected status after updating (-want,+got):\n%s", diff)
}
// Make sure a second call doesn't make unnecessary updates.
Expand Down
5 changes: 5 additions & 0 deletions pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ func (w *WorkloadWrapper) UID(uid types.UID) *WorkloadWrapper {
return w
}

func (w *WorkloadWrapper) Generation(num int64) *WorkloadWrapper {
w.ObjectMeta.Generation = num
return w
}

func (w *WorkloadWrapper) Name(name string) *WorkloadWrapper {
w.Workload.Name = name
return w
Expand Down
9 changes: 5 additions & 4 deletions pkg/workload/admissionchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ func SyncAdmittedCondition(w *kueue.Workload) bool {
return false
}
newCondition := metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionTrue,
Reason: "Admitted",
Message: "The workload is admitted",
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionTrue,
Reason: "Admitted",
Message: "The workload is admitted",
ObservedGeneration: w.Generation,
}
switch {
case !hasReservation && !hasAllChecksReady:
Expand Down
4 changes: 4 additions & 0 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func UpdateStatus(ctx context.Context,
LastTransitionTime: now,
Reason: reason,
Message: api.TruncateConditionMessage(message),
ObservedGeneration: wl.Generation,
}

newWl := BaseSSAWorkload(wl)
Expand All @@ -328,6 +329,7 @@ func UnsetQuotaReservationWithCondition(wl *kueue.Workload, reason, message stri
LastTransitionTime: metav1.Now(),
Reason: reason,
Message: api.TruncateConditionMessage(message),
ObservedGeneration: wl.Generation,
}
changed := apimeta.SetStatusCondition(&wl.Status.Conditions, condition)
if wl.Status.Admission != nil {
Expand Down Expand Up @@ -374,6 +376,7 @@ func SetQuotaReservation(w *kueue.Workload, admission *kueue.Admission) {
LastTransitionTime: metav1.Now(),
Reason: "QuotaReserved",
Message: fmt.Sprintf("Quota reserved in ClusterQueue %s", w.Status.Admission.ClusterQueue),
ObservedGeneration: w.Generation,
}
apimeta.SetStatusCondition(&w.Status.Conditions, admittedCond)

Expand All @@ -391,6 +394,7 @@ func SetEvictedCondition(w *kueue.Workload, reason string, message string) {
LastTransitionTime: metav1.Now(),
Reason: reason,
Message: message,
ObservedGeneration: w.Generation,
}
apimeta.SetStatusCondition(&w.Status.Conditions, condition)
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/workload/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,6 @@ func TestNewInfo(t *testing.T) {
}
}

var ignoreConditionTimestamps = cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")

func TestUpdateWorkloadStatus(t *testing.T) {
cases := map[string]struct {
oldStatus kueue.WorkloadStatus
Expand All @@ -218,10 +216,11 @@ func TestUpdateWorkloadStatus(t *testing.T) {
wantStatus: kueue.WorkloadStatus{
Conditions: []metav1.Condition{
{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionFalse,
Reason: "Pending",
Message: "didn't fit",
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionFalse,
Reason: "Pending",
Message: "didn't fit",
ObservedGeneration: 1,
},
},
},
Expand All @@ -243,17 +242,18 @@ func TestUpdateWorkloadStatus(t *testing.T) {
wantStatus: kueue.WorkloadStatus{
Conditions: []metav1.Condition{
{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionTrue,
Reason: "Admitted",
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionTrue,
Reason: "Admitted",
ObservedGeneration: 1,
},
},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
workload := utiltesting.MakeWorkload("foo", "bar").Obj()
workload := utiltesting.MakeWorkload("foo", "bar").Generation(1).Obj()
workload.Status = tc.oldStatus
cl := utiltesting.NewFakeClientSSAAsSM(workload)
ctx := context.Background()
Expand All @@ -265,7 +265,7 @@ func TestUpdateWorkloadStatus(t *testing.T) {
if err := cl.Get(ctx, client.ObjectKeyFromObject(workload), &updatedWl); err != nil {
t.Fatalf("Failed obtaining updated object: %v", err)
}
if diff := cmp.Diff(tc.wantStatus, updatedWl.Status, ignoreConditionTimestamps); diff != "" {
if diff := cmp.Diff(tc.wantStatus, updatedWl.Status, util.IgnoreConditionTimestamps); diff != "" {
t.Errorf("Unexpected status after updating (-want,+got):\n%s", diff)
}
})
Expand Down
8 changes: 4 additions & 4 deletions test/e2e/multikueue/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package mke2e

import (
"os/exec"
"sigs.k8s.io/kueue/test/util"

"github.com/google/go-cmp/cmp/cmpopts"
"github.com/onsi/ginkgo/v2"
Expand All @@ -40,7 +41,6 @@ import (
testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
"sigs.k8s.io/kueue/pkg/workload"
"sigs.k8s.io/kueue/test/util"
)

// +kubebuilder:docs-gen:collapse=Imports
Expand Down Expand Up @@ -221,7 +221,7 @@ var _ = ginkgo.Describe("MultiKueue", func() {
Status: metav1.ConditionTrue,
Reason: "JobFinished",
Message: `Job finished successfully`,
}, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")))
}, util.IgnoreConditionTimestamps, util.IgnoreConditionObservedGeneration))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})

Expand Down Expand Up @@ -286,7 +286,7 @@ var _ = ginkgo.Describe("MultiKueue", func() {
Status: metav1.ConditionTrue,
Reason: "Admitted",
Message: "The workload is admitted",
}, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")))
}, util.IgnoreConditionTimestamps, util.IgnoreConditionObservedGeneration))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

Expand Down Expand Up @@ -314,7 +314,7 @@ var _ = ginkgo.Describe("MultiKueue", func() {
Status: metav1.ConditionTrue,
Reason: "JobSetFinished",
Message: "JobSet finished successfully",
}, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")))
}, util.IgnoreConditionTimestamps, util.IgnoreConditionObservedGeneration))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})

Expand Down
4 changes: 2 additions & 2 deletions test/e2e/singlecluster/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/kueue/test/util"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/constants"
Expand All @@ -35,7 +36,6 @@ import (
"sigs.k8s.io/kueue/pkg/util/testing"
testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
"sigs.k8s.io/kueue/pkg/workload"
"sigs.k8s.io/kueue/test/util"
)

// +kubebuilder:docs-gen:collapse=Imports
Expand Down Expand Up @@ -207,7 +207,7 @@ var _ = ginkgo.Describe("Kueue", func() {
Type: kueue.WorkloadFinished,
Status: metav1.ConditionTrue,
Reason: "JobFinished",
}, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime", "Message"))))
}, util.IgnoreConditionMessage, util.IgnoreConditionTimestamps, util.IgnoreConditionObservedGeneration)))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})
})
Expand Down
3 changes: 1 addition & 2 deletions test/e2e/singlecluster/jobset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package e2e

import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -107,7 +106,7 @@ var _ = ginkgo.Describe("Kueue", func() {
Status: metav1.ConditionTrue,
Reason: "JobSetFinished",
Message: "JobSet finished successfully",
}, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")))
}, util.IgnoreConditionTimestamps, util.IgnoreConditionObservedGeneration))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})
})
Expand Down
Loading

0 comments on commit a139a91

Please sign in to comment.