From e434fe7345661483d515121b67a65a8e132423a4 Mon Sep 17 00:00:00 2001 From: Kevin Hannon Date: Wed, 7 Sep 2022 17:59:47 -0400 Subject: [PATCH] Add Admitted Workloads Field to LocalQueueStatus Update apis/kueue/v1alpha2/localqueue_types.go Co-authored-by: Aldo Culquicondor <1299064+alculquicondor@users.noreply.github.com> Update pkg/cache/cache.go Co-authored-by: Aldo Culquicondor <1299064+alculquicondor@users.noreply.github.com> Update pkg/cache/cache_test.go Co-authored-by: Aldo Culquicondor <1299064+alculquicondor@users.noreply.github.com> pr changes Updating localqueue_controller_test to refect AdmittedWorkload add Pending comment fix cache flattened use flattened map with just ns and name Change function name and cleanup test code --- apis/kueue/v1alpha2/localqueue_types.go | 8 +- .../crd/bases/kueue.x-k8s.io_localqueues.yaml | 10 +- pkg/cache/cache.go | 11 ++ pkg/cache/cache_test.go | 109 ++++++------------ pkg/controller/core/localqueue_controller.go | 1 + .../core/localqueue_controller_test.go | 4 +- 6 files changed, 66 insertions(+), 77 deletions(-) diff --git a/apis/kueue/v1alpha2/localqueue_types.go b/apis/kueue/v1alpha2/localqueue_types.go index a6efb36daf..773ad00dd7 100644 --- a/apis/kueue/v1alpha2/localqueue_types.go +++ b/apis/kueue/v1alpha2/localqueue_types.go @@ -31,10 +31,14 @@ type ClusterQueueReference string // LocalQueueStatus defines the observed state of LocalQueue type LocalQueueStatus struct { - // PendingWorkloads is the number of workloads currently admitted to this - // localQueue not yet admitted to a ClusterQueue. + // PendingWorkloads is the number of Workloads in the LocalQueue not yet admitted to a ClusterQueue // +optional PendingWorkloads int32 `json:"pendingWorkloads"` + + // AdmittedWorkloads is the number of workloads in this LocalQueue + // admitted to a ClusterQueue and that haven't finished yet. + // +optional + AdmittedWorkloads int32 `json:"admittedWorkloads"` } //+kubebuilder:object:root=true diff --git a/config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml b/config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml index 91e7b1ad5a..3f67df7f47 100644 --- a/config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml +++ b/config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml @@ -55,9 +55,15 @@ spec: status: description: LocalQueueStatus defines the observed state of LocalQueue properties: + admittedWorkloads: + description: AdmittedWorkloads is the number of workloads in this + LocalQueue admitted to a ClusterQueue and that haven't finished + yet. + format: int32 + type: integer pendingWorkloads: - description: PendingWorkloads is the number of workloads currently - admitted to this localQueue not yet admitted to a ClusterQueue. + description: PendingWorkloads is the number of Workloads in the LocalQueue + not yet admitted to a ClusterQueue format: int32 type: integer type: object diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 20d0ebe810..3fb19cb705 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -148,6 +148,17 @@ func (c *Cache) newClusterQueue(cq *kueue.ClusterQueue) (*ClusterQueue, error) { return cqImpl, nil } +func (c *Cache) AdmittedWorkloadsInLocalQueue(localQueue *kueue.LocalQueue) int32 { + c.Lock() + defer c.Unlock() + cq, ok := c.clusterQueues[string(localQueue.Spec.ClusterQueue)] + if !ok { + return 0 + } + qKey := queueKey(localQueue) + return int32(cq.admittedWorkloadsPerQueue[qKey]) +} + func (c *ClusterQueue) Active() bool { return c.Status == active } diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index e3c10fed16..c017b9eec1 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -1304,7 +1304,7 @@ func TestCacheQueueOperations(t *testing.T) { } cases := map[string]struct { ops []func(context.Context, client.Client, *Cache) error - wantQueueCounts map[string]map[string]int + wantQueueCounts map[string]int32 }{ "insert cqs, queues, workloads": { ops: []func(ctx context.Context, cl client.Client, cache *Cache) error{ @@ -1312,14 +1312,10 @@ func TestCacheQueueOperations(t *testing.T) { insertAllQueues, insertAllWorkloads, }, - wantQueueCounts: map[string]map[string]int{ - "foo": { - "ns1/alpha": 1, - "ns2/beta": 2, - }, - "bar": { - "ns1/gamma": 1, - }, + wantQueueCounts: map[string]int32{ + "ns1/alpha": 1, + "ns2/beta": 2, + "ns1/gamma": 1, }, }, "insert cqs, workloads but no queues": { @@ -1327,17 +1323,14 @@ func TestCacheQueueOperations(t *testing.T) { insertAllClusterQueues, insertAllWorkloads, }, - wantQueueCounts: map[string]map[string]int{ - "foo": {}, - "bar": {}, - }, + wantQueueCounts: map[string]int32{}, }, "insert queues, workloads but no cqs": { ops: []func(context.Context, client.Client, *Cache) error{ insertAllQueues, insertAllWorkloads, }, - wantQueueCounts: map[string]map[string]int{}, + wantQueueCounts: map[string]int32{}, }, "insert queues last": { ops: []func(context.Context, client.Client, *Cache) error{ @@ -1345,14 +1338,10 @@ func TestCacheQueueOperations(t *testing.T) { insertAllWorkloads, insertAllQueues, }, - wantQueueCounts: map[string]map[string]int{ - "foo": { - "ns1/alpha": 1, - "ns2/beta": 2, - }, - "bar": { - "ns1/gamma": 1, - }, + wantQueueCounts: map[string]int32{ + "ns1/alpha": 1, + "ns2/beta": 2, + "ns1/gamma": 1, }, }, "insert cqs last": { @@ -1361,14 +1350,10 @@ func TestCacheQueueOperations(t *testing.T) { insertAllWorkloads, insertAllClusterQueues, }, - wantQueueCounts: map[string]map[string]int{ - "foo": { - "ns1/alpha": 1, - "ns2/beta": 2, - }, - "bar": { - "ns1/gamma": 1, - }, + wantQueueCounts: map[string]int32{ + "ns1/alpha": 1, + "ns2/beta": 2, + "ns1/gamma": 1, }, }, "assume": { @@ -1383,14 +1368,10 @@ func TestCacheQueueOperations(t *testing.T) { return cache.AssumeWorkload(wl) }, }, - wantQueueCounts: map[string]map[string]int{ - "foo": { - "ns1/alpha": 1, - "ns2/beta": 0, - }, - "bar": { - "ns1/gamma": 0, - }, + wantQueueCounts: map[string]int32{ + "ns1/alpha": 1, + "ns2/beta": 0, + "ns1/gamma": 0, }, }, "assume and forget": { @@ -1408,14 +1389,10 @@ func TestCacheQueueOperations(t *testing.T) { return cache.ForgetWorkload(wl) }, }, - wantQueueCounts: map[string]map[string]int{ - "foo": { - "ns1/alpha": 0, - "ns2/beta": 0, - }, - "bar": { - "ns1/gamma": 0, - }, + wantQueueCounts: map[string]int32{ + "ns1/alpha": 0, + "ns2/beta": 0, + "ns1/gamma": 0, }, }, "delete workload": { @@ -1427,14 +1404,10 @@ func TestCacheQueueOperations(t *testing.T) { return cache.DeleteWorkload(workloads[0]) }, }, - wantQueueCounts: map[string]map[string]int{ - "foo": { - "ns1/alpha": 0, - "ns2/beta": 2, - }, - "bar": { - "ns1/gamma": 1, - }, + wantQueueCounts: map[string]int32{ + "ns1/alpha": 0, + "ns2/beta": 2, + "ns1/gamma": 1, }, }, "delete cq": { @@ -1447,10 +1420,8 @@ func TestCacheQueueOperations(t *testing.T) { return nil }, }, - wantQueueCounts: map[string]map[string]int{ - "bar": { - "ns1/gamma": 1, - }, + wantQueueCounts: map[string]int32{ + "ns1/gamma": 1, }, }, "delete queue": { @@ -1463,13 +1434,9 @@ func TestCacheQueueOperations(t *testing.T) { return nil }, }, - wantQueueCounts: map[string]map[string]int{ - "foo": { - "ns2/beta": 2, - }, - "bar": { - "ns1/gamma": 1, - }, + wantQueueCounts: map[string]int32{ + "ns2/beta": 2, + "ns1/gamma": 1, }, }, // Not tested: changing a workload's queue and changing a queue's cluster queue. @@ -1489,12 +1456,12 @@ func TestCacheQueueOperations(t *testing.T) { t.Fatalf("Running op %d: %v", i, err) } } - qCounts := make(map[string]map[string]int) - for _, cq := range cache.clusterQueues { - qCounts[cq.Name] = cq.admittedWorkloadsPerQueue - } - if diff := cmp.Diff(tc.wantQueueCounts, qCounts); diff != "" { - t.Errorf("Wrong active workloads counters for queues (-want,+got):\n%s", diff) + for _, lq := range queues { + queueAdmitted := cache.AdmittedWorkloadsInLocalQueue(lq) + key := fmt.Sprintf("%s/%s", lq.Namespace, lq.Name) + if diff := cmp.Diff(tc.wantQueueCounts[key], queueAdmitted); diff != "" { + t.Errorf("Want %d but got %d for %s", tc.wantQueueCounts[key], queueAdmitted, key) + } } }) } diff --git a/pkg/controller/core/localqueue_controller.go b/pkg/controller/core/localqueue_controller.go index 59ae7e8e5d..562504f09b 100644 --- a/pkg/controller/core/localqueue_controller.go +++ b/pkg/controller/core/localqueue_controller.go @@ -84,6 +84,7 @@ func (r *LocalQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) } queueObj.Status.PendingWorkloads = pending + queueObj.Status.AdmittedWorkloads = r.cache.AdmittedWorkloadsInLocalQueue(&queueObj) if !equality.Semantic.DeepEqual(oldStatus, queueObj.Status) { err := r.client.Status().Update(ctx, &queueObj) return ctrl.Result{}, client.IgnoreNotFound(err) diff --git a/test/integration/controller/core/localqueue_controller_test.go b/test/integration/controller/core/localqueue_controller_test.go index e7d11d8d8b..3f80608d71 100644 --- a/test/integration/controller/core/localqueue_controller_test.go +++ b/test/integration/controller/core/localqueue_controller_test.go @@ -82,7 +82,7 @@ var _ = ginkgo.Describe("Queue controller", func() { var updatedQueue kueue.LocalQueue gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed()) return updatedQueue.Status - }, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{PendingWorkloads: 3})) + }, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{AdmittedWorkloads: 0, PendingWorkloads: 3})) ginkgo.By("Admitting workloads") for _, w := range workloads { @@ -98,7 +98,7 @@ var _ = ginkgo.Describe("Queue controller", func() { var updatedQueue kueue.LocalQueue gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed()) return updatedQueue.Status - }, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{PendingWorkloads: 0})) + }, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{AdmittedWorkloads: 3, PendingWorkloads: 0})) ginkgo.By("Finishing workloads") framework.FinishWorkloads(ctx, k8sClient, workloads...)