Skip to content

Commit

Permalink
Merge pull request #382 from kannon92/issue-259-clean-history
Browse files Browse the repository at this point in the history
Add Admitted Workloads Field to LocalQueueStatus
  • Loading branch information
k8s-ci-robot committed Sep 9, 2022
2 parents 93a2d21 + e434fe7 commit b2e20c7
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 77 deletions.
8 changes: 6 additions & 2 deletions apis/kueue/v1alpha2/localqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,14 @@ type ClusterQueueReference string

// LocalQueueStatus defines the observed state of LocalQueue
type LocalQueueStatus struct {
// PendingWorkloads is the number of workloads currently admitted to this
// localQueue not yet admitted to a ClusterQueue.
// PendingWorkloads is the number of Workloads in the LocalQueue not yet admitted to a ClusterQueue
// +optional
PendingWorkloads int32 `json:"pendingWorkloads"`

// AdmittedWorkloads is the number of workloads in this LocalQueue
// admitted to a ClusterQueue and that haven't finished yet.
// +optional
AdmittedWorkloads int32 `json:"admittedWorkloads"`
}

//+kubebuilder:object:root=true
Expand Down
10 changes: 8 additions & 2 deletions config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,15 @@ spec:
status:
description: LocalQueueStatus defines the observed state of LocalQueue
properties:
admittedWorkloads:
description: AdmittedWorkloads is the number of workloads in this
LocalQueue admitted to a ClusterQueue and that haven't finished
yet.
format: int32
type: integer
pendingWorkloads:
description: PendingWorkloads is the number of workloads currently
admitted to this localQueue not yet admitted to a ClusterQueue.
description: PendingWorkloads is the number of Workloads in the LocalQueue
not yet admitted to a ClusterQueue
format: int32
type: integer
type: object
Expand Down
11 changes: 11 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ func (c *Cache) newClusterQueue(cq *kueue.ClusterQueue) (*ClusterQueue, error) {
return cqImpl, nil
}

func (c *Cache) AdmittedWorkloadsInLocalQueue(localQueue *kueue.LocalQueue) int32 {
c.Lock()
defer c.Unlock()
cq, ok := c.clusterQueues[string(localQueue.Spec.ClusterQueue)]
if !ok {
return 0
}
qKey := queueKey(localQueue)
return int32(cq.admittedWorkloadsPerQueue[qKey])
}

func (c *ClusterQueue) Active() bool {
return c.Status == active
}
Expand Down
109 changes: 38 additions & 71 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1304,55 +1304,44 @@ func TestCacheQueueOperations(t *testing.T) {
}
cases := map[string]struct {
ops []func(context.Context, client.Client, *Cache) error
wantQueueCounts map[string]map[string]int
wantQueueCounts map[string]int32
}{
"insert cqs, queues, workloads": {
ops: []func(ctx context.Context, cl client.Client, cache *Cache) error{
insertAllClusterQueues,
insertAllQueues,
insertAllWorkloads,
},
wantQueueCounts: map[string]map[string]int{
"foo": {
"ns1/alpha": 1,
"ns2/beta": 2,
},
"bar": {
"ns1/gamma": 1,
},
wantQueueCounts: map[string]int32{
"ns1/alpha": 1,
"ns2/beta": 2,
"ns1/gamma": 1,
},
},
"insert cqs, workloads but no queues": {
ops: []func(context.Context, client.Client, *Cache) error{
insertAllClusterQueues,
insertAllWorkloads,
},
wantQueueCounts: map[string]map[string]int{
"foo": {},
"bar": {},
},
wantQueueCounts: map[string]int32{},
},
"insert queues, workloads but no cqs": {
ops: []func(context.Context, client.Client, *Cache) error{
insertAllQueues,
insertAllWorkloads,
},
wantQueueCounts: map[string]map[string]int{},
wantQueueCounts: map[string]int32{},
},
"insert queues last": {
ops: []func(context.Context, client.Client, *Cache) error{
insertAllClusterQueues,
insertAllWorkloads,
insertAllQueues,
},
wantQueueCounts: map[string]map[string]int{
"foo": {
"ns1/alpha": 1,
"ns2/beta": 2,
},
"bar": {
"ns1/gamma": 1,
},
wantQueueCounts: map[string]int32{
"ns1/alpha": 1,
"ns2/beta": 2,
"ns1/gamma": 1,
},
},
"insert cqs last": {
Expand All @@ -1361,14 +1350,10 @@ func TestCacheQueueOperations(t *testing.T) {
insertAllWorkloads,
insertAllClusterQueues,
},
wantQueueCounts: map[string]map[string]int{
"foo": {
"ns1/alpha": 1,
"ns2/beta": 2,
},
"bar": {
"ns1/gamma": 1,
},
wantQueueCounts: map[string]int32{
"ns1/alpha": 1,
"ns2/beta": 2,
"ns1/gamma": 1,
},
},
"assume": {
Expand All @@ -1383,14 +1368,10 @@ func TestCacheQueueOperations(t *testing.T) {
return cache.AssumeWorkload(wl)
},
},
wantQueueCounts: map[string]map[string]int{
"foo": {
"ns1/alpha": 1,
"ns2/beta": 0,
},
"bar": {
"ns1/gamma": 0,
},
wantQueueCounts: map[string]int32{
"ns1/alpha": 1,
"ns2/beta": 0,
"ns1/gamma": 0,
},
},
"assume and forget": {
Expand All @@ -1408,14 +1389,10 @@ func TestCacheQueueOperations(t *testing.T) {
return cache.ForgetWorkload(wl)
},
},
wantQueueCounts: map[string]map[string]int{
"foo": {
"ns1/alpha": 0,
"ns2/beta": 0,
},
"bar": {
"ns1/gamma": 0,
},
wantQueueCounts: map[string]int32{
"ns1/alpha": 0,
"ns2/beta": 0,
"ns1/gamma": 0,
},
},
"delete workload": {
Expand All @@ -1427,14 +1404,10 @@ func TestCacheQueueOperations(t *testing.T) {
return cache.DeleteWorkload(workloads[0])
},
},
wantQueueCounts: map[string]map[string]int{
"foo": {
"ns1/alpha": 0,
"ns2/beta": 2,
},
"bar": {
"ns1/gamma": 1,
},
wantQueueCounts: map[string]int32{
"ns1/alpha": 0,
"ns2/beta": 2,
"ns1/gamma": 1,
},
},
"delete cq": {
Expand All @@ -1447,10 +1420,8 @@ func TestCacheQueueOperations(t *testing.T) {
return nil
},
},
wantQueueCounts: map[string]map[string]int{
"bar": {
"ns1/gamma": 1,
},
wantQueueCounts: map[string]int32{
"ns1/gamma": 1,
},
},
"delete queue": {
Expand All @@ -1463,13 +1434,9 @@ func TestCacheQueueOperations(t *testing.T) {
return nil
},
},
wantQueueCounts: map[string]map[string]int{
"foo": {
"ns2/beta": 2,
},
"bar": {
"ns1/gamma": 1,
},
wantQueueCounts: map[string]int32{
"ns2/beta": 2,
"ns1/gamma": 1,
},
},
// Not tested: changing a workload's queue and changing a queue's cluster queue.
Expand All @@ -1489,12 +1456,12 @@ func TestCacheQueueOperations(t *testing.T) {
t.Fatalf("Running op %d: %v", i, err)
}
}
qCounts := make(map[string]map[string]int)
for _, cq := range cache.clusterQueues {
qCounts[cq.Name] = cq.admittedWorkloadsPerQueue
}
if diff := cmp.Diff(tc.wantQueueCounts, qCounts); diff != "" {
t.Errorf("Wrong active workloads counters for queues (-want,+got):\n%s", diff)
for _, lq := range queues {
queueAdmitted := cache.AdmittedWorkloadsInLocalQueue(lq)
key := fmt.Sprintf("%s/%s", lq.Namespace, lq.Name)
if diff := cmp.Diff(tc.wantQueueCounts[key], queueAdmitted); diff != "" {
t.Errorf("Want %d but got %d for %s", tc.wantQueueCounts[key], queueAdmitted, key)
}
}
})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/core/localqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (r *LocalQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

queueObj.Status.PendingWorkloads = pending
queueObj.Status.AdmittedWorkloads = r.cache.AdmittedWorkloadsInLocalQueue(&queueObj)
if !equality.Semantic.DeepEqual(oldStatus, queueObj.Status) {
err := r.client.Status().Update(ctx, &queueObj)
return ctrl.Result{}, client.IgnoreNotFound(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ var _ = ginkgo.Describe("Queue controller", func() {
var updatedQueue kueue.LocalQueue
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed())
return updatedQueue.Status
}, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{PendingWorkloads: 3}))
}, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{AdmittedWorkloads: 0, PendingWorkloads: 3}))

ginkgo.By("Admitting workloads")
for _, w := range workloads {
Expand All @@ -98,7 +98,7 @@ var _ = ginkgo.Describe("Queue controller", func() {
var updatedQueue kueue.LocalQueue
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed())
return updatedQueue.Status
}, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{PendingWorkloads: 0}))
}, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{AdmittedWorkloads: 3, PendingWorkloads: 0}))

ginkgo.By("Finishing workloads")
framework.FinishWorkloads(ctx, k8sClient, workloads...)
Expand Down

0 comments on commit b2e20c7

Please sign in to comment.