diff --git a/pkg/scheduler/logging.go b/pkg/scheduler/logging.go
index 9680460507..17a747aea6 100644
--- a/pkg/scheduler/logging.go
+++ b/pkg/scheduler/logging.go
@@ -22,8 +22,6 @@ import (
 
 	"sigs.k8s.io/kueue/pkg/cache"
 	"sigs.k8s.io/kueue/pkg/scheduler/preemption"
-	"sigs.k8s.io/kueue/pkg/util/slices"
-	"sigs.k8s.io/kueue/pkg/workload"
 )
 
 func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) {
@@ -39,9 +37,7 @@ func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) {
 	}
 	if log.V(4).Enabled() {
 		args = append(args, "nominatedAssignment", e.assignment)
-		args = append(args, "preempted", workload.References(
-			slices.Map(e.preemptionTargets, func(p **preemption.Target) *workload.Info { return (*p).Wl }),
-		))
+		args = append(args, "preempted", preemption.GetWorkloadReferences(e.preemptionTargets))
 	}
 	logV.Info("Workload evaluated for admission", args...)
 }
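The logging hunk above replaces the inline `slices.Map` + `workload.References` composition with a single call to `preemption.GetWorkloadReferences`, which the next file introduces (it relies on `k8s.io/klog/v2` already being imported in that package). A minimal standalone sketch of the helper's shape — the `target` type and the `corev1.Pod` stand-in for kueue's `workload.Info.Obj` are illustrative only, not the real kueue types:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"
)

// target mirrors the trimmed preemption.Target: a workload plus the
// reason it was selected for preemption (the Scope field is gone).
type target struct {
	obj    *corev1.Pod // stand-in for workload.Info.Obj
	reason string
}

// workloadReferences mirrors GetWorkloadReferences: it converts targets
// into klog.ObjectRef values that render as "namespace/name" in logs.
func workloadReferences(targets []*target) []klog.ObjectRef {
	if len(targets) == 0 {
		return nil // avoids an allocation in the common no-preemption case
	}
	keys := make([]klog.ObjectRef, len(targets))
	for i, t := range targets {
		keys[i] = klog.KObj(t.obj)
	}
	return keys
}

func main() {
	wl := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "team-a", Name: "wl-1"}}
	fmt.Println(workloadReferences([]*target{{obj: wl, reason: "InClusterQueue"}})) // [team-a/wl-1]
}
```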
diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go
index d90ae0221b..e215c7d414 100644
--- a/pkg/scheduler/preemption/preemption.go
+++ b/pkg/scheduler/preemption/preemption.go
@@ -99,7 +99,17 @@ func candidatesFromCQOrUnderThreshold(candidates []*workload.Info, clusterQueue
 type Target struct {
 	Wl     *workload.Info
 	Reason string
-	Scope  string // ClusterQueue/Cohort
+}
+
+func GetWorkloadReferences(targets []*Target) []klog.ObjectRef {
+	if len(targets) == 0 {
+		return nil
+	}
+	keys := make([]klog.ObjectRef, len(targets))
+	for i, t := range targets {
+		keys[i] = klog.KObj(t.Wl.Obj)
+	}
+	return keys
 }
 
 // GetTargets returns the list of workloads that should be evicted in order to make room for wl.
@@ -175,14 +185,27 @@ const (
 	ClusterQueueOrigin = "ClusterQueue"
 	// CohortOrigin indicates that preemption originated from cohort
 	CohortOrigin = "Cohort"
+)
 
-	InClusterQueueReason = "InClusterQueue"
+// In cluster queue preemption reasons
+const (
+	InClusterQueueReason string = "InClusterQueue"
+)
 
-	InCohortReclamationReason           = "InCohortReclamation"
-	InCohortFairSharingReason           = "InCohortFairSharing"
-	InCohortReclaimWhileBorrowingReason = "InCohortReclaimWhileBorrowing"
+// In cohort preemption reasons
+const (
+	InCohortReclamationReason           string = "InCohortReclamation"
+	InCohortFairSharingReason           string = "InCohortFairSharing"
+	InCohortReclaimWhileBorrowingReason string = "InCohortReclaimWhileBorrowing"
 )
 
+var HumanReadablePreemptionReasons = map[string]string{
+	InClusterQueueReason:                "prioritization in the ClusterQueue",
+	InCohortReclamationReason:           "reclamation within the cohort",
+	InCohortFairSharingReason:           "fair sharing within the cohort",
+	InCohortReclaimWhileBorrowingReason: "reclamation within the cohort while borrowing",
+}
+
 // IssuePreemptions marks the target workloads as evicted.
 func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.Info, targets []*Target, cq *cache.ClusterQueue) (int, error) {
 	log := ctrl.LoggerFrom(ctx)
@@ -193,7 +216,7 @@ func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.In
 	workqueue.ParallelizeUntil(ctx, parallelPreemptions, len(targets), func(i int) {
 		target := targets[i]
 		if !meta.IsStatusConditionTrue(target.Wl.Obj.Status.Conditions, kueue.WorkloadEvicted) {
-			message := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the %s", preemptor.Obj.UID, target.Scope)
+			message := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) due to %s", preemptor.Obj.UID, HumanReadablePreemptionReasons[target.Reason])
 			err := p.applyPreemption(ctx, target.Wl.Obj, target.Reason, message)
 			if err != nil {
 				errCh.SendErrorWithCancel(err, cancel)
@@ -236,13 +259,12 @@ func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitie
 	for _, candWl := range candidates {
 		candCQ := snapshot.ClusterQueues[candWl.ClusterQueue]
 		sameCq := cq == candCQ
-		scope := ClusterQueueOrigin
 		reason := InClusterQueueReason
 		if !sameCq && !cqIsBorrowing(candCQ, resPerFlv) {
 			continue
 		}
 		if !sameCq {
-			scope = CohortOrigin
+			reason = InCohortReclamationReason
 			if allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) >= *allowBorrowingBelowPriority {
 				// We set allowBorrowing=false if there is a candidate with priority
 				// exceeding allowBorrowingBelowPriority added to targets.
@@ -262,14 +284,9 @@ func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitie
 			}
 		}
 		snapshot.RemoveWorkload(candWl)
-		if !sameCq && !allowBorrowing {
-			scope = CohortOrigin
-			reason = InCohortReclamationReason
-		}
 		targets = append(targets, &Target{
 			Wl:     candWl,
 			Reason: reason,
-			Scope:  scope,
 		})
 		if workloadFits(wlReq, cq, allowBorrowing) {
 			fits = true
@@ -357,7 +374,6 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
 			targets = append(targets, &Target{
 				Wl:     candWl,
 				Reason: InClusterQueueReason,
-				Scope:  ClusterQueueOrigin,
 			})
 			if workloadFits(wlReq, nominatedCQ, true) {
 				fits = true
@@ -375,12 +391,17 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
 			for i, candWl := range candCQ.workloads {
 				belowThreshold := allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) < *allowBorrowingBelowPriority
 				newCandShareVal, _ := candCQ.cq.DominantResourceShareWithout(candWl)
-				if belowThreshold || p.fsStrategies[0](newNominatedShareValue, candCQ.share, newCandShareVal) {
+				strategy := p.fsStrategies[0](newNominatedShareValue, candCQ.share, newCandShareVal)
+				if belowThreshold || strategy {
 					snapshot.RemoveWorkload(candWl)
+					reason := InCohortFairSharingReason
+					if !strategy && belowThreshold {
+						reason = InCohortReclaimWhileBorrowingReason
+					}
+
 					targets = append(targets, &Target{
 						Wl:     candWl,
-						Reason: InCohortFairSharingReason,
-						Scope:  CohortOrigin,
+						Reason: reason,
 					})
 					if workloadFits(wlReq, nominatedCQ, true) {
 						fits = true
@@ -413,7 +434,6 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
 				targets = append(targets, &Target{
 					Wl:     candWl,
 					Reason: InCohortFairSharingReason,
-					Scope:  CohortOrigin,
 				})
 				if workloadFits(wlReq, nominatedCQ, true) {
 					fits = true
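With `Target.Scope` removed, the eviction message in `IssuePreemptions` is derived entirely from the machine-readable `Reason` via the `HumanReadablePreemptionReasons` map, and `fairPreemptions` now distinguishes a candidate admitted only because it sits below `allowBorrowingBelowPriority` (tagged `InCohortReclaimWhileBorrowingReason`) from one the fair-sharing strategy itself accepts (`InCohortFairSharingReason`) — the updated `/preemptible2` expectation in the tests below exercises exactly that branch. A runnable sketch of the message construction, with the constants copied from the hunk above and a hypothetical preemptor UID:

```go
package main

import "fmt"

// Reason constants and the human-readable mapping, copied from the diff.
const (
	InClusterQueueReason                = "InClusterQueue"
	InCohortReclamationReason           = "InCohortReclamation"
	InCohortFairSharingReason           = "InCohortFairSharing"
	InCohortReclaimWhileBorrowingReason = "InCohortReclaimWhileBorrowing"
)

var humanReadablePreemptionReasons = map[string]string{
	InClusterQueueReason:                "prioritization in the ClusterQueue",
	InCohortReclamationReason:           "reclamation within the cohort",
	InCohortFairSharingReason:           "fair sharing within the cohort",
	InCohortReclaimWhileBorrowingReason: "reclamation within the cohort while borrowing",
}

// preemptionMessage mirrors the fmt.Sprintf call in IssuePreemptions.
func preemptionMessage(preemptorUID, reason string) string {
	return fmt.Sprintf("Preempted to accommodate a workload (UID: %s) due to %s",
		preemptorUID, humanReadablePreemptionReasons[reason])
}

func main() {
	// "uid-1234" is a hypothetical preemptor UID.
	fmt.Println(preemptionMessage("uid-1234", InCohortReclamationReason))
	// Preempted to accommodate a workload (UID: uid-1234) due to reclamation within the cohort
}
```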
diff --git a/pkg/scheduler/preemption/preemption_test.go b/pkg/scheduler/preemption/preemption_test.go
index 877ca28f57..67aa103009 100644
--- a/pkg/scheduler/preemption/preemption_test.go
+++ b/pkg/scheduler/preemption/preemption_test.go
@@ -1552,7 +1552,7 @@ func TestFairPreemptions(t *testing.T) {
 			},
 			incoming: unitWl.Clone().Name("c_incoming").Obj(),
 			targetCQ: "c",
-			wantPreempted: sets.New(targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin)),
+			wantPreempted: sets.New(targetKeyReasonScope("/b1", InCohortFairSharingReason)),
 		},
 		"can reclaim from queue using less, if taking the latest workload from user using the most isn't enough": {
 			clusterQueues: baseCQs,
@@ -1564,7 +1564,7 @@ func TestFairPreemptions(t *testing.T) {
 			},
 			incoming: utiltesting.MakeWorkload("c_incoming", "").Request(corev1.ResourceCPU, "3").SimpleReserveQuota("a", "default", now).Obj(),
 			targetCQ: "c",
-			wantPreempted: sets.New(targetKeyReasonScope("/a1", InCohortFairSharingReason, CohortOrigin)), // attempts to preempt b1, but it's not enough.
+			wantPreempted: sets.New(targetKeyReasonScope("/a1", InCohortFairSharingReason)), // attempts to preempt b1, but it's not enough.
 		},
 		"reclaim borrowable quota from user using the most": {
 			clusterQueues: baseCQs,
@@ -1581,7 +1581,7 @@ func TestFairPreemptions(t *testing.T) {
 			},
 			incoming: unitWl.Clone().Name("a_incoming").Obj(),
 			targetCQ: "a",
-			wantPreempted: sets.New(targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin)),
+			wantPreempted: sets.New(targetKeyReasonScope("/b1", InCohortFairSharingReason)),
 		},
 		"preempt one from each CQ borrowing": {
 			clusterQueues: baseCQs,
@@ -1596,8 +1596,8 @@ func TestFairPreemptions(t *testing.T) {
 			incoming: utiltesting.MakeWorkload("c_incoming", "").Request(corev1.ResourceCPU, "2").Obj(),
 			targetCQ: "c",
 			wantPreempted: sets.New(
-				targetKeyReasonScope("/a1", InCohortFairSharingReason, CohortOrigin),
-				targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin),
+				targetKeyReasonScope("/a1", InCohortFairSharingReason),
+				targetKeyReasonScope("/b1", InCohortFairSharingReason),
 			),
 		},
 		"can't preempt when everyone under nominal": {
@@ -1647,8 +1647,8 @@ func TestFairPreemptions(t *testing.T) {
 			incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(),
 			targetCQ: "a",
 			wantPreempted: sets.New(
-				targetKeyReasonScope("/a1_low", InClusterQueueReason, ClusterQueueOrigin),
-				targetKeyReasonScope("/a2_low", InClusterQueueReason, ClusterQueueOrigin),
+				targetKeyReasonScope("/a1_low", InClusterQueueReason),
+				targetKeyReasonScope("/a2_low", InClusterQueueReason),
 			),
 		},
 		"can preempt a combination of same CQ and highest user": {
@@ -1667,8 +1667,8 @@ func TestFairPreemptions(t *testing.T) {
 			incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(),
 			targetCQ: "a",
 			wantPreempted: sets.New(
-				targetKeyReasonScope("/a_low", InClusterQueueReason, ClusterQueueOrigin),
-				targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin),
+				targetKeyReasonScope("/a_low", InClusterQueueReason),
+				targetKeyReasonScope("/b1", InCohortFairSharingReason),
 			),
 		},
 		"preempt huge workload if there is no other option, as long as the target CQ gets a lower share": {
@@ -1678,7 +1678,7 @@ func TestFairPreemptions(t *testing.T) {
 			},
 			incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(),
 			targetCQ: "a",
-			wantPreempted: sets.New(targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin)),
+			wantPreempted: sets.New(targetKeyReasonScope("/b1", InCohortFairSharingReason)),
 		},
 		"can't preempt huge workload if the incoming is also huge": {
 			clusterQueues: baseCQs,
@@ -1710,8 +1710,8 @@ func TestFairPreemptions(t *testing.T) {
"").Request(corev1.ResourceCPU, "4").Obj(), targetCQ: "a", wantPreempted: sets.New( - targetKeyReasonScope("/a1_low", InClusterQueueReason, ClusterQueueOrigin), - targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin), + targetKeyReasonScope("/a1_low", InClusterQueueReason), + targetKeyReasonScope("/b1", InCohortFairSharingReason), ), }, "prefer to preempt workloads that don't make the target CQ have the biggest share": { @@ -1725,7 +1725,7 @@ func TestFairPreemptions(t *testing.T) { incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "3.5").Obj(), targetCQ: "a", // It would have been possible to preempt "/b1" under rule S2-b, but S2-a was possible first. - wantPreempted: sets.New(targetKeyReasonScope("/b2", InCohortFairSharingReason, CohortOrigin)), + wantPreempted: sets.New(targetKeyReasonScope("/b2", InCohortFairSharingReason)), }, "preempt from different cluster queues if the end result has a smaller max share": { clusterQueues: baseCQs, @@ -1738,8 +1738,8 @@ func TestFairPreemptions(t *testing.T) { incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "3.5").Obj(), targetCQ: "a", wantPreempted: sets.New( - targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin), - targetKeyReasonScope("/c1", InCohortFairSharingReason, CohortOrigin), + targetKeyReasonScope("/b1", InCohortFairSharingReason), + targetKeyReasonScope("/c1", InCohortFairSharingReason), ), }, "scenario above does not flap": { @@ -1778,8 +1778,8 @@ func TestFairPreemptions(t *testing.T) { incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(), targetCQ: "a", wantPreempted: sets.New( - targetKeyReasonScope("/preemptible1", InCohortFairSharingReason, CohortOrigin), - targetKeyReasonScope("/preemptible2", InCohortFairSharingReason, CohortOrigin), + targetKeyReasonScope("/preemptible1", InCohortFairSharingReason), + targetKeyReasonScope("/preemptible2", InCohortReclaimWhileBorrowingReason), ), }, "preempt lower priority first, even if big": { @@ -1792,7 +1792,7 @@ func TestFairPreemptions(t *testing.T) { }, incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "1").Obj(), targetCQ: "a", - wantPreempted: sets.New(targetKeyReasonScope("/b_low", InCohortFairSharingReason, CohortOrigin)), + wantPreempted: sets.New(targetKeyReasonScope("/b_low", InCohortFairSharingReason)), }, "preempt workload that doesn't transfer the imbalance, even if high priority": { clusterQueues: baseCQs, @@ -1804,7 +1804,7 @@ func TestFairPreemptions(t *testing.T) { }, incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "1").Obj(), targetCQ: "a", - wantPreempted: sets.New(targetKeyReasonScope("/b_high", InCohortFairSharingReason, CohortOrigin)), + wantPreempted: sets.New(targetKeyReasonScope("/b_high", InCohortFairSharingReason)), }, "CQ with higher weight can preempt more": { clusterQueues: []*kueue.ClusterQueue{ @@ -1851,8 +1851,8 @@ func TestFairPreemptions(t *testing.T) { incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(), targetCQ: "a", wantPreempted: sets.New( - targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin), - targetKeyReasonScope("/b2", InCohortFairSharingReason, CohortOrigin), + targetKeyReasonScope("/b1", InCohortFairSharingReason), + targetKeyReasonScope("/b2", InCohortFairSharingReason), ), }, "can preempt anything borrowing from CQ with 0 weight": { @@ -1900,9 +1900,9 @@ func TestFairPreemptions(t *testing.T) { 
diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go
index 535cc439a1..c8f2379c31 100644
--- a/test/integration/scheduler/preemption_test.go
+++ b/test/integration/scheduler/preemption_test.go
@@ -281,25 +281,8 @@ var _ = ginkgo.Describe("Preemption", func() {
 
 		conditionCmpOpts := cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")
 		ginkgo.By("Verify the Preempted condition", func() {
-			gomega.Eventually(func(g gomega.Gomega) {
-				g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(alphaLowWl), alphaLowWl)).To(gomega.Succeed())
-				g.Expect(apimeta.FindStatusCondition(alphaLowWl.Status.Conditions, kueue.WorkloadPreempted)).To(gomega.BeComparableTo(&metav1.Condition{
-					Type:    kueue.WorkloadPreempted,
-					Status:  metav1.ConditionTrue,
-					Reason:  preemption.InClusterQueueReason,
-					Message: fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the %s", alphaMidWl.UID, preemption.ClusterQueueOrigin),
-				}, conditionCmpOpts))
-			}, util.Timeout, util.Interval).Should(gomega.Succeed())
-
-			gomega.Eventually(func(g gomega.Gomega) {
-				g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(betaMidWl), betaMidWl)).To(gomega.Succeed())
-				g.Expect(apimeta.FindStatusCondition(betaMidWl.Status.Conditions, kueue.WorkloadPreempted)).To(gomega.BeComparableTo(&metav1.Condition{
-					Type:    kueue.WorkloadPreempted,
-					Status:  metav1.ConditionTrue,
-					Reason:  preemption.InCohortReclamationReason,
-					Message: fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the %s", alphaMidWl.UID, preemption.CohortOrigin),
-				}, conditionCmpOpts))
-			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			util.ExpectPreemptedCondition(ctx, k8sClient, preemption.InClusterQueueReason, metav1.ConditionTrue, alphaLowWl, alphaMidWl)
+			util.ExpectPreemptedCondition(ctx, k8sClient, preemption.InCohortReclamationReason, metav1.ConditionTrue, betaMidWl, alphaMidWl)
 		})
 
 		ginkgo.By("Verify the Preempted condition on re-admission, as the preemptor is finished", func() {
@@ -705,16 +688,7 @@ var _ = ginkgo.Describe("Preemption", func() {
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, aStandardVeryHighWl)).To(gomega.Succeed())
 
-			conditionCmpOpts := cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime", "ObservedGeneration")
-			gomega.Eventually(func(g gomega.Gomega) {
-				g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(aBestEffortLowWl), aBestEffortLowWl)).To(gomega.Succeed())
-				g.Expect(aBestEffortLowWl.Status.Conditions).To(gomega.ContainElements(gomega.BeComparableTo(metav1.Condition{
-					Type:    kueue.WorkloadPreempted,
-					Status:  metav1.ConditionTrue,
-					Reason:  preemption.InCohortReclaimWhileBorrowingReason,
-					Message: fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the %s", aStandardVeryHighWl.UID, preemption.CohortOrigin),
-				}, conditionCmpOpts)))
-			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			util.ExpectPreemptedCondition(ctx, k8sClient, preemption.InCohortReclaimWhileBorrowingReason, metav1.ConditionTrue, aBestEffortLowWl, aStandardVeryHighWl)
 
 			ginkgo.By("Finish eviction fo the a-best-effort-low workload")
 			util.FinishEvictionForWorkloads(ctx, k8sClient, aBestEffortLowWl)
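The inlined `gomega.Eventually` blocks above collapse into one shared assertion, `util.ExpectPreemptedCondition`, defined in `test/util/util.go` below. An illustrative call-site fragment (not self-contained; `ctx`, `k8sClient`, and the workload variables come from the surrounding spec) — note the argument order: the preempted workload comes first, then the preemptor whose UID the condition message cites:

```go
// Assert that alphaLowWl carries a true Preempted condition whose Reason
// is InClusterQueue and whose Message references alphaMidWl's UID.
util.ExpectPreemptedCondition(ctx, k8sClient,
	preemption.InClusterQueueReason, // expected condition Reason
	metav1.ConditionTrue,            // expected condition Status
	alphaLowWl,                      // workload expected to be preempted
	alphaMidWl)                      // preemptor cited in the message
```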
diff --git a/test/util/util.go b/test/util/util.go
index fbb8516f80..137025cccb 100644
--- a/test/util/util.go
+++ b/test/util/util.go
@@ -49,6 +49,7 @@ import (
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	"sigs.k8s.io/kueue/pkg/controller/jobs/pod"
 	"sigs.k8s.io/kueue/pkg/metrics"
+	"sigs.k8s.io/kueue/pkg/scheduler/preemption"
 	"sigs.k8s.io/kueue/pkg/util/testing"
 	"sigs.k8s.io/kueue/pkg/workload"
 )
@@ -792,6 +793,19 @@ readCh:
 	gomega.ExpectWithOffset(1, gotObjs).To(gomega.Equal(objs))
 }
 
+func ExpectPreemptedCondition(ctx context.Context, k8sClient client.Client, reason string, status metav1.ConditionStatus, preemptedWl, preempteeWl *kueue.Workload) {
+	conditionCmpOpts := cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime", "ObservedGeneration")
+	gomega.Eventually(func(g gomega.Gomega) {
+		g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(preemptedWl), preemptedWl)).To(gomega.Succeed())
+		g.Expect(preemptedWl.Status.Conditions).To(gomega.ContainElements(gomega.BeComparableTo(metav1.Condition{
+			Type:    kueue.WorkloadPreempted,
+			Status:  status,
+			Reason:  reason,
+			Message: fmt.Sprintf("Preempted to accommodate a workload (UID: %s) due to %s", preempteeWl.UID, preemption.HumanReadablePreemptionReasons[reason]),
+		}, conditionCmpOpts)))
+	}, Timeout, Interval).Should(gomega.Succeed())
+}
+
 func NewTestingLogger(writer io.Writer, level int) logr.Logger {
 	opts := func(o *zap.Options) {
 		o.TimeEncoder = zapcore.RFC3339NanoTimeEncoder
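End to end, a preempted Workload now carries a `Preempted` condition whose `Reason` stays machine-readable while the `Message` becomes human-readable. A runnable sketch of the condition shape the helper asserts on — the UID is hypothetical, and `"Preempted"` is assumed to be the value of `kueue.WorkloadPreempted`:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Condition shape matched by ExpectPreemptedCondition above;
	// "uid-1234" stands in for the preemptor's real UID.
	cond := metav1.Condition{
		Type:   "Preempted", // assumed value of kueue.WorkloadPreempted
		Status: metav1.ConditionTrue,
		Reason: "InCohortReclaimWhileBorrowing",
		Message: fmt.Sprintf(
			"Preempted to accommodate a workload (UID: %s) due to %s",
			"uid-1234", "reclamation within the cohort while borrowing"),
	}
	fmt.Printf("%s=%s reason=%q\n%s\n", cond.Type, cond.Status, cond.Reason, cond.Message)
}
```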