From 5e7d6b6d6d3fcc83da2aefe0c15a6c182b0a3965 Mon Sep 17 00:00:00 2001 From: vladikkuzn Date: Fri, 14 Jun 2024 05:18:29 +0300 Subject: [PATCH] Distinguish between Preemption due to reclamation and fair sharing --- pkg/scheduler/preemption/preemption.go | 29 +++++++++++++++++++++----- pkg/scheduler/scheduler.go | 2 +- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go index 91d72b00c5..0d6bab5216 100644 --- a/pkg/scheduler/preemption/preemption.go +++ b/pkg/scheduler/preemption/preemption.go @@ -162,8 +162,21 @@ func canBorrowWithinCohort(cq *cache.ClusterQueue, wl *kueue.Workload) (bool, *i return true, &threshold } +const ( + // ClusterQueueOrigin indicates that preemption originated from cluster queue + ClusterQueueOrigin = "ClusterQueue" + // CohortOrigin indicates that preemption originated from cohort + CohortOrigin = "cohort" + + InClusterQueueReason = "InClusterQueue" + + PriorityReclamationReason = "PriorityReclamation" + InCohortReclamationReason = "InCohortReclamation" + InCohortFairSharingReason = "InCohortFairSharing" +) + // IssuePreemptions marks the target workloads as evicted. -func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.Info, targets []*workload.Info, cq *cache.ClusterQueue) (int, error) { +func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.Info, targets []*workload.Info, cq *cache.ClusterQueue, reclamation bool) (int, error) { log := ctrl.LoggerFrom(ctx) errCh := routine.NewErrorChannel() ctx, cancel := context.WithCancel(ctx) @@ -172,11 +185,17 @@ func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.In workqueue.ParallelizeUntil(ctx, parallelPreemptions, len(targets), func(i int) { target := targets[i] if !meta.IsStatusConditionTrue(target.Obj.Status.Conditions, kueue.WorkloadEvicted) { - origin := "ClusterQueue" - reason := "InClusterQueue" + origin := ClusterQueueOrigin + reason := InClusterQueueReason if cq.Name != target.ClusterQueue { - origin = "cohort" - reason = "InCohort" + origin = CohortOrigin + reason = PriorityReclamationReason + if p.enableFairSharing { + reason = InCohortFairSharingReason + } + if reclamation { + reason = InCohortReclamationReason + } } message := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the %s", preemptor.Obj.UID, origin) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index c2e3db8db3..69a2d0c21f 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -248,7 +248,7 @@ func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal { if len(e.preemptionTargets) != 0 { // If preemptions are issued, the next attempt should try all the flavors. e.LastAssignment = nil - preempted, err := s.preemptor.IssuePreemptions(ctx, &e.Info, e.preemptionTargets, cq) + preempted, err := s.preemptor.IssuePreemptions(ctx, &e.Info, e.preemptionTargets, cq, !e.assignment.Borrowing) if err != nil { log.Error(err, "Failed to preempt workloads") }