Fix concurrent admission in cohort when borrowing #805

Merged
merged 2 commits on May 30, 2023
28 changes: 28 additions & 0 deletions pkg/cache/cache.go
@@ -112,6 +112,15 @@ func newCohort(name string, size int) *Cohort {
}
}

func (c *Cohort) HasBorrowingQueues() bool {
for cq := range c.Members {
if cq.IsBorrowing() {
return true
}
}
return false
}

const (
pending = metrics.CQStatusPending
active = metrics.CQStatusActive
@@ -139,6 +148,25 @@ type ClusterQueue struct {
podsReadyTracking bool
}

func (cq *ClusterQueue) IsBorrowing() bool {
if cq.Cohort == nil || len(cq.Usage) == 0 {
return false
}
for _, rg := range cq.ResourceGroups {
for _, flvQuotas := range rg.Flavors {
if flvUsage, isUsing := cq.Usage[flvQuotas.Name]; isUsing {
for rName, rQuota := range flvQuotas.Resources {
used := flvUsage[rName]
if used > rQuota.Nominal {
return true
}
}
}
}
}
return false
}

type queue struct {
key string
admittedWorkloads int
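The two helpers above reduce to a usage-versus-nominal-quota comparison: a ClusterQueue is borrowing when its usage of some resource exceeds that resource's nominal quota, and a Cohort has borrowing queues when any member does. The following standalone sketch mirrors that logic with simplified stand-in types; `clusterQueue`, `cohort`, and their fields are illustrative assumptions, not the real pkg/cache structs.

```go
package main

import "fmt"

// clusterQueue is a simplified stand-in for the cache's ClusterQueue.
type clusterQueue struct {
	nominal map[string]int64 // nominal quota per resource
	usage   map[string]int64 // current usage per resource
}

// isBorrowing mirrors ClusterQueue.IsBorrowing: the queue borrows when its
// usage of any resource exceeds that resource's nominal quota.
func (cq *clusterQueue) isBorrowing() bool {
	for r, used := range cq.usage {
		if used > cq.nominal[r] {
			return true
		}
	}
	return false
}

// cohort is a simplified stand-in for the cache's Cohort.
type cohort struct {
	members []*clusterQueue
}

// hasBorrowingQueues mirrors Cohort.HasBorrowingQueues: true if any member
// cluster queue is currently borrowing.
func (c *cohort) hasBorrowingQueues() bool {
	for _, cq := range c.members {
		if cq.isBorrowing() {
			return true
		}
	}
	return false
}

func main() {
	lender := &clusterQueue{nominal: map[string]int64{"cpu": 50}, usage: map[string]int64{"cpu": 0}}
	borrower := &clusterQueue{nominal: map[string]int64{"cpu": 50}, usage: map[string]int64{"cpu": 51}}
	c := &cohort{members: []*clusterQueue{lender, borrower}}

	fmt.Println(borrower.isBorrowing())    // true: 51 used > 50 nominal
	fmt.Println(c.hasBorrowingQueues())    // true: at least one member borrows
}
```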
20 changes: 13 additions & 7 deletions pkg/scheduler/scheduler.go
@@ -140,14 +140,20 @@ func (s *Scheduler) schedule(ctx context.Context) {
continue
}
cq := snapshot.ClusterQueues[e.ClusterQueue]
if e.assignment.Borrows() && cq.Cohort != nil && usedCohorts.Has(cq.Cohort.Name) {
e.status = skipped
e.inadmissibleMsg = "workloads in the cohort that don't require borrowing were prioritized and admitted first"
continue
}
// Even if there was a failure, we shouldn't admit other workloads to this
// cohort.
if cq.Cohort != nil {
// Having more than one workload from the same cohort admitted in the same scheduling cycle can lead
// to over-admission if:
// 1. One of the workloads is borrowing, since during the nomination the usage of the other workloads
// evaluated in the same cycle is not taken into account.
// 2. An already admitted workload from a different cluster queue is borrowing, since all workloads
// evaluated in the current cycle will compete for the resources that are not borrowed.
if usedCohorts.Has(cq.Cohort.Name) && (e.assignment.Borrows() || cq.Cohort.HasBorrowingQueues()) {
e.status = skipped
e.inadmissibleMsg = "other workloads in the cohort were prioritized"
continue
}
// Even if there was a failure, we shouldn't admit other workloads to this
// cohort.
usedCohorts.Insert(cq.Cohort.Name)
}
log := log.WithValues("workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue))
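Read as a predicate, the guard above skips an entry once its cohort has already been touched in the current cycle and borrowing is involved, either because the entry's own assignment would borrow or because some queue in the cohort is already borrowing. A minimal sketch of that decision, using hypothetical stand-in types rather than the scheduler's real entry and snapshot types:

```go
package main

import "fmt"

// entry is a simplified stand-in for the scheduler's per-workload entry.
type entry struct {
	cohort             string
	borrows            bool // this entry's assignment would borrow
	cohortHasBorrowing bool // some queue in the cohort is already borrowing
}

// shouldSkip mirrors the guard added in schedule(): only one workload per
// cohort may proceed in a cycle when borrowing is involved, because the
// nomination step does not account for the usage of other entries evaluated
// in the same cycle.
func shouldSkip(e entry, usedCohorts map[string]bool) bool {
	return usedCohorts[e.cohort] && (e.borrows || e.cohortHasBorrowing)
}

func main() {
	used := map[string]bool{}
	first := entry{cohort: "eng", borrows: false, cohortHasBorrowing: true}
	second := entry{cohort: "eng", borrows: false, cohortHasBorrowing: true}

	fmt.Println(shouldSkip(first, used)) // false: cohort not yet used this cycle
	used[first.cohort] = true            // mark the cohort as used, even on failure

	fmt.Println(shouldSkip(second, used)) // true: cohort already used and borrowing is in play
}
```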
120 changes: 117 additions & 3 deletions pkg/scheduler/scheduler_test.go
@@ -187,6 +187,10 @@ func TestSchedule(t *testing.T) {
wantInadmissibleLeft map[string]sets.Set[string]
// wantPreempted is the keys of the workloads that get preempted in the scheduling cycle.
wantPreempted sets.Set[string]

// additional*Queues can hold any extra queues needed by the tc
additionalClusterQueues []kueue.ClusterQueue
additionalLocalQueues []kueue.LocalQueue
}{
"workload fits in single clusterQueue": {
workloads: []kueue.Workload{
@@ -605,6 +609,112 @@ func TestSchedule(t *testing.T) {
"flavor-nonexistent-cq": sets.New("sales/foo"),
},
},
"only one workload is admitted in a cohort while borrowing": {
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("new", "eng-beta").
Queue("main").
Creation(time.Now().Add(-time.Second)).
PodSets(*utiltesting.MakePodSet("one", 50).
Request(corev1.ResourceCPU, "1").
Obj()).
Obj(),
*utiltesting.MakeWorkload("new-gamma", "eng-beta").
Queue("gamma").
Creation(time.Now()).
PodSets(*utiltesting.MakePodSet("one", 50).
Request(corev1.ResourceCPU, "1").
Obj()).
Obj(),
*utiltesting.MakeWorkload("existing", "eng-alpha").
PodSets(
*utiltesting.MakePodSet("borrow-on-demand", 51).
Request(corev1.ResourceCPU, "1").
Obj(),
*utiltesting.MakePodSet("use-all-spot", 100).
Request(corev1.ResourceCPU, "1").
Obj(),
).
Admit(utiltesting.MakeAdmission("eng-alpha").
PodSets(
kueue.PodSetAssignment{
Name: "borrow-on-demand",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "on-demand",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("51"),
},
Count: 51,
},
kueue.PodSetAssignment{
Name: "use-all-spot",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "spot",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100"),
},
Count: 100,
},
).
Obj()).
Obj(),
},
additionalClusterQueues: []kueue.ClusterQueue{
*utiltesting.MakeClusterQueue("eng-gamma").
Cohort("eng").
Preemption(kueue.ClusterQueuePreemption{
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
ResourceGroup(
*utiltesting.MakeFlavorQuotas("on-demand").
Resource(corev1.ResourceCPU, "50", "10").Obj(),
).
Obj(),
},
additionalLocalQueues: []kueue.LocalQueue{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "eng-beta",
Name: "gamma",
},
Spec: kueue.LocalQueueSpec{
ClusterQueue: "eng-gamma",
},
},
},
wantAssignments: map[string]kueue.Admission{
"eng-alpha/existing": *utiltesting.MakeAdmission("eng-alpha").
PodSets(
kueue.PodSetAssignment{
Name: "borrow-on-demand",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "on-demand",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("51"),
},
Count: 51,
},
kueue.PodSetAssignment{
Name: "use-all-spot",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "spot",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100"),
},
Count: 100,
},
).Obj(),
"eng-beta/new": *utiltesting.MakeAdmission("eng-beta", "one").Assignment(corev1.ResourceCPU, "on-demand", "50").AssignmentPodCount(50).Obj(),
},
wantScheduled: []string{"eng-beta/new"},
wantLeft: map[string]sets.Set[string]{
"eng-gamma": sets.New("eng-beta/new-gamma"),
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
@@ -613,8 +723,12 @@ func TestSchedule(t *testing.T) {
})
ctx := ctrl.LoggerInto(context.Background(), log)
scheme := runtime.NewScheme()

allQueues := append(queues, tc.additionalLocalQueues...)
allClusterQueues := append(clusterQueues, tc.additionalClusterQueues...)

clientBuilder := utiltesting.NewClientBuilder().
WithLists(&kueue.WorkloadList{Items: tc.workloads}, &kueue.LocalQueueList{Items: queues}).
WithLists(&kueue.WorkloadList{Items: tc.workloads}, &kueue.LocalQueueList{Items: allQueues}).
WithObjects(
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "eng-alpha", Labels: map[string]string{"dep": "eng"}}},
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "eng-beta", Labels: map[string]string{"dep": "eng"}}},
@@ -627,15 +741,15 @@ func TestSchedule(t *testing.T) {
cqCache := cache.New(cl)
qManager := queue.NewManager(cl, cqCache)
// Workloads are loaded into queues or clusterQueues as we add them.
for _, q := range queues {
for _, q := range allQueues {
if err := qManager.AddLocalQueue(ctx, &q); err != nil {
t.Fatalf("Inserting queue %s/%s in manager: %v", q.Namespace, q.Name, err)
}
}
for i := range resourceFlavors {
cqCache.AddOrUpdateResourceFlavor(resourceFlavors[i])
}
for _, cq := range clusterQueues {
for _, cq := range allClusterQueues {
if err := cqCache.AddClusterQueue(ctx, &cq); err != nil {
t.Fatalf("Inserting clusterQueue %s in cache: %v", cq.Name, err)
}
72 changes: 68 additions & 4 deletions test/integration/scheduler/preemption_test.go
@@ -185,8 +185,8 @@ var _ = ginkgo.Describe("Preemption", func() {

ginkgo.Context("In a ClusterQueue that is part of a cohort", func() {
var (
alphaCQ, betaCQ *kueue.ClusterQueue
alphaQ, betaQ *kueue.LocalQueue
alphaCQ, betaCQ, gammaCQ *kueue.ClusterQueue
alphaQ, betaQ, gammaQ *kueue.LocalQueue
)

ginkgo.BeforeEach(func() {
@@ -209,12 +209,25 @@ var _ = ginkgo.Describe("Preemption", func() {
gomega.Expect(k8sClient.Create(ctx, betaCQ)).To(gomega.Succeed())
betaQ = testing.MakeLocalQueue("beta-q", ns.Name).ClusterQueue(betaCQ.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, betaQ)).To(gomega.Succeed())

gammaCQ = testing.MakeClusterQueue("gamma-cq").
Cohort("all").
ResourceGroup(*testing.MakeFlavorQuotas("alpha").Resource(corev1.ResourceCPU, "2").Obj()).
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, gammaCQ)).To(gomega.Succeed())
gammaQ = testing.MakeLocalQueue("gamma-q", ns.Name).ClusterQueue(gammaCQ.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, gammaQ)).To(gomega.Succeed())
})

ginkgo.AfterEach(func() {
gomega.Expect(util.DeleteWorkloadsInNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, alphaCQ, true)
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, betaCQ, true)
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, gammaCQ, true)
})

ginkgo.It("Should preempt Workloads in the cohort borrowing quota, when the ClusterQueue is using less than nominal quota", func() {
@@ -236,7 +249,7 @@ var _ = ginkgo.Describe("Preemption", func() {
betaHighWl := testing.MakeWorkload("beta-high", ns.Name).
Queue(betaQ.Name).
Priority(highPriority).
Request(corev1.ResourceCPU, "2").
Request(corev1.ResourceCPU, "4").
Obj()
gomega.Expect(k8sClient.Create(ctx, betaHighWl)).To(gomega.Succeed())

@@ -269,7 +282,7 @@ var _ = ginkgo.Describe("Preemption", func() {
betaLowWl := testing.MakeWorkload("beta-low", ns.Name).
Queue(betaQ.Name).
Priority(lowPriority).
Request(corev1.ResourceCPU, "2").
Request(corev1.ResourceCPU, "4").
Obj()
gomega.Expect(k8sClient.Create(ctx, betaLowWl)).To(gomega.Succeed())

@@ -289,6 +302,57 @@ var _ = ginkgo.Describe("Preemption", func() {
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, alphaCQ.Name, alphaHighWl1)
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, betaCQ.Name, betaLowWl)
})

ginkgo.It("Should preempt all necessary workloads in concurrent scheduling", func() {
ginkgo.By("Creating workloads in beta-cq that borrow quota")

betaMidWl := testing.MakeWorkload("beta-mid", ns.Name).
Queue(betaQ.Name).
Priority(midPriority).
Request(corev1.ResourceCPU, "3").
Obj()
gomega.Expect(k8sClient.Create(ctx, betaMidWl)).To(gomega.Succeed())
betaHighWl := testing.MakeWorkload("beta-high", ns.Name).
Queue(betaQ.Name).
Priority(highPriority).
Request(corev1.ResourceCPU, "3").
Obj()
gomega.Expect(k8sClient.Create(ctx, betaHighWl)).To(gomega.Succeed())

util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, betaCQ.Name, betaMidWl, betaHighWl)

ginkgo.By("Creating workload in alpha-cq and gamma-cq that need to preempt")
alphaMidWl := testing.MakeWorkload("alpha-mid", ns.Name).
Queue(alphaQ.Name).
Priority(midPriority).
Request(corev1.ResourceCPU, "2").
Obj()

gammaMidWl := testing.MakeWorkload("gamma-mid", ns.Name).
Queue(gammaQ.Name).
Priority(midPriority).
Request(corev1.ResourceCPU, "2").
Obj()

gomega.Expect(k8sClient.Create(ctx, alphaMidWl)).To(gomega.Succeed())
gomega.Expect(k8sClient.Create(ctx, gammaMidWl)).To(gomega.Succeed())

// Since the two pending workloads are not aware of each other, both of them
// will request the eviction of betaMidWl only
util.FinishEvictionForWorkloads(ctx, k8sClient, betaMidWl)

// one of alpha-mid and gamma-mid should be admitted
gomega.Eventually(func() []*kueue.Workload { return util.FilterAdmittedWorkloads(ctx, k8sClient, alphaMidWl, gammaMidWl) }, util.Interval*4, util.Interval).Should(gomega.HaveLen(1))

// betaHighWl remains admitted
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, betaCQ.Name, betaHighWl)

// the last one should request the preemption of betaHighWl
util.FinishEvictionForWorkloads(ctx, k8sClient, betaHighWl)

util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, alphaCQ.Name, alphaMidWl)
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, gammaCQ.Name, gammaMidWl)
})
})

})
12 changes: 12 additions & 0 deletions test/util/util.go
@@ -148,6 +148,18 @@ func ExpectWorkloadsToBeAdmitted(ctx context.Context, k8sClient client.Client, c
}, Timeout, Interval).Should(gomega.Equal(len(wls)), "Not enough workloads were admitted")
}

func FilterAdmittedWorkloads(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) []*kueue.Workload {
ret := make([]*kueue.Workload, 0, len(wls))
var updatedWorkload kueue.Workload
for _, wl := range wls {
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload)
if err == nil && workload.IsAdmitted(&updatedWorkload) {
ret = append(ret, wl)
}
}
return ret
}

func ExpectWorkloadsToBePending(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) {
gomega.EventuallyWithOffset(1, func() int {
pending := 0