Distinguish between Preemption due to reclamation and fair sharing
* Rewrite to Targets
* Cover CohortFairSharing
vladikkuzn committed Jul 3, 2024
1 parent 72409da commit 5076e8d
Showing 5 changed files with 166 additions and 99 deletions.
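
The core of the change, shown here as a hedged standalone sketch rather than actual kueue code: preemption candidates are wrapped in a Target that records the eviction reason and scope at selection time, so IssuePreemptions no longer derives them from a single borrowing flag. Only the message format and the reason names come from the diff that follows; the field types, Scope strings, UID, and workload names below are illustrative stand-ins.

package main

import "fmt"

// Illustrative analogue of the new preemption.Target introduced below: each
// candidate carries the reason and scope decided when it was selected,
// instead of both being recomputed when preemptions are issued.
type Target struct {
	Workload string // stand-in for *workload.Info
	Reason   string // e.g. InClusterQueue, InCohortReclamation, InCohortFairSharing
	Scope    string // stand-in for ClusterQueueOrigin / CohortOrigin
}

// issuePreemptions mimics the reworked IssuePreemptions: the eviction message
// and condition reason are read straight from each Target.
func issuePreemptions(preemptorUID string, targets []*Target) {
	for _, t := range targets {
		msg := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the %s", preemptorUID, t.Scope)
		fmt.Printf("evict %s: reason=%s message=%q\n", t.Workload, t.Reason, msg)
	}
}

func main() {
	issuePreemptions("f1c03b4d", []*Target{
		{Workload: "wl-same-queue", Reason: "InClusterQueue", Scope: "ClusterQueue"},
		{Workload: "wl-borrower", Reason: "InCohortReclamation", Scope: "cohort"},
	})
}
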
6 changes: 5 additions & 1 deletion pkg/scheduler/logging.go
@@ -21,6 +21,8 @@ import (
"k8s.io/klog/v2"

"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/scheduler/preemption"
"sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/workload"
)

@@ -37,7 +39,9 @@ func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) {
}
if log.V(4).Enabled() {
args = append(args, "nominatedAssignment", e.assignment)
args = append(args, "preempted", workload.References(e.preemptionTargets))
args = append(args, "preempted", workload.References(
slices.Map(e.preemptionTargets, func(p **preemption.Target) *workload.Info { return (*p).Wl }),
))
}
logV.Info("Workload evaluated for admission", args...)
}
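
A note on the slices.Map call above: the exact signature of kueue's pkg/util/slices helper is not part of this diff, so the sketch below only assumes a generic mapper that hands the callback a pointer to each element, which is what would make a []*preemption.Target element arrive as **preemption.Target.

package slicesx

// Map is a minimal generic slice-mapping sketch. Assuming the callback
// receives a pointer to each element, mapping a []*preemption.Target with
// func(p **preemption.Target) *workload.Info { return (*p).Wl } yields the
// []*workload.Info that workload.References expects.
func Map[E, R any](s []E, f func(*E) R) []R {
	if s == nil {
		return nil
	}
	out := make([]R, len(s))
	for i := range s {
		out[i] = f(&s[i])
	}
	return out
}
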
129 changes: 74 additions & 55 deletions pkg/scheduler/preemption/preemption.go
@@ -96,8 +96,14 @@ func candidatesFromCQOrUnderThreshold(candidates []*workload.Info, clusterQueue
return result
}

type Target struct {
Wl *workload.Info
Reason string
Scope string // ClusterQueue/Cohort
}

// GetTargets returns the list of workloads that should be evicted in order to make room for wl.
func (p *Preemptor) GetTargets(log logr.Logger, wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*workload.Info {
func (p *Preemptor) GetTargets(log logr.Logger, wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*Target {
resPerFlv := resourcesRequiringPreemption(assignment)
cq := snapshot.ClusterQueues[wl.ClusterQueue]

@@ -172,54 +178,39 @@

InClusterQueueReason = "InClusterQueue"

InCohortReclamationReason = "InCohortReclamation"
InCohortFairSharingReason = "InCohortFairSharing"
InCohortReclamationReason = "InCohortReclamation"
InCohortFairSharingReason = "InCohortFairSharing"
InCohortReclaimWhileBorrowingReason = "InCohortReclaimWhileBorrowing"
)

// IssuePreemptions marks the target workloads as evicted.
func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.Info, targets []*workload.Info, cq *cache.ClusterQueue, borrowing bool) (int, error) {
func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.Info, targets []*Target, cq *cache.ClusterQueue) (int, error) {
log := ctrl.LoggerFrom(ctx)
errCh := routine.NewErrorChannel()
ctx, cancel := context.WithCancel(ctx)
var successfullyPreempted int64
defer cancel()
workqueue.ParallelizeUntil(ctx, parallelPreemptions, len(targets), func(i int) {
target := targets[i]
if !meta.IsStatusConditionTrue(target.Obj.Status.Conditions, kueue.WorkloadEvicted) {
origin, reason := p.getPreemptionReason(cq.Name == target.ClusterQueue, borrowing)

message := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the %s", preemptor.Obj.UID, origin)
err := p.applyPreemption(ctx, target.Obj, reason, message)
if !meta.IsStatusConditionTrue(target.Wl.Obj.Status.Conditions, kueue.WorkloadEvicted) {
message := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the %s", preemptor.Obj.UID, target.Scope)
err := p.applyPreemption(ctx, target.Wl.Obj, target.Reason, message)
if err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}

log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.Obj), "reason", reason, "message", message)
p.recorder.Eventf(target.Obj, corev1.EventTypeNormal, "Preempted", message)
metrics.ReportEvictedWorkloads(target.ClusterQueue, kueue.WorkloadEvictedByPreemption)
log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.Wl.Obj), "reason", target.Reason, "message", message)
p.recorder.Eventf(target.Wl.Obj, corev1.EventTypeNormal, "Preempted", message)
metrics.ReportEvictedWorkloads(target.Wl.ClusterQueue, kueue.WorkloadEvictedByPreemption)
} else {
log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.Obj))
log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.Wl.Obj))
}
atomic.AddInt64(&successfullyPreempted, 1)
})
return int(successfullyPreempted), errCh.ReceiveError()
}

func (p *Preemptor) getPreemptionReason(sameCq, borrowing bool) (string, string) {
origin := ClusterQueueOrigin
reason := InClusterQueueReason
if !sameCq {
origin = CohortOrigin
if borrowing {
reason = InCohortFairSharingReason
} else {
reason = InCohortReclamationReason
}
}
return origin, reason
}

func (p *Preemptor) applyPreemptionWithSSA(ctx context.Context, w *kueue.Workload, reason, message string) error {
w = w.DeepCopy()
workload.SetEvictedCondition(w, kueue.WorkloadEvictedByPreemption, message)
@@ -235,35 +226,51 @@ func (p *Preemptor) applyPreemptionWithSSA(ctx context.Context, w *kueue.Workloa
// Once the Workload fits, the heuristic tries to add Workloads back, in the
// reverse order in which they were removed, while the incoming Workload still
// fits.
func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueue, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info {
func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueue, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*Target {
if logV := log.V(5); logV.Enabled() {
logV.Info("Simulating preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", resPerFlv, "allowBorrowing", allowBorrowing, "allowBorrowingBelowPriority", allowBorrowingBelowPriority)
}
// Simulate removing all candidates from the ClusterQueue and cohort.
var targets []*workload.Info
var targets []*Target
fits := false
for _, candWl := range candidates {
candCQ := snapshot.ClusterQueues[candWl.ClusterQueue]
if cq != candCQ && !cqIsBorrowing(candCQ, resPerFlv) {
sameCq := cq == candCQ
scope := ClusterQueueOrigin
reason := InClusterQueueReason
if !sameCq && !cqIsBorrowing(candCQ, resPerFlv) {
continue
}
if cq != candCQ && allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) >= *allowBorrowingBelowPriority {
// We set allowBorrowing=false if there is a candidate with priority
// exceeding allowBorrowingBelowPriority added to targets.
//
// We need to be careful mutating allowBorrowing. We rely on the
// fact that once there is a candidate exceeding the priority added
// to targets, then at least one such candidate is present in the
// final set of targets (after the second phase of the function).
//
// This is true, because the candidates are ordered according
// to priorities (from lowest to highest, using candidatesOrdering),
// and the last added target is not removed in the second phase of
// the function.
allowBorrowing = false
if !sameCq {
scope = CohortOrigin
if allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) >= *allowBorrowingBelowPriority {
// We set allowBorrowing=false if there is a candidate with priority
// exceeding allowBorrowingBelowPriority added to targets.
//
// We need to be careful mutating allowBorrowing. We rely on the
// fact that once there is a candidate exceeding the priority added
// to targets, then at least one such candidate is present in the
// final set of targets (after the second phase of the function).
//
// This is true, because the candidates are ordered according
// to priorities (from lowest to highest, using candidatesOrdering),
// and the last added target is not removed in the second phase of
// the function.
allowBorrowing = false
} else {
reason = InCohortReclaimWhileBorrowingReason
}
}
snapshot.RemoveWorkload(candWl)
targets = append(targets, candWl)
if !sameCq && !allowBorrowing {
scope = CohortOrigin
reason = InCohortReclamationReason
}
targets = append(targets, &Target{
Wl: candWl,
Reason: reason,
Scope: scope,
})
if workloadFits(wlReq, cq, allowBorrowing) {
fits = true
break
@@ -278,24 +285,24 @@ func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitie
return targets
}

func fillBackWorkloads(targets []*workload.Info, wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueue, snapshot *cache.Snapshot, allowBorrowing bool) []*workload.Info {
func fillBackWorkloads(targets []*Target, wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueue, snapshot *cache.Snapshot, allowBorrowing bool) []*Target {
// In the reverse order, check if any of the workloads can be added back.
for i := len(targets) - 2; i >= 0; i-- {
snapshot.AddWorkload(targets[i])
snapshot.AddWorkload(targets[i].Wl)
if workloadFits(wlReq, cq, allowBorrowing) {
// O(1) deletion: copy the last element into index i and reduce size.
targets[i] = targets[len(targets)-1]
targets = targets[:len(targets)-1]
} else {
snapshot.RemoveWorkload(targets[i])
snapshot.RemoveWorkload(targets[i].Wl)
}
}
return targets
}

func restoreSnapshot(snapshot *cache.Snapshot, targets []*workload.Info) {
func restoreSnapshot(snapshot *cache.Snapshot, targets []*Target) {
for _, t := range targets {
snapshot.AddWorkload(t)
snapshot.AddWorkload(t.Wl)
}
}

@@ -330,15 +337,15 @@ func parseStrategies(s []config.PreemptionStrategy) []fsStrategy {
return strategies
}

func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowingBelowPriority *int32) []*workload.Info {
func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowingBelowPriority *int32) []*Target {
if logV := log.V(5); logV.Enabled() {
logV.Info("Simulating fair preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", resPerFlv, "allowBorrowingBelowPriority", allowBorrowingBelowPriority)
}
cqHeap := cqHeapFromCandidates(candidates, false, snapshot)
nominatedCQ := snapshot.ClusterQueues[wl.ClusterQueue]
wlReq := assignment.TotalRequestsFor(wl)
newNominatedShareValue, _ := nominatedCQ.DominantResourceShareWith(wlReq)
var targets []*workload.Info
var targets []*Target
fits := false
var retryCandidates []*workload.Info
for cqHeap.Len() > 0 && !fits {
@@ -347,7 +354,11 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
if candCQ.cq == nominatedCQ {
candWl := candCQ.workloads[0]
snapshot.RemoveWorkload(candWl)
targets = append(targets, candWl)
targets = append(targets, &Target{
Wl: candWl,
Reason: InClusterQueueReason,
Scope: ClusterQueueOrigin,
})
if workloadFits(wlReq, nominatedCQ, true) {
fits = true
break
@@ -366,7 +377,11 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
newCandShareVal, _ := candCQ.cq.DominantResourceShareWithout(candWl)
if belowThreshold || p.fsStrategies[0](newNominatedShareValue, candCQ.share, newCandShareVal) {
snapshot.RemoveWorkload(candWl)
targets = append(targets, candWl)
targets = append(targets, &Target{
Wl: candWl,
Reason: InCohortFairSharingReason,
Scope: CohortOrigin,
})
if workloadFits(wlReq, nominatedCQ, true) {
fits = true
break
@@ -395,7 +410,11 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
// The criteria doesn't depend on the preempted workload, so just preempt the first candidate.
candWl := candCQ.workloads[0]
snapshot.RemoveWorkload(candWl)
targets = append(targets, candWl)
targets = append(targets, &Target{
Wl: candWl,
Reason: InCohortFairSharingReason,
Scope: CohortOrigin,
})
if workloadFits(wlReq, nominatedCQ, true) {
fits = true
}
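
For readability, a hypothetical condensation of how the two paths above choose a reason: minimalPreemptions picks per candidate based on queue membership and the borrowing flag, while fairPreemptions always tags cross-queue targets with InCohortFairSharingReason. The helper below is not part of the commit; it only restates the inline branching.

package preemptionreasons

// Reason constants as introduced in preemption.go above.
const (
	InClusterQueueReason                = "InClusterQueue"
	InCohortReclamationReason           = "InCohortReclamation"
	InCohortFairSharingReason           = "InCohortFairSharing"
	InCohortReclaimWhileBorrowingReason = "InCohortReclaimWhileBorrowing"
)

// minimalPreemptionReason restates the branching in minimalPreemptions:
// sameCQ is whether the candidate lives in the preemptor's ClusterQueue,
// and allowBorrowing is the flag's value at the moment the Target is
// appended (it may have been flipped to false by a candidate at or above
// allowBorrowingBelowPriority).
func minimalPreemptionReason(sameCQ, allowBorrowing bool) string {
	switch {
	case sameCQ:
		return InClusterQueueReason
	case allowBorrowing:
		return InCohortReclaimWhileBorrowingReason
	default:
		return InCohortReclamationReason
	}
}
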
