Fix concurrent admission in cohort when borrowing (#805)
* [scheduler/tests] Concurrent preemption within cohort

* [scheduler] Fix over-admission in concurrent preemption
trasc authored May 30, 2023
1 parent 4338816 commit f694062
Showing 5 changed files with 238 additions and 14 deletions.
28 changes: 28 additions & 0 deletions pkg/cache/cache.go
@@ -112,6 +112,15 @@ func newCohort(name string, size int) *Cohort {
}
}

func (c *Cohort) HasBorrowingQueues() bool {
for cq := range c.Members {
if cq.IsBorrowing() {
return true
}
}
return false
}

const (
pending = metrics.CQStatusPending
active = metrics.CQStatusActive
@@ -139,6 +148,25 @@ type ClusterQueue struct {
podsReadyTracking bool
}

func (cq *ClusterQueue) IsBorrowing() bool {
if cq.Cohort == nil || len(cq.Usage) == 0 {
return false
}
for _, rg := range cq.ResourceGroups {
for _, flvQuotas := range rg.Flavors {
if flvUsage, isUsing := cq.Usage[flvQuotas.Name]; isUsing {
for rName, rQuota := range flvQuotas.Resources {
used := flvUsage[rName]
if used > rQuota.Nominal {
return true
}
}
}
}
}
return false
}

type queue struct {
key string
admittedWorkloads int
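As an aside, here is a minimal standalone sketch of what IsBorrowing detects, using plain maps instead of the cache types above (the names and numbers are illustrative only and are not part of this change):

// isBorrowing reports whether usage exceeds the nominal quota for any
// flavor/resource key, mirroring the ClusterQueue.IsBorrowing logic above.
func isBorrowing(nominal, usage map[string]int64) bool {
	for key, quota := range nominal {
		if usage[key] > quota {
			return true
		}
	}
	return false
}

// Example: a ClusterQueue with a nominal quota of 50 CPUs in the "on-demand"
// flavor that currently uses 51 CPUs is borrowing 1 CPU from its cohort:
//
//	isBorrowing(map[string]int64{"on-demand/cpu": 50}, map[string]int64{"on-demand/cpu": 51}) // true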
20 changes: 13 additions & 7 deletions pkg/scheduler/scheduler.go
@@ -140,14 +140,20 @@ func (s *Scheduler) schedule(ctx context.Context) {
continue
}
cq := snapshot.ClusterQueues[e.ClusterQueue]
if e.assignment.Borrows() && cq.Cohort != nil && usedCohorts.Has(cq.Cohort.Name) {
e.status = skipped
e.inadmissibleMsg = "workloads in the cohort that don't require borrowing were prioritized and admitted first"
continue
}
// Even if there was a failure, we shouldn't admit other workloads to this
// cohort.
if cq.Cohort != nil {
// Having more than one workload from the same cohort admitted in the same scheduling cycle can lead
// to over-admission if:
// 1. One of the workloads is borrowing, since during the nomination the usage of the other workloads
// evaluated in the same cycle is not taken into account.
// 2. An already admitted workload from a different cluster queue is borrowing, since all workloads
// evaluated in the current cycle will compete for the resources that are not borrowed.
if usedCohorts.Has(cq.Cohort.Name) && (e.assignment.Borrows() || cq.Cohort.HasBorrowingQueues()) {
e.status = skipped
e.inadmissibleMsg = "other workloads in the cohort were prioritized"
continue
}
// Even if there was a failure, we shouldn't admit other workloads to this
// cohort.
usedCohorts.Insert(cq.Cohort.Name)
}
log := log.WithValues("workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue))
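To make the over-admission risk concrete, here is a hedged, simplified model of the guard added above (the boolean parameters are stand-ins for the snapshot state, not the scheduler's actual API):

// shouldSkipEntry mirrors the condition added in schedule(): once a cohort has
// been touched in this scheduling cycle, skip any further entry that either
// borrows itself or belongs to a cohort in which some ClusterQueue is borrowing.
func shouldSkipEntry(cohortUsedThisCycle, entryBorrows, cohortHasBorrowingQueues bool) bool {
	return cohortUsedThisCycle && (entryBorrows || cohortHasBorrowingQueues)
}

For example, if an already admitted workload in the cohort is borrowing unused quota from its sibling queues, two workloads nominated in the same cycle are each sized against the same snapshot of free capacity and can both appear to fit; skipping the second one defers it to the next cycle, where the snapshot reflects the first admission.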
120 changes: 117 additions & 3 deletions pkg/scheduler/scheduler_test.go
@@ -187,6 +187,10 @@ func TestSchedule(t *testing.T) {
wantInadmissibleLeft map[string]sets.Set[string]
// wantPreempted is the keys of the workloads that get preempted in the scheduling cycle.
wantPreempted sets.Set[string]

// additional*Queues can hold any extra queues needed by the tc
additionalClusterQueues []kueue.ClusterQueue
additionalLocalQueues []kueue.LocalQueue
}{
"workload fits in single clusterQueue": {
workloads: []kueue.Workload{
@@ -605,6 +609,112 @@ func TestSchedule(t *testing.T) {
"flavor-nonexistent-cq": sets.New("sales/foo"),
},
},
"only one workload is admitted in a cohort while borrowing": {
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("new", "eng-beta").
Queue("main").
Creation(time.Now().Add(-time.Second)).
PodSets(*utiltesting.MakePodSet("one", 50).
Request(corev1.ResourceCPU, "1").
Obj()).
Obj(),
*utiltesting.MakeWorkload("new-gamma", "eng-beta").
Queue("gamma").
Creation(time.Now()).
PodSets(*utiltesting.MakePodSet("one", 50).
Request(corev1.ResourceCPU, "1").
Obj()).
Obj(),
*utiltesting.MakeWorkload("existing", "eng-alpha").
PodSets(
*utiltesting.MakePodSet("borrow-on-demand", 51).
Request(corev1.ResourceCPU, "1").
Obj(),
*utiltesting.MakePodSet("use-all-spot", 100).
Request(corev1.ResourceCPU, "1").
Obj(),
).
Admit(utiltesting.MakeAdmission("eng-alpha").
PodSets(
kueue.PodSetAssignment{
Name: "borrow-on-demand",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "on-demand",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("51"),
},
Count: 51,
},
kueue.PodSetAssignment{
Name: "use-all-spot",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "spot",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100"),
},
Count: 100,
},
).
Obj()).
Obj(),
},
additionalClusterQueues: []kueue.ClusterQueue{
*utiltesting.MakeClusterQueue("eng-gamma").
Cohort("eng").
Preemption(kueue.ClusterQueuePreemption{
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
ResourceGroup(
*utiltesting.MakeFlavorQuotas("on-demand").
Resource(corev1.ResourceCPU, "50", "10").Obj(),
).
Obj(),
},
additionalLocalQueues: []kueue.LocalQueue{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "eng-beta",
Name: "gamma",
},
Spec: kueue.LocalQueueSpec{
ClusterQueue: "eng-gamma",
},
},
},
wantAssignments: map[string]kueue.Admission{
"eng-alpha/existing": *utiltesting.MakeAdmission("eng-alpha").
PodSets(
kueue.PodSetAssignment{
Name: "borrow-on-demand",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "on-demand",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("51"),
},
Count: 51,
},
kueue.PodSetAssignment{
Name: "use-all-spot",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "spot",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100"),
},
Count: 100,
},
).Obj(),
"eng-beta/new": *utiltesting.MakeAdmission("eng-beta", "one").Assignment(corev1.ResourceCPU, "on-demand", "50").AssignmentPodCount(50).Obj(),
},
wantScheduled: []string{"eng-beta/new"},
wantLeft: map[string]sets.Set[string]{
"eng-gamma": sets.New("eng-beta/new-gamma"),
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
@@ -613,8 +723,12 @@ func TestSchedule(t *testing.T) {
})
ctx := ctrl.LoggerInto(context.Background(), log)
scheme := runtime.NewScheme()

allQueues := append(queues, tc.additionalLocalQueues...)
allClusterQueues := append(clusterQueues, tc.additionalClusterQueues...)

clientBuilder := utiltesting.NewClientBuilder().
WithLists(&kueue.WorkloadList{Items: tc.workloads}, &kueue.LocalQueueList{Items: queues}).
WithLists(&kueue.WorkloadList{Items: tc.workloads}, &kueue.LocalQueueList{Items: allQueues}).
WithObjects(
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "eng-alpha", Labels: map[string]string{"dep": "eng"}}},
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "eng-beta", Labels: map[string]string{"dep": "eng"}}},
@@ -627,15 +741,15 @@ func TestSchedule(t *testing.T) {
cqCache := cache.New(cl)
qManager := queue.NewManager(cl, cqCache)
// Workloads are loaded into queues or clusterQueues as we add them.
for _, q := range queues {
for _, q := range allQueues {
if err := qManager.AddLocalQueue(ctx, &q); err != nil {
t.Fatalf("Inserting queue %s/%s in manager: %v", q.Namespace, q.Name, err)
}
}
for i := range resourceFlavors {
cqCache.AddOrUpdateResourceFlavor(resourceFlavors[i])
}
for _, cq := range clusterQueues {
for _, cq := range allClusterQueues {
if err := cqCache.AddClusterQueue(ctx, &cq); err != nil {
t.Fatalf("Inserting clusterQueue %s in cache: %v", cq.Name, err)
}
72 changes: 68 additions & 4 deletions test/integration/scheduler/preemption_test.go
@@ -185,8 +185,8 @@ var _ = ginkgo.Describe("Preemption", func() {

ginkgo.Context("In a ClusterQueue that is part of a cohort", func() {
var (
alphaCQ, betaCQ *kueue.ClusterQueue
alphaQ, betaQ *kueue.LocalQueue
alphaCQ, betaCQ, gammaCQ *kueue.ClusterQueue
alphaQ, betaQ, gammaQ *kueue.LocalQueue
)

ginkgo.BeforeEach(func() {
@@ -209,12 +209,25 @@ var _ = ginkgo.Describe("Preemption", func() {
gomega.Expect(k8sClient.Create(ctx, betaCQ)).To(gomega.Succeed())
betaQ = testing.MakeLocalQueue("beta-q", ns.Name).ClusterQueue(betaCQ.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, betaQ)).To(gomega.Succeed())

gammaCQ = testing.MakeClusterQueue("gamma-cq").
Cohort("all").
ResourceGroup(*testing.MakeFlavorQuotas("alpha").Resource(corev1.ResourceCPU, "2").Obj()).
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, gammaCQ)).To(gomega.Succeed())
gammaQ = testing.MakeLocalQueue("gamma-q", ns.Name).ClusterQueue(gammaCQ.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, gammaQ)).To(gomega.Succeed())
})

ginkgo.AfterEach(func() {
gomega.Expect(util.DeleteWorkloadsInNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, alphaCQ, true)
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, betaCQ, true)
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, gammaCQ, true)
})

ginkgo.It("Should preempt Workloads in the cohort borrowing quota, when the ClusterQueue is using less than nominal quota", func() {
@@ -236,7 +249,7 @@ var _ = ginkgo.Describe("Preemption", func() {
betaHighWl := testing.MakeWorkload("beta-high", ns.Name).
Queue(betaQ.Name).
Priority(highPriority).
Request(corev1.ResourceCPU, "2").
Request(corev1.ResourceCPU, "4").
Obj()
gomega.Expect(k8sClient.Create(ctx, betaHighWl)).To(gomega.Succeed())

@@ -269,7 +282,7 @@ var _ = ginkgo.Describe("Preemption", func() {
betaLowWl := testing.MakeWorkload("beta-low", ns.Name).
Queue(betaQ.Name).
Priority(lowPriority).
Request(corev1.ResourceCPU, "2").
Request(corev1.ResourceCPU, "4").
Obj()
gomega.Expect(k8sClient.Create(ctx, betaLowWl)).To(gomega.Succeed())

@@ -289,6 +302,57 @@ var _ = ginkgo.Describe("Preemption", func() {
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, alphaCQ.Name, alphaHighWl1)
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, betaCQ.Name, betaLowWl)
})

ginkgo.It("Should preempt all necessary workloads in concurrent scheduling", func() {
ginkgo.By("Creating workloads in beta-cq that borrow quota")

betaMidWl := testing.MakeWorkload("beta-mid", ns.Name).
Queue(betaQ.Name).
Priority(midPriority).
Request(corev1.ResourceCPU, "3").
Obj()
gomega.Expect(k8sClient.Create(ctx, betaMidWl)).To(gomega.Succeed())
betaHighWl := testing.MakeWorkload("beta-high", ns.Name).
Queue(betaQ.Name).
Priority(highPriority).
Request(corev1.ResourceCPU, "3").
Obj()
gomega.Expect(k8sClient.Create(ctx, betaHighWl)).To(gomega.Succeed())

util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, betaCQ.Name, betaMidWl, betaHighWl)

ginkgo.By("Creating workload in alpha-cq and gamma-cq that need to preempt")
alphaMidWl := testing.MakeWorkload("alpha-mid", ns.Name).
Queue(alphaQ.Name).
Priority(midPriority).
Request(corev1.ResourceCPU, "2").
Obj()

gammaMidWl := testing.MakeWorkload("gamma-mid", ns.Name).
Queue(gammaQ.Name).
Priority(midPriority).
Request(corev1.ResourceCPU, "2").
Obj()

gomega.Expect(k8sClient.Create(ctx, alphaMidWl)).To(gomega.Succeed())
gomega.Expect(k8sClient.Create(ctx, gammaMidWl)).To(gomega.Succeed())

// Since the two pending workloads are not aware of each other, both of them
// will request the eviction of betaMidWl only.
util.FinishEvictionForWorkloads(ctx, k8sClient, betaMidWl)

// Only one of alpha-mid and gamma-mid should be admitted at this point.
gomega.Eventually(func() []*kueue.Workload {
return util.FilterAdmittedWorkloads(ctx, k8sClient, alphaMidWl, gammaMidWl)
}, util.Interval*4, util.Interval).Should(gomega.HaveLen(1))

// betaHighWl remains admitted
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, betaCQ.Name, betaHighWl)

// The remaining pending workload should then request the preemption of betaHighWl.
util.FinishEvictionForWorkloads(ctx, k8sClient, betaHighWl)

util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, alphaCQ.Name, alphaMidWl)
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, gammaCQ.Name, gammaMidWl)
})
})

})
12 changes: 12 additions & 0 deletions test/util/util.go
@@ -148,6 +148,18 @@ func ExpectWorkloadsToBeAdmitted(ctx context.Context, k8sClient client.Client, c
}, Timeout, Interval).Should(gomega.Equal(len(wls)), "Not enough workloads were admitted")
}

func FilterAdmittedWorkloads(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) []*kueue.Workload {
ret := make([]*kueue.Workload, 0, len(wls))
var updatedWorkload kueue.Workload
for _, wl := range wls {
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload)
if err == nil && workload.IsAdmitted(&updatedWorkload) {
ret = append(ret, wl)
}
}
return ret
}

func ExpectWorkloadsToBePending(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) {
gomega.EventuallyWithOffset(1, func() int {
pending := 0
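A hedged usage sketch of the new helper, following the pattern used in the integration test above (Timeout and Interval are the existing exported constants in this package):

// Expect exactly one of two competing workloads to be admitted, without
// assuming which one wins the scheduling race.
gomega.Eventually(func() []*kueue.Workload {
	return util.FilterAdmittedWorkloads(ctx, k8sClient, alphaMidWl, gammaMidWl)
}, util.Timeout, util.Interval).Should(gomega.HaveLen(1))

Note that the helper returns the caller's original pointers rather than the freshly fetched objects, so the result can be compared directly against the inputs.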
