diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go
index f83015f48c..ce4ab739c7 100644
--- a/pkg/cache/cache.go
+++ b/pkg/cache/cache.go
@@ -111,6 +111,15 @@ func newCohort(name string, size int) *Cohort {
 	}
 }
 
+func (c *Cohort) HasBorrowingQueues() bool {
+	for cq := range c.Members {
+		if cq.IsBorrowing() {
+			return true
+		}
+	}
+	return false
+}
+
 const (
 	pending = metrics.CQStatusPending
 	active  = metrics.CQStatusActive
@@ -137,6 +146,25 @@ type ClusterQueue struct {
 	podsReadyTracking bool
 }
 
+func (cq *ClusterQueue) IsBorrowing() bool {
+	if cq.Cohort == nil || len(cq.Usage) == 0 {
+		return false
+	}
+	for _, rg := range cq.ResourceGroups {
+		for _, flvQuotas := range rg.Flavors {
+			if flvUsage, isUsing := cq.Usage[flvQuotas.Name]; isUsing {
+				for rName, rQuota := range flvQuotas.Resources {
+					used := flvUsage[rName]
+					if used > rQuota.Nominal {
+						return true
+					}
+				}
+			}
+		}
+	}
+	return false
+}
+
 type ResourceGroup struct {
 	CoveredResources sets.Set[corev1.ResourceName]
 	Flavors          []FlavorQuotas
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 983fcaf903..0805733867 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -149,14 +149,20 @@ func (s *Scheduler) schedule(ctx context.Context) {
 			continue
 		}
 		cq := snapshot.ClusterQueues[e.ClusterQueue]
-		if e.assignment.Borrows() && cq.Cohort != nil && usedCohorts.Has(cq.Cohort.Name) {
-			e.status = skipped
-			e.inadmissibleMsg = "workloads in the cohort that don't require borrowing were prioritized and admitted first"
-			continue
-		}
-		// Even if there was a failure, we shouldn't admit other workloads to this
-		// cohort.
 		if cq.Cohort != nil {
+			// Admitting more than one workload from the same cohort in a single scheduling cycle can lead
+			// to over-admission if:
+			// 1. One of the workloads is borrowing, since nomination does not take into account the usage
+			//    of the other workloads evaluated in the same cycle.
+			// 2. An already admitted workload from a different cluster queue is borrowing, since all workloads
+			//    evaluated in the current cycle will compete for the resources that are not borrowed.
+			if usedCohorts.Has(cq.Cohort.Name) && (e.assignment.Borrows() || cq.Cohort.HasBorrowingQueues()) {
+				e.status = skipped
+				e.inadmissibleMsg = "other workloads in the cohort were prioritized"
+				continue
+			}
+			// Even if there was a failure, we shouldn't admit other workloads to this
+			// cohort.
 			usedCohorts.Insert(cq.Cohort.Name)
 		}
 		log := log.WithValues("workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue))
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 19c915935c..1d1d9867b6 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -187,6 +187,10 @@ func TestSchedule(t *testing.T) {
 		wantInadmissibleLeft map[string]sets.Set[string]
 		// wantPreempted is the keys of the workloads that get preempted in the scheduling cycle.
 		wantPreempted sets.Set[string]
+
+		// additional*Queues can hold any extra queues needed by the test case.
+		additionalClusterQueues []kueue.ClusterQueue
+		additionalLocalQueues   []kueue.LocalQueue
 	}{
 		"workload fits in single clusterQueue": {
 			workloads: []kueue.Workload{
@@ -594,6 +598,108 @@ func TestSchedule(t *testing.T) {
 				"flavor-nonexistent-cq": sets.New("sales/foo"),
 			},
 		},
+		"only one workload is admitted in a cohort while borrowing": {
+			workloads: []kueue.Workload{
+				*utiltesting.MakeWorkload("new", "eng-beta").
+					Queue("main").
+					Creation(time.Now().Add(-time.Second)).
+					PodSets(*utiltesting.MakePodSet("one", 50).
+						Request(corev1.ResourceCPU, "1").
+						Obj()).
+					Obj(),
+				*utiltesting.MakeWorkload("new-gamma", "eng-beta").
+					Queue("gamma").
+					Creation(time.Now()).
+					PodSets(*utiltesting.MakePodSet("one", 50).
+						Request(corev1.ResourceCPU, "1").
+						Obj()).
+					Obj(),
+				*utiltesting.MakeWorkload("existing", "eng-alpha").
+					PodSets(
+						*utiltesting.MakePodSet("borrow-on-demand", 51).
+							Request(corev1.ResourceCPU, "1").
+							Obj(),
+						*utiltesting.MakePodSet("use-all-spot", 100).
+							Request(corev1.ResourceCPU, "1").
+							Obj(),
+					).
+					Admit(utiltesting.MakeAdmission("eng-alpha").
+						PodSets(
+							kueue.PodSetAssignment{
+								Name: "borrow-on-demand",
+								Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
+									corev1.ResourceCPU: "on-demand",
+								},
+								ResourceUsage: corev1.ResourceList{
+									corev1.ResourceCPU: resource.MustParse("51"),
+								},
+							},
+							kueue.PodSetAssignment{
+								Name: "use-all-spot",
+								Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
+									corev1.ResourceCPU: "spot",
+								},
+								ResourceUsage: corev1.ResourceList{
+									corev1.ResourceCPU: resource.MustParse("100"),
+								},
+							},
+						).
+						Obj()).
+					Obj(),
+			},
+			additionalClusterQueues: []kueue.ClusterQueue{
+				*utiltesting.MakeClusterQueue("eng-gamma").
+					Cohort("eng").
+					Preemption(kueue.ClusterQueuePreemption{
+						ReclaimWithinCohort: kueue.PreemptionPolicyAny,
+						WithinClusterQueue:  kueue.PreemptionPolicyLowerPriority,
+					}).
+					ResourceGroup(
+						*utiltesting.MakeFlavorQuotas("on-demand").
+							Resource(corev1.ResourceCPU, "50", "10").Obj(),
+					).
+					Obj(),
+			},
+			additionalLocalQueues: []kueue.LocalQueue{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Namespace: "eng-beta",
+						Name:      "gamma",
+					},
+					Spec: kueue.LocalQueueSpec{
+						ClusterQueue: "eng-gamma",
+					},
+				},
+			},
+			wantAssignments: map[string]kueue.Admission{
+				"eng-alpha/existing": *utiltesting.MakeAdmission("eng-alpha").
+					PodSets(
+						kueue.PodSetAssignment{
+							Name: "borrow-on-demand",
+							Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
+								corev1.ResourceCPU: "on-demand",
+							},
+							ResourceUsage: corev1.ResourceList{
+								corev1.ResourceCPU: resource.MustParse("51"),
+							},
+						},
+						kueue.PodSetAssignment{
+							Name: "use-all-spot",
+							Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
+								corev1.ResourceCPU: "spot",
+							},
+							ResourceUsage: corev1.ResourceList{
+								corev1.ResourceCPU: resource.MustParse("100"),
+							},
+						},
+					).Obj(),
+				"eng-beta/new": *utiltesting.MakeAdmission("eng-beta", "one").Assignment(corev1.ResourceCPU, "on-demand", "50").Obj(),
+			},
+			wantScheduled: []string{"eng-beta/new"},
+			wantLeft: map[string]sets.Set[string]{
+				"eng-gamma": sets.New("eng-beta/new-gamma"),
+			},
+		},
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
@@ -602,8 +708,12 @@ func TestSchedule(t *testing.T) {
 			})
 			ctx := ctrl.LoggerInto(context.Background(), log)
 			scheme := runtime.NewScheme()
+
+			allQueues := append(queues, tc.additionalLocalQueues...)
+			allClusterQueues := append(clusterQueues, tc.additionalClusterQueues...)
+
 			clientBuilder := utiltesting.NewClientBuilder().
-				WithLists(&kueue.WorkloadList{Items: tc.workloads}, &kueue.LocalQueueList{Items: queues}).
+				WithLists(&kueue.WorkloadList{Items: tc.workloads}, &kueue.LocalQueueList{Items: allQueues}).
 				WithObjects(
 					&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "eng-alpha", Labels: map[string]string{"dep": "eng"}}},
 					&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "eng-beta", Labels: map[string]string{"dep": "eng"}}},
@@ -616,7 +726,7 @@ func TestSchedule(t *testing.T) {
 			cqCache := cache.New(cl)
 			qManager := queue.NewManager(cl, cqCache)
 			// Workloads are loaded into queues or clusterQueues as we add them.
-			for _, q := range queues {
+			for _, q := range allQueues {
 				if err := qManager.AddLocalQueue(ctx, &q); err != nil {
 					t.Fatalf("Inserting queue %s/%s in manager: %v", q.Namespace, q.Name, err)
 				}
@@ -624,7 +734,7 @@ func TestSchedule(t *testing.T) {
 			for i := range resourceFlavors {
 				cqCache.AddOrUpdateResourceFlavor(resourceFlavors[i])
 			}
-			for _, cq := range clusterQueues {
+			for _, cq := range allClusterQueues {
 				if err := cqCache.AddClusterQueue(ctx, &cq); err != nil {
 					t.Fatalf("Inserting clusterQueue %s in cache: %v", cq.Name, err)
 				}
diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go
index fbe6367825..6cf4ffd4dd 100644
--- a/test/integration/scheduler/preemption_test.go
+++ b/test/integration/scheduler/preemption_test.go
@@ -133,8 +133,8 @@ var _ = ginkgo.Describe("Preemption", func() {
 
 	ginkgo.Context("In a ClusterQueue that is part of a cohort", func() {
 		var (
-			alphaCQ, betaCQ *kueue.ClusterQueue
-			alphaQ, betaQ   *kueue.LocalQueue
+			alphaCQ, betaCQ, gammaCQ *kueue.ClusterQueue
+			alphaQ, betaQ, gammaQ    *kueue.LocalQueue
 		)
 
 		ginkgo.BeforeEach(func() {
@@ -157,12 +157,25 @@ var _ = ginkgo.Describe("Preemption", func() {
 			gomega.Expect(k8sClient.Create(ctx, betaCQ)).To(gomega.Succeed())
 			betaQ = testing.MakeLocalQueue("beta-q", ns.Name).ClusterQueue(betaCQ.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, betaQ)).To(gomega.Succeed())
+
+			gammaCQ = testing.MakeClusterQueue("gamma-cq").
+				Cohort("all").
+				ResourceGroup(*testing.MakeFlavorQuotas("alpha").Resource(corev1.ResourceCPU, "2").Obj()).
+				Preemption(kueue.ClusterQueuePreemption{
+					WithinClusterQueue:  kueue.PreemptionPolicyLowerPriority,
+					ReclaimWithinCohort: kueue.PreemptionPolicyAny,
+				}).
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, gammaCQ)).To(gomega.Succeed())
+			gammaQ = testing.MakeLocalQueue("gamma-q", ns.Name).ClusterQueue(gammaCQ.Name).Obj()
+			gomega.Expect(k8sClient.Create(ctx, gammaQ)).To(gomega.Succeed())
 		})
 
 		ginkgo.AfterEach(func() {
 			gomega.Expect(util.DeleteWorkloadsInNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
 			util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, alphaCQ, true)
 			util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, betaCQ, true)
+			util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, gammaCQ, true)
 		})
 
 		ginkgo.It("Should preempt Workloads in the cohort borrowing quota, when the ClusterQueue is using less than nominal quota", func() {
@@ -184,7 +197,7 @@ var _ = ginkgo.Describe("Preemption", func() {
 			betaHighWl := testing.MakeWorkload("beta-high", ns.Name).
 				Queue(betaQ.Name).
 				Priority(highPriority).
-				Request(corev1.ResourceCPU, "2").
+				Request(corev1.ResourceCPU, "4").
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, betaHighWl)).To(gomega.Succeed())
 
@@ -215,7 +228,7 @@ var _ = ginkgo.Describe("Preemption", func() {
 			betaLowWl := testing.MakeWorkload("beta-low", ns.Name).
 				Queue(betaQ.Name).
 				Priority(lowPriority).
-				Request(corev1.ResourceCPU, "2").
+				Request(corev1.ResourceCPU, "4").
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, betaLowWl)).To(gomega.Succeed())
 
@@ -235,6 +248,44 @@ var _ = ginkgo.Describe("Preemption", func() {
 
 			util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, alphaCQ.Name, alphaHighWl1)
 			util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, betaCQ.Name, betaLowWl)
 		})
+
+		ginkgo.It("Should preempt all necessary workloads in concurrent scheduling", func() {
+			ginkgo.By("Creating workloads in beta-cq that borrow quota")
+
+			betaMidWl := testing.MakeWorkload("beta-mid", ns.Name).
+				Queue(betaQ.Name).
+				Priority(midPriority).
+				Request(corev1.ResourceCPU, "3").
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, betaMidWl)).To(gomega.Succeed())
+			betaHighWl := testing.MakeWorkload("beta-high", ns.Name).
+				Queue(betaQ.Name).
+				Priority(highPriority).
+				Request(corev1.ResourceCPU, "3").
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, betaHighWl)).To(gomega.Succeed())
+
+			util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, betaCQ.Name, betaMidWl, betaHighWl)
+
+			ginkgo.By("Creating workloads in alpha-cq and gamma-cq that need to preempt")
+			alphaMidWl := testing.MakeWorkload("alpha-mid", ns.Name).
+				Queue(alphaQ.Name).
+				Priority(midPriority).
+				Request(corev1.ResourceCPU, "2").
+				Obj()
+
+			gammaMidWl := testing.MakeWorkload("gamma-mid", ns.Name).
+				Queue(gammaQ.Name).
+				Priority(midPriority).
+				Request(corev1.ResourceCPU, "2").
+				Obj()
+
+			gomega.Expect(k8sClient.Create(ctx, alphaMidWl)).To(gomega.Succeed())
+			gomega.Expect(k8sClient.Create(ctx, gammaMidWl)).To(gomega.Succeed())
+
+			util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, alphaCQ.Name, alphaMidWl)
+			util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, gammaCQ.Name, gammaMidWl)
+		})
 	})
 })
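Note (not part of the patch): the sketch below is a minimal, self-contained illustration of the idea behind the new checks, using simplified stand-in types (clusterQueue, cohort, plain string resource keys) rather than the real cache structures. It shows how the per-queue borrowing test (usage above nominal quota) rolls up to the cohort level, and how the scheduler's skip condition combines it with the per-entry Borrows() result.

package main

import "fmt"

// clusterQueue is a simplified stand-in for cache.ClusterQueue: nominal quota
// and current usage keyed by "flavor/resource".
type clusterQueue struct {
	name    string
	nominal map[string]int64
	usage   map[string]int64
}

// cohort is a simplified stand-in for cache.Cohort.
type cohort struct {
	members []*clusterQueue
}

// isBorrowing mirrors ClusterQueue.IsBorrowing: true if the usage of any
// resource exceeds its nominal quota.
func (cq *clusterQueue) isBorrowing() bool {
	for key, nominal := range cq.nominal {
		if cq.usage[key] > nominal {
			return true
		}
	}
	return false
}

// hasBorrowingQueues mirrors Cohort.HasBorrowingQueues: true if any member
// ClusterQueue is borrowing.
func (c *cohort) hasBorrowingQueues() bool {
	for _, cq := range c.members {
		if cq.isBorrowing() {
			return true
		}
	}
	return false
}

func main() {
	alpha := &clusterQueue{
		name:    "eng-alpha",
		nominal: map[string]int64{"on-demand/cpu": 50},
		usage:   map[string]int64{"on-demand/cpu": 51}, // 1 CPU borrowed from the cohort
	}
	beta := &clusterQueue{
		name:    "eng-beta",
		nominal: map[string]int64{"on-demand/cpu": 50},
		usage:   map[string]int64{"on-demand/cpu": 0},
	}
	eng := &cohort{members: []*clusterQueue{alpha, beta}}

	// Skip condition applied to the second entry of a cohort within one cycle:
	// skip if this entry borrows, or if any queue in the cohort already borrows.
	cohortAlreadyUsed := true // a first workload from the cohort was nominated this cycle
	entryBorrows := false     // the second entry fits within its nominal quota
	skip := cohortAlreadyUsed && (entryBorrows || eng.hasBorrowingQueues())
	fmt.Println("skip second entry:", skip) // true: eng-alpha is borrowing
}

The patch skips rather than recomputing cohort usage mid-cycle; as the unit test's wantLeft shows, the skipped workload stays queued and competes again in a later cycle, once the earlier admissions are reflected in the cache.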