Skip to content

Commit

Permalink
Merge pull request kubernetes#126 from denkensk/use-queueStrategy
Browse files Browse the repository at this point in the history
Use QueueStrategy in ClusterQueue construct
  • Loading branch information
k8s-ci-robot committed Mar 18, 2022
2 parents 32a2d30 + eb12462 commit 31f7287
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 7 deletions.
7 changes: 6 additions & 1 deletion pkg/queue/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e
if _, ok := m.clusterQueues[cq.Name]; ok {
return errClusterQueueAlreadyExists
}
cqImpl := newClusterQueue(cq)

cqImpl, err := newClusterQueue(cq)
if err != nil {
return err
}

m.clusterQueues[cq.Name] = cqImpl

// Iterate through existing queues, as queues corresponding to this cluster
Expand Down
20 changes: 16 additions & 4 deletions pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,24 @@ type ClusterQueue struct {
heap heapImpl
}

func newClusterQueue(cq *kueue.ClusterQueue) *ClusterQueue {
func newClusterQueue(cq *kueue.ClusterQueue) (*ClusterQueue, error) {
var less lessFunc

switch cq.Spec.QueueingStrategy {
case kueue.StrictFIFO:
less = strictFIFO
default:
return nil, fmt.Errorf("invalid QueueingStrategy %q", cq.Spec.QueueingStrategy)
}

cqImpl := &ClusterQueue{
heap: heapImpl{
less: creationFIFO,
less: less,
items: make(map[string]*heapItem),
},
}
cqImpl.update(cq)
return cqImpl
return cqImpl, nil
}

func (cq *ClusterQueue) update(apiCQ *kueue.ClusterQueue) {
Expand Down Expand Up @@ -140,7 +149,10 @@ func (cq *ClusterQueue) Pop() *workload.Info {
return &w
}

func creationFIFO(a, b workload.Info) bool {
// strictFIFO is the function used by the clusterQueue heap algorithm to sort
// workloads. It sorts workloads based on their priority.
// When priorities are equal, it uses workloads.creationTimestamp.
func strictFIFO(a, b workload.Info) bool {
p1 := utilpriority.Priority(a.Obj)
p2 := utilpriority.Priority(b.Obj)

Expand Down
10 changes: 8 additions & 2 deletions pkg/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ const (
)

func TestFIFOClusterQueue(t *testing.T) {
q := newClusterQueue(&kueue.ClusterQueue{
q, err := newClusterQueue(&kueue.ClusterQueue{
Spec: kueue.ClusterQueueSpec{
QueueingStrategy: kueue.StrictFIFO,
},
})
if err != nil {
t.Fatalf("Failed creating ClusterQueue %v", err)
}
now := metav1.Now()
ws := []*kueue.QueuedWorkload{
{
Expand Down Expand Up @@ -166,11 +169,14 @@ func TestStrictFIFO(t *testing.T) {
},
} {
t.Run(tt.name, func(t *testing.T) {
q := newClusterQueue(&kueue.ClusterQueue{
q, err := newClusterQueue(&kueue.ClusterQueue{
Spec: kueue.ClusterQueueSpec{
QueueingStrategy: kueue.StrictFIFO,
},
})
if err != nil {
t.Fatalf("Failed creating ClusterQueue %v", err)
}

q.PushOrUpdate(tt.w1)
q.PushOrUpdate(tt.w2)
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestSchedule(t *testing.T) {
},
},
},
QueueingStrategy: kueue.StrictFIFO,
RequestableResources: []kueue.Resource{
{
Name: corev1.ResourceCPU,
Expand Down Expand Up @@ -91,6 +92,7 @@ func TestSchedule(t *testing.T) {
},
},
},
QueueingStrategy: kueue.StrictFIFO,
RequestableResources: []kueue.Resource{
{
Name: corev1.ResourceCPU,
Expand Down Expand Up @@ -127,6 +129,7 @@ func TestSchedule(t *testing.T) {
},
},
},
QueueingStrategy: kueue.StrictFIFO,
RequestableResources: []kueue.Resource{
{
Name: corev1.ResourceCPU,
Expand Down
1 change: 1 addition & 0 deletions pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ func MakeClusterQueue(name string) *ClusterQueueWrapper {
},
Spec: kueue.ClusterQueueSpec{
NamespaceSelector: &metav1.LabelSelector{},
QueueingStrategy: kueue.StrictFIFO,
},
}}
}
Expand Down

0 comments on commit 31f7287

Please sign in to comment.