From f694062432ac3c9284d83fd603ba0431887b1c19 Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Tue, 30 May 2023 22:05:47 +0300 Subject: [PATCH] Fix concurrent admission in cohort when borrowing (#805) * [scheduler/tests] Concurrent preemption within cohort * [scheduler] Fix over-admission in concurrent preemption --- pkg/cache/cache.go | 28 ++++ pkg/scheduler/scheduler.go | 20 ++- pkg/scheduler/scheduler_test.go | 120 +++++++++++++++++- test/integration/scheduler/preemption_test.go | 72 ++++++++++- test/util/util.go | 12 ++ 5 files changed, 238 insertions(+), 14 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index ab0514bcf3..97d45a56d9 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -112,6 +112,15 @@ func newCohort(name string, size int) *Cohort { } } +func (c *Cohort) HasBorrowingQueues() bool { + for cq := range c.Members { + if cq.IsBorrowing() { + return true + } + } + return false +} + const ( pending = metrics.CQStatusPending active = metrics.CQStatusActive @@ -139,6 +148,25 @@ type ClusterQueue struct { podsReadyTracking bool } +func (cq *ClusterQueue) IsBorrowing() bool { + if cq.Cohort == nil || len(cq.Usage) == 0 { + return false + } + for _, rg := range cq.ResourceGroups { + for _, flvQuotas := range rg.Flavors { + if flvUsage, isUsing := cq.Usage[flvQuotas.Name]; isUsing { + for rName, rQuota := range flvQuotas.Resources { + used := flvUsage[rName] + if used > rQuota.Nominal { + return true + } + } + } + } + } + return false +} + type queue struct { key string admittedWorkloads int diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 83c3225621..320a4021e2 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -140,14 +140,20 @@ func (s *Scheduler) schedule(ctx context.Context) { continue } cq := snapshot.ClusterQueues[e.ClusterQueue] - if e.assignment.Borrows() && cq.Cohort != nil && usedCohorts.Has(cq.Cohort.Name) { - e.status = skipped - e.inadmissibleMsg = "workloads in the cohort that don't require borrowing were prioritized and admitted first" - continue - } - // Even if there was a failure, we shouldn't admit other workloads to this - // cohort. if cq.Cohort != nil { + // Having more than one workloads from the same cohort admitted in the same scheduling cycle can lead + // to over admission if: + // 1. One of the workloads is borrowing, since during the nomination the usage of the other workloads + // evaluated in the same cycle is not taken into account. + // 2. An already admitted workload from a different cluster queue is borrowing, since all workloads + // evaluated in the current cycle will compete for the resources that are not borrowed. + if usedCohorts.Has(cq.Cohort.Name) && (e.assignment.Borrows() || cq.Cohort.HasBorrowingQueues()) { + e.status = skipped + e.inadmissibleMsg = "other workloads in the cohort were prioritized" + continue + } + // Even if there was a failure, we shouldn't admit other workloads to this + // cohort. usedCohorts.Insert(cq.Cohort.Name) } log := log.WithValues("workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue)) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 7ac8286703..86cbeabfdd 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -187,6 +187,10 @@ func TestSchedule(t *testing.T) { wantInadmissibleLeft map[string]sets.Set[string] // wantPreempted is the keys of the workloads that get preempted in the scheduling cycle. 
wantPreempted sets.Set[string] + + // additional*Queues can hold any extra queues needed by the tc + additionalClusterQueues []kueue.ClusterQueue + additionalLocalQueues []kueue.LocalQueue }{ "workload fits in single clusterQueue": { workloads: []kueue.Workload{ @@ -605,6 +609,112 @@ func TestSchedule(t *testing.T) { "flavor-nonexistent-cq": sets.New("sales/foo"), }, }, + "only one workload is admitted in a cohort while borrowing": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("new", "eng-beta"). + Queue("main"). + Creation(time.Now().Add(-time.Second)). + PodSets(*utiltesting.MakePodSet("one", 50). + Request(corev1.ResourceCPU, "1"). + Obj()). + Obj(), + *utiltesting.MakeWorkload("new-gamma", "eng-beta"). + Queue("gamma"). + Creation(time.Now()). + PodSets(*utiltesting.MakePodSet("one", 50). + Request(corev1.ResourceCPU, "1"). + Obj()). + Obj(), + *utiltesting.MakeWorkload("existing", "eng-alpha"). + PodSets( + *utiltesting.MakePodSet("borrow-on-demand", 51). + Request(corev1.ResourceCPU, "1"). + Obj(), + *utiltesting.MakePodSet("use-all-spot", 100). + Request(corev1.ResourceCPU, "1"). + Obj(), + ). + Admit(utiltesting.MakeAdmission("eng-alpha"). + PodSets( + kueue.PodSetAssignment{ + Name: "borrow-on-demand", + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "on-demand", + }, + ResourceUsage: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("51"), + }, + Count: 51, + }, + kueue.PodSetAssignment{ + Name: "use-all-spot", + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "spot", + }, + ResourceUsage: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + }, + Count: 100, + }, + ). + Obj()). + Obj(), + }, + additionalClusterQueues: []kueue.ClusterQueue{ + *utiltesting.MakeClusterQueue("eng-gamma"). + Cohort("eng"). + Preemption(kueue.ClusterQueuePreemption{ + ReclaimWithinCohort: kueue.PreemptionPolicyAny, + WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, + }). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("on-demand"). + Resource(corev1.ResourceCPU, "50", "10").Obj(), + ). + Obj(), + }, + additionalLocalQueues: []kueue.LocalQueue{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eng-beta", + Name: "gamma", + }, + Spec: kueue.LocalQueueSpec{ + ClusterQueue: "eng-gamma", + }, + }, + }, + wantAssignments: map[string]kueue.Admission{ + "eng-alpha/existing": *utiltesting.MakeAdmission("eng-alpha"). 
+ PodSets( + kueue.PodSetAssignment{ + Name: "borrow-on-demand", + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "on-demand", + }, + ResourceUsage: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("51"), + }, + Count: 51, + }, + kueue.PodSetAssignment{ + Name: "use-all-spot", + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "spot", + }, + ResourceUsage: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + }, + Count: 100, + }, + ).Obj(), + "eng-beta/new": *utiltesting.MakeAdmission("eng-beta", "one").Assignment(corev1.ResourceCPU, "on-demand", "50").AssignmentPodCount(50).Obj(), + }, + wantScheduled: []string{"eng-beta/new"}, + wantLeft: map[string]sets.Set[string]{ + "eng-gamma": sets.New("eng-beta/new-gamma"), + }, + }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { @@ -613,8 +723,12 @@ func TestSchedule(t *testing.T) { }) ctx := ctrl.LoggerInto(context.Background(), log) scheme := runtime.NewScheme() + + allQueues := append(queues, tc.additionalLocalQueues...) + allClusterQueues := append(clusterQueues, tc.additionalClusterQueues...) + clientBuilder := utiltesting.NewClientBuilder(). - WithLists(&kueue.WorkloadList{Items: tc.workloads}, &kueue.LocalQueueList{Items: queues}). + WithLists(&kueue.WorkloadList{Items: tc.workloads}, &kueue.LocalQueueList{Items: allQueues}). WithObjects( &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "eng-alpha", Labels: map[string]string{"dep": "eng"}}}, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "eng-beta", Labels: map[string]string{"dep": "eng"}}}, @@ -627,7 +741,7 @@ func TestSchedule(t *testing.T) { cqCache := cache.New(cl) qManager := queue.NewManager(cl, cqCache) // Workloads are loaded into queues or clusterQueues as we add them. - for _, q := range queues { + for _, q := range allQueues { if err := qManager.AddLocalQueue(ctx, &q); err != nil { t.Fatalf("Inserting queue %s/%s in manager: %v", q.Namespace, q.Name, err) } @@ -635,7 +749,7 @@ func TestSchedule(t *testing.T) { for i := range resourceFlavors { cqCache.AddOrUpdateResourceFlavor(resourceFlavors[i]) } - for _, cq := range clusterQueues { + for _, cq := range allClusterQueues { if err := cqCache.AddClusterQueue(ctx, &cq); err != nil { t.Fatalf("Inserting clusterQueue %s in cache: %v", cq.Name, err) } diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index 47d0d26666..0dfc96d373 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -185,8 +185,8 @@ var _ = ginkgo.Describe("Preemption", func() { ginkgo.Context("In a ClusterQueue that is part of a cohort", func() { var ( - alphaCQ, betaCQ *kueue.ClusterQueue - alphaQ, betaQ *kueue.LocalQueue + alphaCQ, betaCQ, gammaCQ *kueue.ClusterQueue + alphaQ, betaQ, gammaQ *kueue.LocalQueue ) ginkgo.BeforeEach(func() { @@ -209,12 +209,25 @@ var _ = ginkgo.Describe("Preemption", func() { gomega.Expect(k8sClient.Create(ctx, betaCQ)).To(gomega.Succeed()) betaQ = testing.MakeLocalQueue("beta-q", ns.Name).ClusterQueue(betaCQ.Name).Obj() gomega.Expect(k8sClient.Create(ctx, betaQ)).To(gomega.Succeed()) + + gammaCQ = testing.MakeClusterQueue("gamma-cq"). + Cohort("all"). + ResourceGroup(*testing.MakeFlavorQuotas("alpha").Resource(corev1.ResourceCPU, "2").Obj()). 
+ Preemption(kueue.ClusterQueuePreemption{ + WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, + ReclaimWithinCohort: kueue.PreemptionPolicyAny, + }). + Obj() + gomega.Expect(k8sClient.Create(ctx, gammaCQ)).To(gomega.Succeed()) + gammaQ = testing.MakeLocalQueue("gamma-q", ns.Name).ClusterQueue(gammaCQ.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, gammaQ)).To(gomega.Succeed()) }) ginkgo.AfterEach(func() { gomega.Expect(util.DeleteWorkloadsInNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, alphaCQ, true) util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, betaCQ, true) + util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, gammaCQ, true) }) ginkgo.It("Should preempt Workloads in the cohort borrowing quota, when the ClusterQueue is using less than nominal quota", func() { @@ -236,7 +249,7 @@ var _ = ginkgo.Describe("Preemption", func() { betaHighWl := testing.MakeWorkload("beta-high", ns.Name). Queue(betaQ.Name). Priority(highPriority). - Request(corev1.ResourceCPU, "2"). + Request(corev1.ResourceCPU, "4"). Obj() gomega.Expect(k8sClient.Create(ctx, betaHighWl)).To(gomega.Succeed()) @@ -269,7 +282,7 @@ var _ = ginkgo.Describe("Preemption", func() { betaLowWl := testing.MakeWorkload("beta-low", ns.Name). Queue(betaQ.Name). Priority(lowPriority). - Request(corev1.ResourceCPU, "2"). + Request(corev1.ResourceCPU, "4"). Obj() gomega.Expect(k8sClient.Create(ctx, betaLowWl)).To(gomega.Succeed()) @@ -289,6 +302,57 @@ var _ = ginkgo.Describe("Preemption", func() { util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, alphaCQ.Name, alphaHighWl1) util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, betaCQ.Name, betaLowWl) }) + + ginkgo.It("Should preempt all necessary workloads in concurrent scheduling", func() { + ginkgo.By("Creating workloads in beta-cq that borrow quota") + + betaMidWl := testing.MakeWorkload("beta-mid", ns.Name). + Queue(betaQ.Name). + Priority(midPriority). + Request(corev1.ResourceCPU, "3"). + Obj() + gomega.Expect(k8sClient.Create(ctx, betaMidWl)).To(gomega.Succeed()) + betaHighWl := testing.MakeWorkload("beta-high", ns.Name). + Queue(betaQ.Name). + Priority(highPriority). + Request(corev1.ResourceCPU, "3"). + Obj() + gomega.Expect(k8sClient.Create(ctx, betaHighWl)).To(gomega.Succeed()) + + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, betaCQ.Name, betaMidWl, betaHighWl) + + ginkgo.By("Creating workload in alpha-cq and gamma-cq that need to preempt") + alphaMidWl := testing.MakeWorkload("alpha-mid", ns.Name). + Queue(alphaQ.Name). + Priority(midPriority). + Request(corev1.ResourceCPU, "2"). + Obj() + + gammaMidWl := testing.MakeWorkload("gamma-mid", ns.Name). + Queue(gammaQ.Name). + Priority(midPriority). + Request(corev1.ResourceCPU, "2"). 
+ Obj() + + gomega.Expect(k8sClient.Create(ctx, alphaMidWl)).To(gomega.Succeed()) + gomega.Expect(k8sClient.Create(ctx, gammaMidWl)).To(gomega.Succeed()) + + // since the two pending workloads are not aware of each other both of them + // will request the eviction of betaMidWl only + util.FinishEvictionForWorkloads(ctx, k8sClient, betaMidWl) + + // one of alpha-mid and gamma-mid should be admitted + gomega.Eventually(func() []*kueue.Workload { return util.FilterAdmittedWorkloads(ctx, k8sClient, alphaMidWl, gammaMidWl) }, util.Interval*4, util.Interval).Should(gomega.HaveLen(1)) + + // betaHighWl remains admitted + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, betaCQ.Name, betaHighWl) + + // the last one should request the preemption of betaHighWl + util.FinishEvictionForWorkloads(ctx, k8sClient, betaHighWl) + + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, alphaCQ.Name, alphaMidWl) + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, gammaCQ.Name, gammaMidWl) + }) }) }) diff --git a/test/util/util.go b/test/util/util.go index 4d1e0efd16..31c5380c38 100644 --- a/test/util/util.go +++ b/test/util/util.go @@ -148,6 +148,18 @@ func ExpectWorkloadsToBeAdmitted(ctx context.Context, k8sClient client.Client, c }, Timeout, Interval).Should(gomega.Equal(len(wls)), "Not enough workloads were admitted") } +func FilterAdmittedWorkloads(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) []*kueue.Workload { + ret := make([]*kueue.Workload, 0, len(wls)) + var updatedWorkload kueue.Workload + for _, wl := range wls { + err := k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload) + if err == nil && workload.IsAdmitted(&updatedWorkload) { + ret = append(ret, wl) + } + } + return ret +} + func ExpectWorkloadsToBePending(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) { gomega.EventuallyWithOffset(1, func() int { pending := 0
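
The core of the fix is the pair of checks added in pkg/cache/cache.go (ClusterQueue.IsBorrowing, Cohort.HasBorrowingQueues) together with the per-cycle guard in pkg/scheduler/scheduler.go. Below is a minimal, self-contained Go sketch of that logic reduced to plain maps; Quota, queue, cohort, entry, and admitCycle are hypothetical names for this sketch, not Kueue's real structs, and flavors are flattened into a single resource map.

// sketch.go - simplified illustration of the borrowing checks and the
// per-cycle cohort guard introduced by this patch (hypothetical types).
package main

import "fmt"

// Quota tracks nominal quota and current usage for one resource.
type Quota struct {
    Nominal int64
    Used    int64
}

// queue mirrors the idea behind ClusterQueue.IsBorrowing: a queue is
// borrowing when its usage of any resource exceeds its nominal quota.
type queue struct {
    name   string
    quotas map[string]Quota
}

func (q *queue) isBorrowing() bool {
    for _, qu := range q.quotas {
        if qu.Used > qu.Nominal {
            return true
        }
    }
    return false
}

// cohort mirrors Cohort.HasBorrowingQueues: a cohort has borrowing queues
// when any of its member queues is borrowing.
type cohort struct {
    name    string
    members []*queue
}

func (c *cohort) hasBorrowingQueues() bool {
    for _, m := range c.members {
        if m.isBorrowing() {
            return true
        }
    }
    return false
}

// entry is one workload nominated in a scheduling cycle.
type entry struct {
    workload string
    cohort   *cohort
    borrows  bool // the proposed assignment exceeds the queue's nominal quota
}

// admitCycle applies the guard added in scheduler.go: once a cohort has been
// used in the current cycle, further entries for it are skipped if either the
// entry itself borrows or some queue in the cohort is already borrowing,
// because entries nominated in the same cycle do not see each other's usage.
func admitCycle(entries []entry) (admitted, skipped []string) {
    usedCohorts := map[string]bool{}
    for _, e := range entries {
        if e.cohort != nil {
            if usedCohorts[e.cohort.name] && (e.borrows || e.cohort.hasBorrowingQueues()) {
                skipped = append(skipped, e.workload)
                continue
            }
            // Even if admission later fails, the real scheduler marks the
            // cohort as used for this cycle; the sketch does the same.
            usedCohorts[e.cohort.name] = true
        }
        admitted = append(admitted, e.workload)
    }
    return admitted, skipped
}

func main() {
    // eng-alpha is borrowing (usage above its nominal quota), loosely
    // mirroring the admitted "existing" workload in the unit test above.
    alpha := &queue{name: "eng-alpha", quotas: map[string]Quota{"cpu": {Nominal: 50, Used: 51}}}
    beta := &queue{name: "eng-beta", quotas: map[string]Quota{"cpu": {Nominal: 50}}}
    eng := &cohort{name: "eng", members: []*queue{alpha, beta}}

    admitted, skipped := admitCycle([]entry{
        {workload: "eng-beta/new", cohort: eng},
        {workload: "eng-beta/new-gamma", cohort: eng},
    })
    fmt.Println("admitted:", admitted) // admitted: [eng-beta/new]
    fmt.Println("skipped:", skipped)   // skipped: [eng-beta/new-gamma]
}

Running the sketch admits eng-beta/new and skips eng-beta/new-gamma, matching the new unit test "only one workload is admitted in a cohort while borrowing": the skipped workload is requeued and renominated in a later cycle, once the first admission's usage is visible in the snapshot.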
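
The integration test asserts the race outcome by calling the new FilterAdmittedWorkloads helper inside gomega.Eventually. As a sketch of how that helper could be reused, the hypothetical ExpectExactlyOneAdmitted below (not part of this patch; the Consistently settle window is an arbitrary assumption) wraps the same pattern and additionally checks that no second workload gets admitted late, which is exactly the over-admission this PR prevents.

// Hypothetical addition to test/util (sketch only): assert that, out of a set
// of racing workloads, exactly one ends up admitted and it stays that way.
package util

import (
    "context"

    "github.com/onsi/gomega"
    "sigs.k8s.io/controller-runtime/pkg/client"

    kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

func ExpectExactlyOneAdmitted(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) {
    gomega.EventuallyWithOffset(1, func() int {
        return len(FilterAdmittedWorkloads(ctx, k8sClient, wls...))
    }, Timeout, Interval).Should(gomega.Equal(1), "Expected exactly one admitted workload")
    // Settle window is arbitrary for this sketch; it guards against a second
    // workload being admitted shortly after the first.
    gomega.ConsistentlyWithOffset(1, func() int {
        return len(FilterAdmittedWorkloads(ctx, k8sClient, wls...))
    }, Interval*10, Interval).Should(gomega.Equal(1), "Expected no late over-admission")
}

With such a helper, util.ExpectExactlyOneAdmitted(ctx, k8sClient, alphaMidWl, gammaMidWl) could stand in for the inline gomega.Eventually(...).Should(gomega.HaveLen(1)) call in the test above.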