Skip to content

Commit

Permalink
Distinguish between Preemption due to reclamation and fair sharing
Browse files Browse the repository at this point in the history
  • Loading branch information
vladikkuzn committed Jun 17, 2024
1 parent 8b217c6 commit abb03fe
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
27 changes: 22 additions & 5 deletions pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,20 @@ func canBorrowWithinCohort(cq *cache.ClusterQueue, wl *kueue.Workload) (bool, *i
return true, &threshold
}

const (
// ClusterQueueOrigin indicates that preemption originated from cluster queue
ClusterQueueOrigin = "ClusterQueue"
// CohortOrigin indicates that preemption originated from cohort
CohortOrigin = "cohort"

InClusterQueueReason = "InClusterQueue"

InCohortReclamationReason = "InCohortReclamation"
InCohortFairSharingReason = "InCohortFairSharing"
)

// IssuePreemptions marks the target workloads as evicted.
func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.Info, targets []*workload.Info, cq *cache.ClusterQueue) (int, error) {
func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.Info, targets []*workload.Info, cq *cache.ClusterQueue, reclamation bool) (int, error) {
log := ctrl.LoggerFrom(ctx)
errCh := routine.NewErrorChannel()
ctx, cancel := context.WithCancel(ctx)
Expand All @@ -172,11 +184,16 @@ func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.In
workqueue.ParallelizeUntil(ctx, parallelPreemptions, len(targets), func(i int) {
target := targets[i]
if !meta.IsStatusConditionTrue(target.Obj.Status.Conditions, kueue.WorkloadEvicted) {
origin := "ClusterQueue"
reason := "InClusterQueue"
origin := ClusterQueueOrigin
reason := InClusterQueueReason
if cq.Name != target.ClusterQueue {
origin = "cohort"
reason = "InCohort"
origin = CohortOrigin
if p.enableFairSharing {
reason = InCohortFairSharingReason
}
if reclamation {
reason = InCohortReclamationReason
}
}

message := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the %s", preemptor.Obj.UID, origin)
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal {
if len(e.preemptionTargets) != 0 {
// If preemptions are issued, the next attempt should try all the flavors.
e.LastAssignment = nil
preempted, err := s.preemptor.IssuePreemptions(ctx, &e.Info, e.preemptionTargets, cq)
preempted, err := s.preemptor.IssuePreemptions(ctx, &e.Info, e.preemptionTargets, cq, e.assignment.Borrowing)
if err != nil {
log.Error(err, "Failed to preempt workloads")
}
Expand Down

0 comments on commit abb03fe

Please sign in to comment.