Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Admitted Workloads Field to LocalQueueStatus #382

Merged
merged 1 commit into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions apis/kueue/v1alpha2/localqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,14 @@ type ClusterQueueReference string

// LocalQueueStatus defines the observed state of LocalQueue
type LocalQueueStatus struct {
// PendingWorkloads is the number of workloads currently admitted to this
// localQueue not yet admitted to a ClusterQueue.
// PendingWorkloads is the number of Workloads in the LocalQueue not yet admitted to a ClusterQueue
// +optional
PendingWorkloads int32 `json:"pendingWorkloads"`

// AdmittedWorkloads is the number of workloads in this LocalQueue
// admitted to a ClusterQueue and that haven't finished yet.
// +optional
AdmittedWorkloads int32 `json:"admittedWorkloads"`
}

//+kubebuilder:object:root=true
Expand Down
10 changes: 8 additions & 2 deletions config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,15 @@ spec:
status:
description: LocalQueueStatus defines the observed state of LocalQueue
properties:
admittedWorkloads:
description: AdmittedWorkloads is the number of workloads in this
LocalQueue admitted to a ClusterQueue and that haven't finished
yet.
format: int32
type: integer
pendingWorkloads:
description: PendingWorkloads is the number of workloads currently
admitted to this localQueue not yet admitted to a ClusterQueue.
description: PendingWorkloads is the number of Workloads in the LocalQueue
not yet admitted to a ClusterQueue
format: int32
type: integer
type: object
Expand Down
11 changes: 11 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ func (c *Cache) newClusterQueue(cq *kueue.ClusterQueue) (*ClusterQueue, error) {
return cqImpl, nil
}

func (c *Cache) AdmittedWorkloadsInLocalQueue(localQueue *kueue.LocalQueue) int32 {
c.Lock()
defer c.Unlock()
cq, ok := c.clusterQueues[string(localQueue.Spec.ClusterQueue)]
if !ok {
return 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should also return error here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, the ClusterQueue might be created after the LocalQueue.

During that time, there can't be any admitted workloads. So 0 is the right value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, that's the case. Another approach is

queueObj.Status.AdmittedWorkloads, _ = r.cache.AdmittedWorkloadsLocalQueue(&queueObj)

But remaining the same also makes sense to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there are no cases of error, I would not add error to the return values

}
qKey := queueKey(localQueue)
return int32(cq.admittedWorkloadsPerQueue[qKey])
}
kannon92 marked this conversation as resolved.
Show resolved Hide resolved

// Active reports whether this ClusterQueue is currently in the active state.
func (c *ClusterQueue) Active() bool {
	isActive := c.Status == active
	return isActive
}
Expand Down
109 changes: 38 additions & 71 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1304,55 +1304,44 @@ func TestCacheQueueOperations(t *testing.T) {
}
cases := map[string]struct {
ops []func(context.Context, client.Client, *Cache) error
wantQueueCounts map[string]map[string]int
wantQueueCounts map[string]int32
}{
"insert cqs, queues, workloads": {
ops: []func(ctx context.Context, cl client.Client, cache *Cache) error{
insertAllClusterQueues,
insertAllQueues,
insertAllWorkloads,
},
wantQueueCounts: map[string]map[string]int{
"foo": {
"ns1/alpha": 1,
"ns2/beta": 2,
},
"bar": {
"ns1/gamma": 1,
},
wantQueueCounts: map[string]int32{
"ns1/alpha": 1,
"ns2/beta": 2,
"ns1/gamma": 1,
},
},
"insert cqs, workloads but no queues": {
ops: []func(context.Context, client.Client, *Cache) error{
insertAllClusterQueues,
insertAllWorkloads,
},
wantQueueCounts: map[string]map[string]int{
"foo": {},
"bar": {},
},
wantQueueCounts: map[string]int32{},
},
"insert queues, workloads but no cqs": {
ops: []func(context.Context, client.Client, *Cache) error{
insertAllQueues,
insertAllWorkloads,
},
wantQueueCounts: map[string]map[string]int{},
wantQueueCounts: map[string]int32{},
},
"insert queues last": {
ops: []func(context.Context, client.Client, *Cache) error{
insertAllClusterQueues,
insertAllWorkloads,
insertAllQueues,
},
wantQueueCounts: map[string]map[string]int{
"foo": {
"ns1/alpha": 1,
"ns2/beta": 2,
},
"bar": {
"ns1/gamma": 1,
},
wantQueueCounts: map[string]int32{
"ns1/alpha": 1,
"ns2/beta": 2,
"ns1/gamma": 1,
},
},
"insert cqs last": {
Expand All @@ -1361,14 +1350,10 @@ func TestCacheQueueOperations(t *testing.T) {
insertAllWorkloads,
insertAllClusterQueues,
},
wantQueueCounts: map[string]map[string]int{
"foo": {
"ns1/alpha": 1,
"ns2/beta": 2,
},
"bar": {
"ns1/gamma": 1,
},
wantQueueCounts: map[string]int32{
"ns1/alpha": 1,
"ns2/beta": 2,
"ns1/gamma": 1,
},
},
"assume": {
Expand All @@ -1383,14 +1368,10 @@ func TestCacheQueueOperations(t *testing.T) {
return cache.AssumeWorkload(wl)
},
},
wantQueueCounts: map[string]map[string]int{
"foo": {
"ns1/alpha": 1,
"ns2/beta": 0,
},
"bar": {
"ns1/gamma": 0,
},
wantQueueCounts: map[string]int32{
"ns1/alpha": 1,
"ns2/beta": 0,
"ns1/gamma": 0,
},
},
"assume and forget": {
Expand All @@ -1408,14 +1389,10 @@ func TestCacheQueueOperations(t *testing.T) {
return cache.ForgetWorkload(wl)
},
},
wantQueueCounts: map[string]map[string]int{
"foo": {
"ns1/alpha": 0,
"ns2/beta": 0,
},
"bar": {
"ns1/gamma": 0,
},
wantQueueCounts: map[string]int32{
"ns1/alpha": 0,
"ns2/beta": 0,
"ns1/gamma": 0,
},
},
"delete workload": {
Expand All @@ -1427,14 +1404,10 @@ func TestCacheQueueOperations(t *testing.T) {
return cache.DeleteWorkload(workloads[0])
},
},
wantQueueCounts: map[string]map[string]int{
"foo": {
"ns1/alpha": 0,
"ns2/beta": 2,
},
"bar": {
"ns1/gamma": 1,
},
wantQueueCounts: map[string]int32{
"ns1/alpha": 0,
"ns2/beta": 2,
"ns1/gamma": 1,
},
},
"delete cq": {
Expand All @@ -1447,10 +1420,8 @@ func TestCacheQueueOperations(t *testing.T) {
return nil
},
},
wantQueueCounts: map[string]map[string]int{
"bar": {
"ns1/gamma": 1,
},
wantQueueCounts: map[string]int32{
"ns1/gamma": 1,
},
},
"delete queue": {
Expand All @@ -1463,13 +1434,9 @@ func TestCacheQueueOperations(t *testing.T) {
return nil
},
},
wantQueueCounts: map[string]map[string]int{
"foo": {
"ns2/beta": 2,
},
"bar": {
"ns1/gamma": 1,
},
wantQueueCounts: map[string]int32{
"ns2/beta": 2,
"ns1/gamma": 1,
},
},
// Not tested: changing a workload's queue and changing a queue's cluster queue.
Expand All @@ -1489,12 +1456,12 @@ func TestCacheQueueOperations(t *testing.T) {
t.Fatalf("Running op %d: %v", i, err)
}
}
qCounts := make(map[string]map[string]int)
for _, cq := range cache.clusterQueues {
qCounts[cq.Name] = cq.admittedWorkloadsPerQueue
}
if diff := cmp.Diff(tc.wantQueueCounts, qCounts); diff != "" {
t.Errorf("Wrong active workloads counters for queues (-want,+got):\n%s", diff)
for _, lq := range queues {
queueAdmitted := cache.AdmittedWorkloadsInLocalQueue(lq)
key := fmt.Sprintf("%s/%s", lq.Namespace, lq.Name)
if diff := cmp.Diff(tc.wantQueueCounts[key], queueAdmitted); diff != "" {
t.Errorf("Want %d but got %d for %s", tc.wantQueueCounts[key], queueAdmitted, key)
}
}
})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/core/localqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (r *LocalQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

queueObj.Status.PendingWorkloads = pending
queueObj.Status.AdmittedWorkloads = r.cache.AdmittedWorkloadsInLocalQueue(&queueObj)
if !equality.Semantic.DeepEqual(oldStatus, queueObj.Status) {
err := r.client.Status().Update(ctx, &queueObj)
return ctrl.Result{}, client.IgnoreNotFound(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ var _ = ginkgo.Describe("Queue controller", func() {
var updatedQueue kueue.LocalQueue
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed())
return updatedQueue.Status
}, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{PendingWorkloads: 3}))
}, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{AdmittedWorkloads: 0, PendingWorkloads: 3}))

ginkgo.By("Admitting workloads")
for _, w := range workloads {
Expand All @@ -98,7 +98,7 @@ var _ = ginkgo.Describe("Queue controller", func() {
var updatedQueue kueue.LocalQueue
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed())
return updatedQueue.Status
}, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{PendingWorkloads: 0}))
}, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{AdmittedWorkloads: 3, PendingWorkloads: 0}))

ginkgo.By("Finishing workloads")
framework.FinishWorkloads(ctx, k8sClient, workloads...)
Expand Down