Distinguish between Preemption due to reclamation and fair sharing (#2411)

* Distinguish between Preemption due to reclamation and fair sharing

* Distinguish between Preemption due to reclamation and fair sharing

* extract getPreemptionReason
* borrowing in favour of reclamation

* Distinguish between Preemption due to reclamation and fair sharing

* Rewrite to Targets
* Cover CohortFairSharing

* Distinguish between Preemption due to reclamation and fair sharing

* Dedicated GetWorkloadReferences function
* Extract humanReadablePreemptionReasons
* !strategy && belowThreshold -> InCohortReclaimWhileBorrowingReason
* ExpectPreemptedCondition test util

* Distinguish between Preemption due to reclamation and fair sharing

* Rename WL -> WorkloadInfo
* Move getWorkloadReferences to logging

* Distinguish between Preemption due to reclamation and fair sharing

* Use slice in getWorkloadReferences
vladikkuzn committed Jul 10, 2024
1 parent 503bc68 commit 5bca510
Showing 6 changed files with 185 additions and 105 deletions.
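
The commit replaces the old two-way InClusterQueue/InCohort split with four reason constants (InClusterQueue, InCohortReclamation, InCohortFairSharing, InCohortReclaimWhileBorrowing). The selection logic sits inline in minimalPreemptions and fairPreemptions in the diff below; the Go sketch here condenses the minimalPreemptions branch for orientation. It is a hypothetical helper, not code from this commit, and the parameter names are stand-ins for the real checks (cq == candCQ, cqIsBorrowing, and the borrowWithinCohort priority threshold).

// reasonForMinimalPreemption is a hypothetical condensation of the inline
// branch in minimalPreemptions below (it assumes it lives in the preemption
// package so the reason constants are in scope): which reason a candidate
// gets, whether borrowing must be disabled for the rest of the simulation,
// and whether the candidate is skipped entirely.
func reasonForMinimalPreemption(sameCQ, candCQIsBorrowing bool, candPriority int32, allowBorrowingBelowPriority *int32) (reason string, disableBorrowing, skip bool) {
	if sameCQ {
		return InClusterQueueReason, false, false
	}
	if !candCQIsBorrowing {
		// A cross-queue candidate whose ClusterQueue is not borrowing is not a
		// valid reclamation target.
		return "", false, true
	}
	if allowBorrowingBelowPriority != nil && candPriority >= *allowBorrowingBelowPriority {
		// Preempting a candidate at or above the threshold means the incoming
		// workload may no longer borrow while preempting.
		return InCohortReclamationReason, true, false
	}
	return InCohortReclaimWhileBorrowingReason, false, false
}
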
9 changes: 7 additions & 2 deletions pkg/scheduler/logging.go
@@ -21,7 +21,8 @@ import (
"k8s.io/klog/v2"

"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/workload"
"sigs.k8s.io/kueue/pkg/scheduler/preemption"
"sigs.k8s.io/kueue/pkg/util/slices"
)

func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) {
@@ -37,7 +38,7 @@ func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) {
}
if log.V(4).Enabled() {
args = append(args, "nominatedAssignment", e.assignment)
args = append(args, "preempted", workload.References(e.preemptionTargets))
args = append(args, "preempted", getWorkloadReferences(e.preemptionTargets))
}
logV.Info("Workload evaluated for admission", args...)
}
@@ -47,3 +48,7 @@ func logSnapshotIfVerbose(log logr.Logger, s *cache.Snapshot) {
s.Log(logV)
}
}

func getWorkloadReferences(targets []*preemption.Target) []klog.ObjectRef {
return slices.Map(targets, func(t **preemption.Target) klog.ObjectRef { return klog.KObj((*t).WorkloadInfo.Obj) })
}
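
A note on the new helper above: the callback's **preemption.Target parameter follows from kueue's slices.Map handing the mapper a pointer to each slice element, so an element of []*Target arrives as **Target. The stand-alone sketch below illustrates that convention with a hypothetical mapSlice; the real helper in pkg/util/slices may differ in details.

package main

import "fmt"

// mapSlice is a hypothetical stand-in for kueue's slices.Map, inferred from
// the call site in getWorkloadReferences: the callback receives a pointer to
// each element of the input slice.
func mapSlice[E, R any](in []E, f func(*E) R) []R {
	out := make([]R, len(in))
	for i := range in {
		out[i] = f(&in[i])
	}
	return out
}

type target struct{ name string }

func main() {
	targets := []*target{{name: "job-a"}, {name: "job-b"}}
	// Each element is *target, so the callback sees **target and dereferences once.
	names := mapSlice(targets, func(t **target) string { return (*t).name })
	fmt.Println(names) // [job-a job-b]
}
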
132 changes: 87 additions & 45 deletions pkg/scheduler/preemption/preemption.go
@@ -96,8 +96,13 @@ func candidatesFromCQOrUnderThreshold(candidates []*workload.Info, clusterQueue
return result
}

type Target struct {
WorkloadInfo *workload.Info
Reason string
}

// GetTargets returns the list of workloads that should be evicted in order to make room for wl.
func (p *Preemptor) GetTargets(log logr.Logger, wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*workload.Info {
func (p *Preemptor) GetTargets(log logr.Logger, wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*Target {
resPerFlv := resourcesRequiringPreemption(assignment)
cq := snapshot.ClusterQueues[wl.ClusterQueue]

@@ -164,35 +169,47 @@ func canBorrowWithinCohort(cq *cache.ClusterQueueSnapshot, wl *kueue.Workload) (
return true, &threshold
}

// In cluster queue preemption reasons
const (
InClusterQueueReason string = "InClusterQueue"
)

// In cohort preemption reasons
const (
InCohortReclamationReason string = "InCohortReclamation"
InCohortFairSharingReason string = "InCohortFairSharing"
InCohortReclaimWhileBorrowingReason string = "InCohortReclaimWhileBorrowing"
)

var HumanReadablePreemptionReasons = map[string]string{
InClusterQueueReason: "prioritization in the ClusterQueue",
InCohortReclamationReason: "reclamation within the cohort",
InCohortFairSharingReason: "fair sharing within the cohort",
InCohortReclaimWhileBorrowingReason: "reclamation within the cohort while borrowing",
}

// IssuePreemptions marks the target workloads as evicted.
func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.Info, targets []*workload.Info, cq *cache.ClusterQueueSnapshot) (int, error) {
func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.Info, targets []*Target, cq *cache.ClusterQueueSnapshot) (int, error) {
log := ctrl.LoggerFrom(ctx)
errCh := routine.NewErrorChannel()
ctx, cancel := context.WithCancel(ctx)
var successfullyPreempted int64
defer cancel()
workqueue.ParallelizeUntil(ctx, parallelPreemptions, len(targets), func(i int) {
target := targets[i]
if !meta.IsStatusConditionTrue(target.Obj.Status.Conditions, kueue.WorkloadEvicted) {
origin := "ClusterQueue"
reason := "InClusterQueue"
if cq.Name != target.ClusterQueue {
origin = "cohort"
reason = "InCohort"
}

message := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the %s", preemptor.Obj.UID, origin)
err := p.applyPreemption(ctx, target.Obj, reason, message)
if !meta.IsStatusConditionTrue(target.WorkloadInfo.Obj.Status.Conditions, kueue.WorkloadEvicted) {
message := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) due to %s", preemptor.Obj.UID, HumanReadablePreemptionReasons[target.Reason])
err := p.applyPreemption(ctx, target.WorkloadInfo.Obj, target.Reason, message)
if err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}

log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.Obj), "reason", reason, "message", message)
p.recorder.Eventf(target.Obj, corev1.EventTypeNormal, "Preempted", message)
metrics.ReportEvictedWorkloads(target.ClusterQueue, kueue.WorkloadEvictedByPreemption)
log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj), "reason", target.Reason, "message", message)
p.recorder.Eventf(target.WorkloadInfo.Obj, corev1.EventTypeNormal, "Preempted", message)
metrics.ReportEvictedWorkloads(target.WorkloadInfo.ClusterQueue, kueue.WorkloadEvictedByPreemption)
} else {
log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.Obj))
log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj))
}
atomic.AddInt64(&successfullyPreempted, 1)
})
@@ -214,35 +231,45 @@ func (p *Preemptor) applyPreemptionWithSSA(ctx context.Context, w *kueue.Workloa
// Once the Workload fits, the heuristic tries to add Workloads back, in the
// reverse order in which they were removed, while the incoming Workload still
// fits.
func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info {
func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*Target {
if logV := log.V(5); logV.Enabled() {
logV.Info("Simulating preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", resPerFlv, "allowBorrowing", allowBorrowing, "allowBorrowingBelowPriority", allowBorrowingBelowPriority)
}
// Simulate removing all candidates from the ClusterQueue and cohort.
var targets []*workload.Info
var targets []*Target
fits := false
for _, candWl := range candidates {
candCQ := snapshot.ClusterQueues[candWl.ClusterQueue]
if cq != candCQ && !cqIsBorrowing(candCQ, resPerFlv) {
sameCq := cq == candCQ
reason := InClusterQueueReason
if !sameCq && !cqIsBorrowing(candCQ, resPerFlv) {
continue
}
if cq != candCQ && allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) >= *allowBorrowingBelowPriority {
// We set allowBorrowing=false if there is a candidate with priority
// exceeding allowBorrowingBelowPriority added to targets.
//
// We need to be careful mutating allowBorrowing. We rely on the
// fact that once there is a candidate exceeding the priority added
// to targets, then at least one such candidate is present in the
// final set of targets (after the second phase of the function).
//
// This is true, because the candidates are ordered according
// to priorities (from lowest to highest, using candidatesOrdering),
// and the last added target is not removed in the second phase of
// the function.
allowBorrowing = false
if !sameCq {
reason = InCohortReclamationReason
if allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) >= *allowBorrowingBelowPriority {
// We set allowBorrowing=false if there is a candidate with priority
// exceeding allowBorrowingBelowPriority added to targets.
//
// We need to be careful mutating allowBorrowing. We rely on the
// fact that once there is a candidate exceeding the priority added
// to targets, then at least one such candidate is present in the
// final set of targets (after the second phase of the function).
//
// This is true, because the candidates are ordered according
// to priorities (from lowest to highest, using candidatesOrdering),
// and the last added target is not removed in the second phase of
// the function.
allowBorrowing = false
} else {
reason = InCohortReclaimWhileBorrowingReason
}
}
snapshot.RemoveWorkload(candWl)
targets = append(targets, candWl)
targets = append(targets, &Target{
WorkloadInfo: candWl,
Reason: reason,
})
if workloadFits(wlReq, cq, allowBorrowing) {
fits = true
break
@@ -257,24 +284,24 @@ func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitie
return targets
}

func fillBackWorkloads(targets []*workload.Info, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, allowBorrowing bool) []*workload.Info {
func fillBackWorkloads(targets []*Target, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, allowBorrowing bool) []*Target {
// In the reverse order, check if any of the workloads can be added back.
for i := len(targets) - 2; i >= 0; i-- {
snapshot.AddWorkload(targets[i])
snapshot.AddWorkload(targets[i].WorkloadInfo)
if workloadFits(wlReq, cq, allowBorrowing) {
// O(1) deletion: copy the last element into index i and reduce size.
targets[i] = targets[len(targets)-1]
targets = targets[:len(targets)-1]
} else {
snapshot.RemoveWorkload(targets[i])
snapshot.RemoveWorkload(targets[i].WorkloadInfo)
}
}
return targets
}

func restoreSnapshot(snapshot *cache.Snapshot, targets []*workload.Info) {
func restoreSnapshot(snapshot *cache.Snapshot, targets []*Target) {
for _, t := range targets {
snapshot.AddWorkload(t)
snapshot.AddWorkload(t.WorkloadInfo)
}
}

@@ -309,15 +336,15 @@ func parseStrategies(s []config.PreemptionStrategy) []fsStrategy {
return strategies
}

func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowingBelowPriority *int32) []*workload.Info {
func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowingBelowPriority *int32) []*Target {
if logV := log.V(5); logV.Enabled() {
logV.Info("Simulating fair preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", resPerFlv, "allowBorrowingBelowPriority", allowBorrowingBelowPriority)
}
cqHeap := cqHeapFromCandidates(candidates, false, snapshot)
nominatedCQ := snapshot.ClusterQueues[wl.ClusterQueue]
wlReq := assignment.TotalRequestsFor(wl)
newNominatedShareValue, _ := nominatedCQ.DominantResourceShareWith(wlReq)
var targets []*workload.Info
var targets []*Target
fits := false
var retryCandidates []*workload.Info
for cqHeap.Len() > 0 && !fits {
@@ -326,7 +353,10 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
if candCQ.cq == nominatedCQ {
candWl := candCQ.workloads[0]
snapshot.RemoveWorkload(candWl)
targets = append(targets, candWl)
targets = append(targets, &Target{
WorkloadInfo: candWl,
Reason: InClusterQueueReason,
})
if workloadFits(wlReq, nominatedCQ, true) {
fits = true
break
@@ -343,9 +373,18 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
for i, candWl := range candCQ.workloads {
belowThreshold := allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) < *allowBorrowingBelowPriority
newCandShareVal, _ := candCQ.cq.DominantResourceShareWithout(candWl.FlavorResourceUsage())
if belowThreshold || p.fsStrategies[0](newNominatedShareValue, candCQ.share, newCandShareVal) {
strategy := p.fsStrategies[0](newNominatedShareValue, candCQ.share, newCandShareVal)
if belowThreshold || strategy {
snapshot.RemoveWorkload(candWl)
targets = append(targets, candWl)
reason := InCohortFairSharingReason
if !strategy {
reason = InCohortReclaimWhileBorrowingReason
}

targets = append(targets, &Target{
WorkloadInfo: candWl,
Reason: reason,
})
if workloadFits(wlReq, nominatedCQ, true) {
fits = true
break
@@ -374,7 +413,10 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
// The criteria doesn't depend on the preempted workload, so just preempt the first candidate.
candWl := candCQ.workloads[0]
snapshot.RemoveWorkload(candWl)
targets = append(targets, candWl)
targets = append(targets, &Target{
WorkloadInfo: candWl,
Reason: InCohortFairSharingReason,
})
if workloadFits(wlReq, nominatedCQ, true) {
fits = true
}
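
To summarize the fair-sharing path above: a cross-queue target is preemptable when the configured strategy permits it or when the candidate sits below the borrowWithinCohort priority threshold, and the recorded reason distinguishes the two cases. The sketch below is a hypothetical condensation (assumed to live in the preemption package), with strategyAllows and belowThreshold standing in for p.fsStrategies[0](...) and the threshold check; neither name exists in kueue.

// fairSharingReason condenses the cohort branch of fairPreemptions above.
// It is a sketch, not code from this commit.
func fairSharingReason(strategyAllows, belowThreshold bool) (reason string, preemptable bool) {
	switch {
	case strategyAllows:
		return InCohortFairSharingReason, true
	case belowThreshold:
		// Permitted only by the borrowWithinCohort threshold, not by the
		// fair-sharing strategy itself, hence the distinct reason.
		return InCohortReclaimWhileBorrowingReason, true
	default:
		return "", false
	}
}

Whichever reason is recorded, IssuePreemptions looks it up in HumanReadablePreemptionReasons when building the eviction message, e.g. "Preempted to accommodate a workload (UID: ...) due to fair sharing within the cohort".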