From 597774141b2f26ed8ab578e01f0e68b2c0644ab3 Mon Sep 17 00:00:00 2001 From: vladikkuzn Date: Wed, 10 Jul 2024 12:50:52 +0300 Subject: [PATCH] Distinguish between Preemption due to reclamation and fair sharing * Rename WL -> WorkloadInfo * Move getWorkloadReferences to logging --- pkg/scheduler/logging.go | 13 +++++- pkg/scheduler/preemption/preemption.go | 58 +++++++++----------------- 2 files changed, 32 insertions(+), 39 deletions(-) diff --git a/pkg/scheduler/logging.go b/pkg/scheduler/logging.go index 17a747aea6..903f8beca1 100644 --- a/pkg/scheduler/logging.go +++ b/pkg/scheduler/logging.go @@ -37,7 +37,7 @@ func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) { } if log.V(4).Enabled() { args = append(args, "nominatedAssignment", e.assignment) - args = append(args, "preempted", preemption.GetWorkloadReferences(e.preemptionTargets)) + args = append(args, "preempted", getWorkloadReferences(e.preemptionTargets)) } logV.Info("Workload evaluated for admission", args...) } @@ -47,3 +47,14 @@ func logSnapshotIfVerbose(log logr.Logger, s *cache.Snapshot) { s.Log(logV) } } + +func getWorkloadReferences(targets []*preemption.Target) []klog.ObjectRef { + if len(targets) == 0 { + return nil + } + keys := make([]klog.ObjectRef, len(targets)) + for i, t := range targets { + keys[i] = klog.KObj(t.WorkloadInfo.Obj) + } + return keys +} diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go index a923c6a972..4e4fa3fe26 100644 --- a/pkg/scheduler/preemption/preemption.go +++ b/pkg/scheduler/preemption/preemption.go @@ -97,19 +97,8 @@ func candidatesFromCQOrUnderThreshold(candidates []*workload.Info, clusterQueue } type Target struct { - Wl *workload.Info - Reason string -} - -func GetWorkloadReferences(targets []*Target) []klog.ObjectRef { - if len(targets) == 0 { - return nil - } - keys := make([]klog.ObjectRef, len(targets)) - for i, t := range targets { - keys[i] = klog.KObj(t.Wl.Obj) - } - return keys + WorkloadInfo *workload.Info + Reason string } // GetTargets returns the list of workloads that should be evicted in order to make room for wl. @@ -180,13 +169,6 @@ func canBorrowWithinCohort(cq *cache.ClusterQueueSnapshot, wl *kueue.Workload) ( return true, &threshold } -const ( - // ClusterQueueOrigin indicates that preemption originated from cluster queue - ClusterQueueOrigin = "ClusterQueue" - // CohortOrigin indicates that preemption originated from cohort - CohortOrigin = "Cohort" -) - // In cluster queue preemption reasons const ( InClusterQueueReason string = "InClusterQueue" @@ -215,19 +197,19 @@ func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.In defer cancel() workqueue.ParallelizeUntil(ctx, parallelPreemptions, len(targets), func(i int) { target := targets[i] - if !meta.IsStatusConditionTrue(target.Wl.Obj.Status.Conditions, kueue.WorkloadEvicted) { + if !meta.IsStatusConditionTrue(target.WorkloadInfo.Obj.Status.Conditions, kueue.WorkloadEvicted) { message := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) due to %s", preemptor.Obj.UID, HumanReadablePreemptionReasons[target.Reason]) - err := p.applyPreemption(ctx, target.Wl.Obj, target.Reason, message) + err := p.applyPreemption(ctx, target.WorkloadInfo.Obj, target.Reason, message) if err != nil { errCh.SendErrorWithCancel(err, cancel) return } - log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.Wl.Obj), "reason", target.Reason, "message", message) - p.recorder.Eventf(target.Wl.Obj, corev1.EventTypeNormal, "Preempted", message) - metrics.ReportEvictedWorkloads(target.Wl.ClusterQueue, kueue.WorkloadEvictedByPreemption) + log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj), "reason", target.Reason, "message", message) + p.recorder.Eventf(target.WorkloadInfo.Obj, corev1.EventTypeNormal, "Preempted", message) + metrics.ReportEvictedWorkloads(target.WorkloadInfo.ClusterQueue, kueue.WorkloadEvictedByPreemption) } else { - log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.Wl.Obj)) + log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj)) } atomic.AddInt64(&successfullyPreempted, 1) }) @@ -285,8 +267,8 @@ func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitie } snapshot.RemoveWorkload(candWl) targets = append(targets, &Target{ - Wl: candWl, - Reason: reason, + WorkloadInfo: candWl, + Reason: reason, }) if workloadFits(wlReq, cq, allowBorrowing) { fits = true @@ -305,13 +287,13 @@ func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitie func fillBackWorkloads(targets []*Target, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, allowBorrowing bool) []*Target { // In the reverse order, check if any of the workloads can be added back. for i := len(targets) - 2; i >= 0; i-- { - snapshot.AddWorkload(targets[i].Wl) + snapshot.AddWorkload(targets[i].WorkloadInfo) if workloadFits(wlReq, cq, allowBorrowing) { // O(1) deletion: copy the last element into index i and reduce size. targets[i] = targets[len(targets)-1] targets = targets[:len(targets)-1] } else { - snapshot.RemoveWorkload(targets[i].Wl) + snapshot.RemoveWorkload(targets[i].WorkloadInfo) } } return targets @@ -319,7 +301,7 @@ func fillBackWorkloads(targets []*Target, wlReq resources.FlavorResourceQuantiti func restoreSnapshot(snapshot *cache.Snapshot, targets []*Target) { for _, t := range targets { - snapshot.AddWorkload(t.Wl) + snapshot.AddWorkload(t.WorkloadInfo) } } @@ -372,8 +354,8 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme candWl := candCQ.workloads[0] snapshot.RemoveWorkload(candWl) targets = append(targets, &Target{ - Wl: candWl, - Reason: InClusterQueueReason, + WorkloadInfo: candWl, + Reason: InClusterQueueReason, }) if workloadFits(wlReq, nominatedCQ, true) { fits = true @@ -395,13 +377,13 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme if belowThreshold || strategy { snapshot.RemoveWorkload(candWl) reason := InCohortFairSharingReason - if !strategy && belowThreshold { + if !strategy { reason = InCohortReclaimWhileBorrowingReason } targets = append(targets, &Target{ - Wl: candWl, - Reason: reason, + WorkloadInfo: candWl, + Reason: reason, }) if workloadFits(wlReq, nominatedCQ, true) { fits = true @@ -432,8 +414,8 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme candWl := candCQ.workloads[0] snapshot.RemoveWorkload(candWl) targets = append(targets, &Target{ - Wl: candWl, - Reason: InCohortFairSharingReason, + WorkloadInfo: candWl, + Reason: InCohortFairSharingReason, }) if workloadFits(wlReq, nominatedCQ, true) { fits = true