From 5bca510784da3c1ef9c21f0d0a759c8f30aeb2a4 Mon Sep 17 00:00:00 2001
From: vladikkuzn <51460778+vladikkuzn@users.noreply.github.com>
Date: Wed, 10 Jul 2024 18:28:56 +0300
Subject: [PATCH] Distinguish between Preemption due to reclamation and fair
 sharing (#2411)

* Distinguish between Preemption due to reclamation and fair sharing
* Distinguish between Preemption due to reclamation and fair sharing
* extract getPreemptionReason
* borrowing in favour of reclamation
* Distinguish between Preemption due to reclamation and fair sharing
* Rewrite to Targets
* Cover CohortFairSharing
* Distinguish between Preemption due to reclamation and fair sharing
* Dedicated GetWorkloadReferences function
* Extract humanReadablePreemptionReasons
* !strategy && belowThreshold -> InCohortReclaimWhileBorrowingReason
* ExpectPreemptedCondition test util
* Distinguish between Preemption due to reclamation and fair sharing
* Rename WL -> WorkloadInfo
* Move getWorkloadReferences to logging
* Distinguish between Preemption due to reclamation and fair sharing
* Use slice in getWorkloadReferences
---
 pkg/scheduler/logging.go                      |   9 +-
 pkg/scheduler/preemption/preemption.go        | 132 ++++++++++++------
 pkg/scheduler/preemption/preemption_test.go   |  99 ++++++++-----
 pkg/scheduler/scheduler.go                    |   8 +-
 test/integration/scheduler/preemption_test.go |  28 +---
 test/util/util.go                             |  14 ++
 6 files changed, 185 insertions(+), 105 deletions(-)

diff --git a/pkg/scheduler/logging.go b/pkg/scheduler/logging.go
index 5895bac0f3..dda61e0c5b 100644
--- a/pkg/scheduler/logging.go
+++ b/pkg/scheduler/logging.go
@@ -21,7 +21,8 @@ import (
 	"k8s.io/klog/v2"
 
 	"sigs.k8s.io/kueue/pkg/cache"
-	"sigs.k8s.io/kueue/pkg/workload"
+	"sigs.k8s.io/kueue/pkg/scheduler/preemption"
+	"sigs.k8s.io/kueue/pkg/util/slices"
 )
 
 func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) {
@@ -37,7 +38,7 @@ func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) {
 	}
 	if log.V(4).Enabled() {
 		args = append(args, "nominatedAssignment", e.assignment)
-		args = append(args, "preempted", workload.References(e.preemptionTargets))
+		args = append(args, "preempted", getWorkloadReferences(e.preemptionTargets))
 	}
 	logV.Info("Workload evaluated for admission", args...)
 }
@@ -47,3 +48,7 @@ func logSnapshotIfVerbose(log logr.Logger, s *cache.Snapshot) {
 		s.Log(logV)
 	}
 }
+
+func getWorkloadReferences(targets []*preemption.Target) []klog.ObjectRef {
+	return slices.Map(targets, func(t **preemption.Target) klog.ObjectRef { return klog.KObj((*t).WorkloadInfo.Obj) })
+}
diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go
index 887e65810c..4e4fa3fe26 100644
--- a/pkg/scheduler/preemption/preemption.go
+++ b/pkg/scheduler/preemption/preemption.go
@@ -96,8 +96,13 @@ func candidatesFromCQOrUnderThreshold(candidates []*workload.Info, clusterQueue
 	return result
 }
 
+type Target struct {
+	WorkloadInfo *workload.Info
+	Reason       string
+}
+
 // GetTargets returns the list of workloads that should be evicted in order to make room for wl.
-func (p *Preemptor) GetTargets(log logr.Logger, wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*workload.Info { +func (p *Preemptor) GetTargets(log logr.Logger, wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*Target { resPerFlv := resourcesRequiringPreemption(assignment) cq := snapshot.ClusterQueues[wl.ClusterQueue] @@ -164,8 +169,27 @@ func canBorrowWithinCohort(cq *cache.ClusterQueueSnapshot, wl *kueue.Workload) ( return true, &threshold } +// In cluster queue preemption reasons +const ( + InClusterQueueReason string = "InClusterQueue" +) + +// In cohort preemption reasons +const ( + InCohortReclamationReason string = "InCohortReclamation" + InCohortFairSharingReason string = "InCohortFairSharing" + InCohortReclaimWhileBorrowingReason string = "InCohortReclaimWhileBorrowing" +) + +var HumanReadablePreemptionReasons = map[string]string{ + InClusterQueueReason: "prioritization in the ClusterQueue", + InCohortReclamationReason: "reclamation within the cohort", + InCohortFairSharingReason: "fair sharing within the cohort", + InCohortReclaimWhileBorrowingReason: "reclamation within the cohort while borrowing", +} + // IssuePreemptions marks the target workloads as evicted. -func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.Info, targets []*workload.Info, cq *cache.ClusterQueueSnapshot) (int, error) { +func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.Info, targets []*Target, cq *cache.ClusterQueueSnapshot) (int, error) { log := ctrl.LoggerFrom(ctx) errCh := routine.NewErrorChannel() ctx, cancel := context.WithCancel(ctx) @@ -173,26 +197,19 @@ func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.In defer cancel() workqueue.ParallelizeUntil(ctx, parallelPreemptions, len(targets), func(i int) { target := targets[i] - if !meta.IsStatusConditionTrue(target.Obj.Status.Conditions, kueue.WorkloadEvicted) { - origin := "ClusterQueue" - reason := "InClusterQueue" - if cq.Name != target.ClusterQueue { - origin = "cohort" - reason = "InCohort" - } - - message := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the %s", preemptor.Obj.UID, origin) - err := p.applyPreemption(ctx, target.Obj, reason, message) + if !meta.IsStatusConditionTrue(target.WorkloadInfo.Obj.Status.Conditions, kueue.WorkloadEvicted) { + message := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) due to %s", preemptor.Obj.UID, HumanReadablePreemptionReasons[target.Reason]) + err := p.applyPreemption(ctx, target.WorkloadInfo.Obj, target.Reason, message) if err != nil { errCh.SendErrorWithCancel(err, cancel) return } - log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.Obj), "reason", reason, "message", message) - p.recorder.Eventf(target.Obj, corev1.EventTypeNormal, "Preempted", message) - metrics.ReportEvictedWorkloads(target.ClusterQueue, kueue.WorkloadEvictedByPreemption) + log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj), "reason", target.Reason, "message", message) + p.recorder.Eventf(target.WorkloadInfo.Obj, corev1.EventTypeNormal, "Preempted", message) + metrics.ReportEvictedWorkloads(target.WorkloadInfo.ClusterQueue, kueue.WorkloadEvictedByPreemption) } else { - log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.Obj)) + log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj)) } atomic.AddInt64(&successfullyPreempted, 1) }) @@ -214,35 +231,45 @@ func (p 
*Preemptor) applyPreemptionWithSSA(ctx context.Context, w *kueue.Workloa // Once the Workload fits, the heuristic tries to add Workloads back, in the // reverse order in which they were removed, while the incoming Workload still // fits. -func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info { +func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*Target { if logV := log.V(5); logV.Enabled() { logV.Info("Simulating preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", resPerFlv, "allowBorrowing", allowBorrowing, "allowBorrowingBelowPriority", allowBorrowingBelowPriority) } // Simulate removing all candidates from the ClusterQueue and cohort. - var targets []*workload.Info + var targets []*Target fits := false for _, candWl := range candidates { candCQ := snapshot.ClusterQueues[candWl.ClusterQueue] - if cq != candCQ && !cqIsBorrowing(candCQ, resPerFlv) { + sameCq := cq == candCQ + reason := InClusterQueueReason + if !sameCq && !cqIsBorrowing(candCQ, resPerFlv) { continue } - if cq != candCQ && allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) >= *allowBorrowingBelowPriority { - // We set allowBorrowing=false if there is a candidate with priority - // exceeding allowBorrowingBelowPriority added to targets. - // - // We need to be careful mutating allowBorrowing. We rely on the - // fact that once there is a candidate exceeding the priority added - // to targets, then at least one such candidate is present in the - // final set of targets (after the second phase of the function). - // - // This is true, because the candidates are ordered according - // to priorities (from lowest to highest, using candidatesOrdering), - // and the last added target is not removed in the second phase of - // the function. - allowBorrowing = false + if !sameCq { + reason = InCohortReclamationReason + if allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) >= *allowBorrowingBelowPriority { + // We set allowBorrowing=false if there is a candidate with priority + // exceeding allowBorrowingBelowPriority added to targets. + // + // We need to be careful mutating allowBorrowing. We rely on the + // fact that once there is a candidate exceeding the priority added + // to targets, then at least one such candidate is present in the + // final set of targets (after the second phase of the function). + // + // This is true, because the candidates are ordered according + // to priorities (from lowest to highest, using candidatesOrdering), + // and the last added target is not removed in the second phase of + // the function. 
+ allowBorrowing = false + } else { + reason = InCohortReclaimWhileBorrowingReason + } } snapshot.RemoveWorkload(candWl) - targets = append(targets, candWl) + targets = append(targets, &Target{ + WorkloadInfo: candWl, + Reason: reason, + }) if workloadFits(wlReq, cq, allowBorrowing) { fits = true break @@ -257,24 +284,24 @@ func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitie return targets } -func fillBackWorkloads(targets []*workload.Info, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, allowBorrowing bool) []*workload.Info { +func fillBackWorkloads(targets []*Target, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, allowBorrowing bool) []*Target { // In the reverse order, check if any of the workloads can be added back. for i := len(targets) - 2; i >= 0; i-- { - snapshot.AddWorkload(targets[i]) + snapshot.AddWorkload(targets[i].WorkloadInfo) if workloadFits(wlReq, cq, allowBorrowing) { // O(1) deletion: copy the last element into index i and reduce size. targets[i] = targets[len(targets)-1] targets = targets[:len(targets)-1] } else { - snapshot.RemoveWorkload(targets[i]) + snapshot.RemoveWorkload(targets[i].WorkloadInfo) } } return targets } -func restoreSnapshot(snapshot *cache.Snapshot, targets []*workload.Info) { +func restoreSnapshot(snapshot *cache.Snapshot, targets []*Target) { for _, t := range targets { - snapshot.AddWorkload(t) + snapshot.AddWorkload(t.WorkloadInfo) } } @@ -309,7 +336,7 @@ func parseStrategies(s []config.PreemptionStrategy) []fsStrategy { return strategies } -func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowingBelowPriority *int32) []*workload.Info { +func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowingBelowPriority *int32) []*Target { if logV := log.V(5); logV.Enabled() { logV.Info("Simulating fair preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", resPerFlv, "allowBorrowingBelowPriority", allowBorrowingBelowPriority) } @@ -317,7 +344,7 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme nominatedCQ := snapshot.ClusterQueues[wl.ClusterQueue] wlReq := assignment.TotalRequestsFor(wl) newNominatedShareValue, _ := nominatedCQ.DominantResourceShareWith(wlReq) - var targets []*workload.Info + var targets []*Target fits := false var retryCandidates []*workload.Info for cqHeap.Len() > 0 && !fits { @@ -326,7 +353,10 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme if candCQ.cq == nominatedCQ { candWl := candCQ.workloads[0] snapshot.RemoveWorkload(candWl) - targets = append(targets, candWl) + targets = append(targets, &Target{ + WorkloadInfo: candWl, + Reason: InClusterQueueReason, + }) if workloadFits(wlReq, nominatedCQ, true) { fits = true break @@ -343,9 +373,18 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme for i, candWl := range candCQ.workloads { belowThreshold := allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) < *allowBorrowingBelowPriority newCandShareVal, _ := candCQ.cq.DominantResourceShareWithout(candWl.FlavorResourceUsage()) - if belowThreshold || 
p.fsStrategies[0](newNominatedShareValue, candCQ.share, newCandShareVal) { + strategy := p.fsStrategies[0](newNominatedShareValue, candCQ.share, newCandShareVal) + if belowThreshold || strategy { snapshot.RemoveWorkload(candWl) - targets = append(targets, candWl) + reason := InCohortFairSharingReason + if !strategy { + reason = InCohortReclaimWhileBorrowingReason + } + + targets = append(targets, &Target{ + WorkloadInfo: candWl, + Reason: reason, + }) if workloadFits(wlReq, nominatedCQ, true) { fits = true break @@ -374,7 +413,10 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme // The criteria doesn't depend on the preempted workload, so just preempt the first candidate. candWl := candCQ.workloads[0] snapshot.RemoveWorkload(candWl) - targets = append(targets, candWl) + targets = append(targets, &Target{ + WorkloadInfo: candWl, + Reason: InCohortFairSharingReason, + }) if workloadFits(wlReq, nominatedCQ, true) { fits = true } diff --git a/pkg/scheduler/preemption/preemption_test.go b/pkg/scheduler/preemption/preemption_test.go index 31eaa44722..c8f9e98639 100644 --- a/pkg/scheduler/preemption/preemption_test.go +++ b/pkg/scheduler/preemption/preemption_test.go @@ -18,6 +18,7 @@ package preemption import ( "context" + "fmt" "sort" "sync" "testing" @@ -1440,6 +1441,9 @@ func TestPreemption(t *testing.T) { gotPreempted := sets.New[string]() broadcaster := record.NewBroadcaster() scheme := runtime.NewScheme() + if err := kueue.AddToScheme(scheme); err != nil { + t.Fatalf("Failed adding kueue scheme: %v", err) + } recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: constants.AdmissionName}) preemptor := New(cl, workload.Ordering{}, recorder, config.FairSharing{}) preemptor.applyPreemption = func(ctx context.Context, w *kueue.Workload, _, _ string) error { @@ -1548,7 +1552,7 @@ func TestFairPreemptions(t *testing.T) { }, incoming: unitWl.Clone().Name("c_incoming").Obj(), targetCQ: "c", - wantPreempted: sets.New("/b1"), + wantPreempted: sets.New(targetKeyReasonScope("/b1", InCohortFairSharingReason)), }, "can reclaim from queue using less, if taking the latest workload from user using the most isn't enough": { clusterQueues: baseCQs, @@ -1560,7 +1564,7 @@ func TestFairPreemptions(t *testing.T) { }, incoming: utiltesting.MakeWorkload("c_incoming", "").Request(corev1.ResourceCPU, "3").SimpleReserveQuota("a", "default", now).Obj(), targetCQ: "c", - wantPreempted: sets.New("/a1"), // attempts to preempt b1, but it's not enough. + wantPreempted: sets.New(targetKeyReasonScope("/a1", InCohortFairSharingReason)), // attempts to preempt b1, but it's not enough. 
}, "reclaim borrowable quota from user using the most": { clusterQueues: baseCQs, @@ -1577,7 +1581,7 @@ func TestFairPreemptions(t *testing.T) { }, incoming: unitWl.Clone().Name("a_incoming").Obj(), targetCQ: "a", - wantPreempted: sets.New("/b1"), + wantPreempted: sets.New(targetKeyReasonScope("/b1", InCohortFairSharingReason)), }, "preempt one from each CQ borrowing": { clusterQueues: baseCQs, @@ -1589,9 +1593,12 @@ func TestFairPreemptions(t *testing.T) { *utiltesting.MakeWorkload("b2", "").Request(corev1.ResourceCPU, "0.5").SimpleReserveQuota("b", "default", now).Obj(), *utiltesting.MakeWorkload("b3", "").Request(corev1.ResourceCPU, "3").SimpleReserveQuota("b", "default", now).Obj(), }, - incoming: utiltesting.MakeWorkload("c_incoming", "").Request(corev1.ResourceCPU, "2").Obj(), - targetCQ: "c", - wantPreempted: sets.New("/a1", "/b1"), + incoming: utiltesting.MakeWorkload("c_incoming", "").Request(corev1.ResourceCPU, "2").Obj(), + targetCQ: "c", + wantPreempted: sets.New( + targetKeyReasonScope("/a1", InCohortFairSharingReason), + targetKeyReasonScope("/b1", InCohortFairSharingReason), + ), }, "can't preempt when everyone under nominal": { clusterQueues: baseCQs, @@ -1637,9 +1644,12 @@ func TestFairPreemptions(t *testing.T) { *unitWl.Clone().Name("b4").SimpleReserveQuota("b", "default", now).Obj(), *unitWl.Clone().Name("b5").SimpleReserveQuota("b", "default", now).Obj(), }, - incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(), - targetCQ: "a", - wantPreempted: sets.New("/a1_low", "/a2_low"), + incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(), + targetCQ: "a", + wantPreempted: sets.New( + targetKeyReasonScope("/a1_low", InClusterQueueReason), + targetKeyReasonScope("/a2_low", InClusterQueueReason), + ), }, "can preempt a combination of same CQ and highest user": { clusterQueues: baseCQs, @@ -1654,9 +1664,12 @@ func TestFairPreemptions(t *testing.T) { *unitWl.Clone().Name("b5").SimpleReserveQuota("b", "default", now).Obj(), *unitWl.Clone().Name("b6").SimpleReserveQuota("b", "default", now).Obj(), }, - incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(), - targetCQ: "a", - wantPreempted: sets.New("/a_low", "/b1"), + incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(), + targetCQ: "a", + wantPreempted: sets.New( + targetKeyReasonScope("/a_low", InClusterQueueReason), + targetKeyReasonScope("/b1", InCohortFairSharingReason), + ), }, "preempt huge workload if there is no other option, as long as the target CQ gets a lower share": { clusterQueues: baseCQs, @@ -1665,7 +1678,7 @@ func TestFairPreemptions(t *testing.T) { }, incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(), targetCQ: "a", - wantPreempted: sets.New("/b1"), + wantPreempted: sets.New(targetKeyReasonScope("/b1", InCohortFairSharingReason)), }, "can't preempt huge workload if the incoming is also huge": { clusterQueues: baseCQs, @@ -1694,9 +1707,12 @@ func TestFairPreemptions(t *testing.T) { *utiltesting.MakeWorkload("b1", "").Request(corev1.ResourceCPU, "3").SimpleReserveQuota("b", "default", now).Obj(), *utiltesting.MakeWorkload("b2", "").Request(corev1.ResourceCPU, "3").SimpleReserveQuota("b", "default", now).Obj(), }, - incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "4").Obj(), - targetCQ: "a", - wantPreempted: sets.New("/a1_low", "/b1"), + incoming: utiltesting.MakeWorkload("a_incoming", 
"").Request(corev1.ResourceCPU, "4").Obj(), + targetCQ: "a", + wantPreempted: sets.New( + targetKeyReasonScope("/a1_low", InClusterQueueReason), + targetKeyReasonScope("/b1", InCohortFairSharingReason), + ), }, "prefer to preempt workloads that don't make the target CQ have the biggest share": { clusterQueues: baseCQs, @@ -1709,7 +1725,7 @@ func TestFairPreemptions(t *testing.T) { incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "3.5").Obj(), targetCQ: "a", // It would have been possible to preempt "/b1" under rule S2-b, but S2-a was possible first. - wantPreempted: sets.New("/b2"), + wantPreempted: sets.New(targetKeyReasonScope("/b2", InCohortFairSharingReason)), }, "preempt from different cluster queues if the end result has a smaller max share": { clusterQueues: baseCQs, @@ -1719,9 +1735,12 @@ func TestFairPreemptions(t *testing.T) { *utiltesting.MakeWorkload("c1", "").Request(corev1.ResourceCPU, "2").SimpleReserveQuota("c", "default", now).Obj(), *utiltesting.MakeWorkload("c2", "").Request(corev1.ResourceCPU, "2.5").SimpleReserveQuota("c", "default", now).Obj(), }, - incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "3.5").Obj(), - targetCQ: "a", - wantPreempted: sets.New("/b1", "/c1"), + incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "3.5").Obj(), + targetCQ: "a", + wantPreempted: sets.New( + targetKeyReasonScope("/b1", InCohortFairSharingReason), + targetKeyReasonScope("/c1", InCohortFairSharingReason), + ), }, "scenario above does not flap": { clusterQueues: baseCQs, @@ -1756,9 +1775,12 @@ func TestFairPreemptions(t *testing.T) { *unitWl.Clone().Name("preemptible2").Priority(-3).SimpleReserveQuota("preemptible", "default", now).Obj(), *unitWl.Clone().Name("preemptible3").Priority(-3).SimpleReserveQuota("preemptible", "default", now).Obj(), }, - incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(), - targetCQ: "a", - wantPreempted: sets.New("/preemptible1", "/preemptible2"), + incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(), + targetCQ: "a", + wantPreempted: sets.New( + targetKeyReasonScope("/preemptible1", InCohortFairSharingReason), + targetKeyReasonScope("/preemptible2", InCohortReclaimWhileBorrowingReason), + ), }, "preempt lower priority first, even if big": { clusterQueues: baseCQs, @@ -1770,7 +1792,7 @@ func TestFairPreemptions(t *testing.T) { }, incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "1").Obj(), targetCQ: "a", - wantPreempted: sets.New("/b_low"), + wantPreempted: sets.New(targetKeyReasonScope("/b_low", InCohortFairSharingReason)), }, "preempt workload that doesn't transfer the imbalance, even if high priority": { clusterQueues: baseCQs, @@ -1782,7 +1804,7 @@ func TestFairPreemptions(t *testing.T) { }, incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "1").Obj(), targetCQ: "a", - wantPreempted: sets.New("/b_high"), + wantPreempted: sets.New(targetKeyReasonScope("/b_high", InCohortFairSharingReason)), }, "CQ with higher weight can preempt more": { clusterQueues: []*kueue.ClusterQueue{ @@ -1826,9 +1848,12 @@ func TestFairPreemptions(t *testing.T) { *unitWl.Clone().Name("b5").SimpleReserveQuota("b", "default", now).Obj(), *unitWl.Clone().Name("b6").SimpleReserveQuota("b", "default", now).Obj(), }, - incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(), - targetCQ: "a", - wantPreempted: 
sets.New("/b1", "/b2"), + incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(), + targetCQ: "a", + wantPreempted: sets.New( + targetKeyReasonScope("/b1", InCohortFairSharingReason), + targetKeyReasonScope("/b2", InCohortFairSharingReason), + ), }, "can preempt anything borrowing from CQ with 0 weight": { clusterQueues: []*kueue.ClusterQueue{ @@ -1872,9 +1897,13 @@ func TestFairPreemptions(t *testing.T) { *unitWl.Clone().Name("b5").SimpleReserveQuota("b", "default", now).Obj(), *unitWl.Clone().Name("b6").SimpleReserveQuota("b", "default", now).Obj(), }, - incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "3").Obj(), - targetCQ: "a", - wantPreempted: sets.New("/b1", "/b2", "/b3"), + incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "3").Obj(), + targetCQ: "a", + wantPreempted: sets.New( + targetKeyReasonScope("/b1", InCohortFairSharingReason), + targetKeyReasonScope("/b2", InCohortFairSharingReason), + targetKeyReasonScope("/b3", InCohortFairSharingReason), + ), }, "can't preempt nominal from CQ with 0 weight": { clusterQueues: []*kueue.ClusterQueue{ @@ -1948,8 +1977,8 @@ func TestFairPreemptions(t *testing.T) { }, }, ), &snapshot) - gotTargets := sets.New(slices.Map(targets, func(w **workload.Info) string { - return workload.Key((*w).Obj) + gotTargets := sets.New(slices.Map(targets, func(t **Target) string { + return targetKeyReasonScope(workload.Key((*t).WorkloadInfo.Obj), (*t).Reason) })...) if diff := cmp.Diff(tc.wantPreempted, gotTargets, cmpopts.EquateEmpty()); diff != "" { t.Errorf("Issued preemptions (-want,+got):\n%s", diff) @@ -1958,6 +1987,10 @@ func TestFairPreemptions(t *testing.T) { } } +func targetKeyReasonScope(key, reason string) string { + return fmt.Sprintf("%s:%s", key, reason) +} + func TestCandidatesOrdering(t *testing.T) { now := time.Now() candidates := []*workload.Info{ diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index f04cd0fe8e..7a8f5d849a 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -332,7 +332,7 @@ type entry struct { status entryStatus inadmissibleMsg string requeueReason queue.RequeueReason - preemptionTargets []*workload.Info + preemptionTargets []*preemption.Target } // nominate returns the workloads with their requirements (resource flavors, borrowing) if @@ -409,14 +409,14 @@ func resourcesToReserve(e *entry, cq *cache.ClusterQueueSnapshot) resources.Flav type partialAssignment struct { assignment flavorassigner.Assignment - preemptionTargets []*workload.Info + preemptionTargets []*preemption.Target } -func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cache.Snapshot) (flavorassigner.Assignment, []*workload.Info) { +func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cache.Snapshot) (flavorassigner.Assignment, []*preemption.Target) { cq := snap.ClusterQueues[wl.ClusterQueue] flvAssigner := flavorassigner.New(wl, cq, snap.ResourceFlavors, s.fairSharing.Enable) fullAssignment := flvAssigner.Assign(log, nil) - var faPreemtionTargets []*workload.Info + var faPreemtionTargets []*preemption.Target arm := fullAssignment.RepresentativeMode() if arm == flavorassigner.Fit { diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index 980b639000..4b38752217 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -31,6 +31,7 @@ import ( kueue 
"sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/scheduler/preemption" "sigs.k8s.io/kueue/pkg/util/testing" "sigs.k8s.io/kueue/test/util" ) @@ -280,25 +281,8 @@ var _ = ginkgo.Describe("Preemption", func() { conditionCmpOpts := cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime") ginkgo.By("Verify the Preempted condition", func() { - gomega.Eventually(func(g gomega.Gomega) { - g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(alphaLowWl), alphaLowWl)).To(gomega.Succeed()) - g.Expect(apimeta.FindStatusCondition(alphaLowWl.Status.Conditions, kueue.WorkloadPreempted)).To(gomega.BeComparableTo(&metav1.Condition{ - Type: kueue.WorkloadPreempted, - Status: metav1.ConditionTrue, - Reason: "InClusterQueue", - Message: fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the ClusterQueue", alphaMidWl.UID), - }, conditionCmpOpts)) - }, util.Timeout, util.Interval).Should(gomega.Succeed()) - - gomega.Eventually(func(g gomega.Gomega) { - g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(betaMidWl), betaMidWl)).To(gomega.Succeed()) - g.Expect(apimeta.FindStatusCondition(betaMidWl.Status.Conditions, kueue.WorkloadPreempted)).To(gomega.BeComparableTo(&metav1.Condition{ - Type: kueue.WorkloadPreempted, - Status: metav1.ConditionTrue, - Reason: "InCohort", - Message: fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the cohort", alphaMidWl.UID), - }, conditionCmpOpts)) - }, util.Timeout, util.Interval).Should(gomega.Succeed()) + util.ExpectPreemptedCondition(ctx, k8sClient, preemption.InClusterQueueReason, metav1.ConditionTrue, alphaLowWl, alphaMidWl) + util.ExpectPreemptedCondition(ctx, k8sClient, preemption.InCohortReclaimWhileBorrowingReason, metav1.ConditionTrue, betaMidWl, alphaMidWl) }) ginkgo.By("Verify the Preempted condition on re-admission, as the preemptor is finished", func() { @@ -312,7 +296,7 @@ var _ = ginkgo.Describe("Preemption", func() { Type: kueue.WorkloadPreempted, Status: metav1.ConditionFalse, Reason: "QuotaReserved", - Message: fmt.Sprintf("Previously: Preempted to accommodate a workload (UID: %s) in the ClusterQueue", alphaMidWl.UID), + Message: fmt.Sprintf("Previously: Preempted to accommodate a workload (UID: %s) due to %s", alphaMidWl.UID, preemption.HumanReadablePreemptionReasons[preemption.InClusterQueueReason]), }, conditionCmpOpts)) }, util.Timeout, util.Interval).Should(gomega.Succeed()) @@ -322,7 +306,7 @@ var _ = ginkgo.Describe("Preemption", func() { Type: kueue.WorkloadPreempted, Status: metav1.ConditionFalse, Reason: "QuotaReserved", - Message: fmt.Sprintf("Previously: Preempted to accommodate a workload (UID: %s) in the cohort", alphaMidWl.UID), + Message: fmt.Sprintf("Previously: Preempted to accommodate a workload (UID: %s) due to %s", alphaMidWl.UID, preemption.HumanReadablePreemptionReasons[preemption.InCohortReclaimWhileBorrowingReason]), }, conditionCmpOpts)) }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) @@ -704,6 +688,8 @@ var _ = ginkgo.Describe("Preemption", func() { Obj() gomega.Expect(k8sClient.Create(ctx, aStandardVeryHighWl)).To(gomega.Succeed()) + util.ExpectPreemptedCondition(ctx, k8sClient, preemption.InCohortReclaimWhileBorrowingReason, metav1.ConditionTrue, aBestEffortLowWl, aStandardVeryHighWl) + ginkgo.By("Finish eviction fo the a-best-effort-low workload") util.FinishEvictionForWorkloads(ctx, k8sClient, aBestEffortLowWl) diff --git a/test/util/util.go b/test/util/util.go index fbb8516f80..137025cccb 100644 --- a/test/util/util.go +++ 
b/test/util/util.go @@ -49,6 +49,7 @@ import ( kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/controller/jobs/pod" "sigs.k8s.io/kueue/pkg/metrics" + "sigs.k8s.io/kueue/pkg/scheduler/preemption" "sigs.k8s.io/kueue/pkg/util/testing" "sigs.k8s.io/kueue/pkg/workload" ) @@ -792,6 +793,19 @@ readCh: gomega.ExpectWithOffset(1, gotObjs).To(gomega.Equal(objs)) } +func ExpectPreemptedCondition(ctx context.Context, k8sClient client.Client, reason string, status metav1.ConditionStatus, preemptedWl, preempteeWl *kueue.Workload) { + conditionCmpOpts := cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime", "ObservedGeneration") + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(preemptedWl), preemptedWl)).To(gomega.Succeed()) + g.Expect(preemptedWl.Status.Conditions).To(gomega.ContainElements(gomega.BeComparableTo(metav1.Condition{ + Type: kueue.WorkloadPreempted, + Status: status, + Reason: reason, + Message: fmt.Sprintf("Preempted to accommodate a workload (UID: %s) due to %s", preempteeWl.UID, preemption.HumanReadablePreemptionReasons[reason]), + }, conditionCmpOpts))) + }, Timeout, Interval).Should(gomega.Succeed()) +} + func NewTestingLogger(writer io.Writer, level int) logr.Logger { opts := func(o *zap.Options) { o.TimeEncoder = zapcore.RFC3339NanoTimeEncoder
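
The heart of the patch is that GetTargets now returns []*Target instead of []*workload.Info, so every preemption target carries the reason it was selected, and IssuePreemptions turns that reason into the Preempted event and condition message via HumanReadablePreemptionReasons. The following is a minimal, self-contained sketch of that flow; WorkloadInfo is a stand-in for the real workload.Info, and issuePreemptions mirrors only the message construction, not the eviction, event recording, or metrics logic.

```go
package main

import "fmt"

// Stand-in for workload.Info (the real type lives in pkg/workload).
type WorkloadInfo struct {
	Name string
}

// Target mirrors the struct added in pkg/scheduler/preemption: each target
// carries the reason it was chosen for preemption.
type Target struct {
	WorkloadInfo *WorkloadInfo
	Reason       string
}

// Reason constants and their human-readable fragments, copied from the patch.
const (
	InClusterQueueReason                = "InClusterQueue"
	InCohortReclamationReason           = "InCohortReclamation"
	InCohortFairSharingReason           = "InCohortFairSharing"
	InCohortReclaimWhileBorrowingReason = "InCohortReclaimWhileBorrowing"
)

var HumanReadablePreemptionReasons = map[string]string{
	InClusterQueueReason:                "prioritization in the ClusterQueue",
	InCohortReclamationReason:           "reclamation within the cohort",
	InCohortFairSharingReason:           "fair sharing within the cohort",
	InCohortReclaimWhileBorrowingReason: "reclamation within the cohort while borrowing",
}

// issuePreemptions mimics the message construction of IssuePreemptions: the
// text is derived from the per-target reason rather than from whether the
// target lives in the preemptor's ClusterQueue, as it was before the patch.
func issuePreemptions(preemptorUID string, targets []*Target) {
	for _, t := range targets {
		msg := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) due to %s",
			preemptorUID, HumanReadablePreemptionReasons[t.Reason])
		fmt.Printf("%s: %s\n", t.WorkloadInfo.Name, msg)
	}
}

func main() {
	issuePreemptions("uid-1234", []*Target{
		{WorkloadInfo: &WorkloadInfo{Name: "default/a1"}, Reason: InClusterQueueReason},
		{WorkloadInfo: &WorkloadInfo{Name: "default/b1"}, Reason: InCohortFairSharingReason},
	})
}
```

This is also the message shape asserted by the new ExpectPreemptedCondition helper added in test/util/util.go.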
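In fairPreemptions, the reason recorded on a cross-queue target distinguishes why the candidate became preemptable: InCohortFairSharing when the configured fair-sharing strategy alone justifies the preemption, and InCohortReclaimWhileBorrowing when the candidate is only eligible because its priority is below the borrowWithinCohort threshold (the `!strategy && belowThreshold` case called out in the commit message). Candidates from the nominated ClusterQueue keep InClusterQueue. The sketch below is a simplified restatement of that branch, not the actual function; the boolean parameters stand in for the strategy evaluation and the priority check.

```go
package main

import "fmt"

const (
	InClusterQueueReason                = "InClusterQueue"
	InCohortFairSharingReason           = "InCohortFairSharing"
	InCohortReclaimWhileBorrowingReason = "InCohortReclaimWhileBorrowing"
)

// fairSharingReason restates the reason selection added to fairPreemptions.
// sameCQ: the candidate runs in the nominated ClusterQueue.
// strategyHolds: the configured fair-sharing strategy justifies preempting it.
// belowThreshold: its priority is below the borrowWithinCohort threshold.
// The second return value reports whether the candidate is preemptable at all
// in this pass.
func fairSharingReason(sameCQ, strategyHolds, belowThreshold bool) (string, bool) {
	if sameCQ {
		return InClusterQueueReason, true
	}
	if strategyHolds {
		return InCohortFairSharingReason, true
	}
	if belowThreshold {
		return InCohortReclaimWhileBorrowingReason, true
	}
	return "", false
}

func main() {
	reason, ok := fairSharingReason(false, false, true)
	fmt.Println(reason, ok) // InCohortReclaimWhileBorrowing true
}
```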
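The logging change replaces workload.References with a local getWorkloadReferences that maps []*preemption.Target to klog object references using the repository's generic slices.Map, whose callback receives a pointer to each slice element, which is why the closure takes a **preemption.Target. The sketch below assumes a Map helper with that element-pointer signature (an approximation of sigs.k8s.io/kueue/pkg/util/slices, not its actual code) and maps stand-in targets to plain strings instead of klog.ObjectRef to stay dependency-free.

```go
package main

import "fmt"

// Map assumes the element-pointer callback style used by the patch; the real
// helper lives in sigs.k8s.io/kueue/pkg/util/slices.
func Map[S ~[]E, E any, R any](s S, f func(*E) R) []R {
	out := make([]R, 0, len(s))
	for i := range s {
		out = append(out, f(&s[i]))
	}
	return out
}

type WorkloadInfo struct{ Name string }

type Target struct {
	WorkloadInfo *WorkloadInfo
	Reason       string
}

func main() {
	targets := []*Target{
		{WorkloadInfo: &WorkloadInfo{Name: "default/a1"}, Reason: "InClusterQueue"},
		{WorkloadInfo: &WorkloadInfo{Name: "default/b1"}, Reason: "InCohortFairSharing"},
	}
	// Equivalent shape to getWorkloadReferences: E is *Target here, so the
	// callback receives a **Target, mirroring **preemption.Target in the diff.
	refs := Map(targets, func(t **Target) string { return (*t).WorkloadInfo.Name })
	fmt.Println(refs) // [default/a1 default/b1]
}
```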