Distinguish between Preemption due to reclamation and fair sharing #2411

Merged · 6 commits · Jul 10, 2024
9 changes: 7 additions & 2 deletions pkg/scheduler/logging.go
@@ -21,7 +21,8 @@ import (
"k8s.io/klog/v2"

"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/workload"
"sigs.k8s.io/kueue/pkg/scheduler/preemption"
"sigs.k8s.io/kueue/pkg/util/slices"
)

func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) {
@@ -37,7 +38,7 @@ func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) {
}
if log.V(4).Enabled() {
args = append(args, "nominatedAssignment", e.assignment)
args = append(args, "preempted", workload.References(e.preemptionTargets))
args = append(args, "preempted", getWorkloadReferences(e.preemptionTargets))
}
logV.Info("Workload evaluated for admission", args...)
}
@@ -47,3 +48,7 @@ func logSnapshotIfVerbose(log logr.Logger, s *cache.Snapshot) {
s.Log(logV)
}
}

func getWorkloadReferences(targets []*preemption.Target) []klog.ObjectRef {
return slices.Map(targets, func(t **preemption.Target) klog.ObjectRef { return klog.KObj((*t).WorkloadInfo.Obj) })
}
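
For context, slices.Map is the generic mapping helper imported from sigs.k8s.io/kueue/pkg/util/slices; it passes a pointer to each slice element, which is why the callback parameter above is **preemption.Target. A minimal sketch of a compatible helper, written as an illustrative assumption rather than the package's actual implementation:

// Map applies f to a pointer to each element of s and collects the results.
// Taking *E avoids copying elements and matches the call in getWorkloadReferences.
func Map[S ~[]E, E any, R any](s S, f func(*E) R) []R {
	if s == nil {
		return nil
	}
	out := make([]R, len(s))
	for i := range s {
		out[i] = f(&s[i])
	}
	return out
}
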
132 changes: 87 additions & 45 deletions pkg/scheduler/preemption/preemption.go
@@ -96,8 +96,13 @@ func candidatesFromCQOrUnderThreshold(candidates []*workload.Info, clusterQueue
return result
}

type Target struct {
WorkloadInfo *workload.Info
Reason string
}

// GetTargets returns the list of workloads that should be evicted in order to make room for wl.
func (p *Preemptor) GetTargets(log logr.Logger, wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*workload.Info {
func (p *Preemptor) GetTargets(log logr.Logger, wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*Target {
resPerFlv := resourcesRequiringPreemption(assignment)
cq := snapshot.ClusterQueues[wl.ClusterQueue]

@@ -164,35 +169,47 @@ func canBorrowWithinCohort(cq *cache.ClusterQueueSnapshot, wl *kueue.Workload) (
return true, &threshold
}

// In cluster queue preemption reasons
const (
InClusterQueueReason string = "InClusterQueue"
)

// In cohort preemption reasons
const (
InCohortReclamationReason string = "InCohortReclamation"
InCohortFairSharingReason string = "InCohortFairSharing"
InCohortReclaimWhileBorrowingReason string = "InCohortReclaimWhileBorrowing"
)

var HumanReadablePreemptionReasons = map[string]string{
InClusterQueueReason: "prioritization in the ClusterQueue",
InCohortReclamationReason: "reclamation within the cohort",
InCohortFairSharingReason: "fair sharing within the cohort",
InCohortReclaimWhileBorrowingReason: "reclamation within the cohort while borrowing",
}

// IssuePreemptions marks the target workloads as evicted.
func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.Info, targets []*workload.Info, cq *cache.ClusterQueueSnapshot) (int, error) {
func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.Info, targets []*Target, cq *cache.ClusterQueueSnapshot) (int, error) {
log := ctrl.LoggerFrom(ctx)
errCh := routine.NewErrorChannel()
ctx, cancel := context.WithCancel(ctx)
var successfullyPreempted int64
defer cancel()
workqueue.ParallelizeUntil(ctx, parallelPreemptions, len(targets), func(i int) {
target := targets[i]
if !meta.IsStatusConditionTrue(target.Obj.Status.Conditions, kueue.WorkloadEvicted) {
origin := "ClusterQueue"
reason := "InClusterQueue"
if cq.Name != target.ClusterQueue {
origin = "cohort"
reason = "InCohort"
}

message := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the %s", preemptor.Obj.UID, origin)
err := p.applyPreemption(ctx, target.Obj, reason, message)
if !meta.IsStatusConditionTrue(target.WorkloadInfo.Obj.Status.Conditions, kueue.WorkloadEvicted) {
message := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) due to %s", preemptor.Obj.UID, HumanReadablePreemptionReasons[target.Reason])
err := p.applyPreemption(ctx, target.WorkloadInfo.Obj, target.Reason, message)
if err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}

log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.Obj), "reason", reason, "message", message)
p.recorder.Eventf(target.Obj, corev1.EventTypeNormal, "Preempted", message)
metrics.ReportEvictedWorkloads(target.ClusterQueue, kueue.WorkloadEvictedByPreemption)
log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj), "reason", target.Reason, "message", message)

Review thread on this line:
Contributor: Can you also please add targetClusterQueue?
Contributor Author (@vladikkuzn, Jul 11, 2024): Added in #2538. WDYT, should I also add the preemptor's cluster queue?
Contributor: The preemptor's information is already part of the log (it comes from the scheduler.go file, which adds the fields to the contextual log).

p.recorder.Eventf(target.WorkloadInfo.Obj, corev1.EventTypeNormal, "Preempted", message)
metrics.ReportEvictedWorkloads(target.WorkloadInfo.ClusterQueue, kueue.WorkloadEvictedByPreemption)
} else {
log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.Obj))
log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj))
}
atomic.AddInt64(&successfullyPreempted, 1)
})
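
As a side effect of this change, the eviction event message now names the concrete preemption reason instead of only the origin. A small standalone sketch of the resulting message, using the InCohortReclamation reason and a placeholder UID (illustrative only):

package main

import "fmt"

// Reason and human-readable text copied from the preemption package above.
const InCohortReclamationReason = "InCohortReclamation"

var humanReadable = map[string]string{
	InCohortReclamationReason: "reclamation within the cohort",
}

func main() {
	uid := "example-uid" // placeholder, not a real workload UID
	msg := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) due to %s",
		uid, humanReadable[InCohortReclamationReason])
	fmt.Println(msg)
	// Prints: Preempted to accommodate a workload (UID: example-uid) due to reclamation within the cohort
}
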
@@ -214,35 +231,45 @@ func (p *Preemptor) applyPreemptionWithSSA(ctx context.Context, w *kueue.Workloa
// Once the Workload fits, the heuristic tries to add Workloads back, in the
// reverse order in which they were removed, while the incoming Workload still
// fits.
func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info {
func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*Target {
if logV := log.V(5); logV.Enabled() {
logV.Info("Simulating preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", resPerFlv, "allowBorrowing", allowBorrowing, "allowBorrowingBelowPriority", allowBorrowingBelowPriority)
}
// Simulate removing all candidates from the ClusterQueue and cohort.
var targets []*workload.Info
var targets []*Target
fits := false
for _, candWl := range candidates {
candCQ := snapshot.ClusterQueues[candWl.ClusterQueue]
if cq != candCQ && !cqIsBorrowing(candCQ, resPerFlv) {
sameCq := cq == candCQ
reason := InClusterQueueReason
if !sameCq && !cqIsBorrowing(candCQ, resPerFlv) {
continue
}
if cq != candCQ && allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) >= *allowBorrowingBelowPriority {
// We set allowBorrowing=false if there is a candidate with priority
// exceeding allowBorrowingBelowPriority added to targets.
//
// We need to be careful mutating allowBorrowing. We rely on the
// fact that once there is a candidate exceeding the priority added
// to targets, then at least one such candidate is present in the
// final set of targets (after the second phase of the function).
//
// This is true, because the candidates are ordered according
// to priorities (from lowest to highest, using candidatesOrdering),
// and the last added target is not removed in the second phase of
// the function.
allowBorrowing = false
if !sameCq {
reason = InCohortReclamationReason
if allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) >= *allowBorrowingBelowPriority {
// We set allowBorrowing=false if there is a candidate with priority
// exceeding allowBorrowingBelowPriority added to targets.
//
// We need to be careful mutating allowBorrowing. We rely on the
// fact that once there is a candidate exceeding the priority added
// to targets, then at least one such candidate is present in the
// final set of targets (after the second phase of the function).
//
// This is true, because the candidates are ordered according
// to priorities (from lowest to highest, using candidatesOrdering),
// and the last added target is not removed in the second phase of
// the function.
allowBorrowing = false
} else {
reason = InCohortReclaimWhileBorrowingReason

Review thread on this line:
Contributor: There is a bug here. If allowBorrowingBelowPriority is nil, then the reason should stay as InCohortReclamationReason.
Contributor (@alculquicondor, Jul 16, 2024): It should be more like:

if allowBorrowingBelowPriority != nil {
  if priority.Priority(candWl.Obj) >= *allowBorrowingBelowPriority {
    allowBorrowing = false
  } else {
    reason = InCohortReclaimWhileBorrowingReason
  }
}

}
}
snapshot.RemoveWorkload(candWl)
targets = append(targets, candWl)
targets = append(targets, &Target{
WorkloadInfo: candWl,
Reason: reason,
})
if workloadFits(wlReq, cq, allowBorrowing) {
fits = true
break
@@ -257,24 +284,24 @@ func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitie
return targets
}

func fillBackWorkloads(targets []*workload.Info, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, allowBorrowing bool) []*workload.Info {
func fillBackWorkloads(targets []*Target, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, allowBorrowing bool) []*Target {
// In the reverse order, check if any of the workloads can be added back.
for i := len(targets) - 2; i >= 0; i-- {
snapshot.AddWorkload(targets[i])
snapshot.AddWorkload(targets[i].WorkloadInfo)
if workloadFits(wlReq, cq, allowBorrowing) {
// O(1) deletion: copy the last element into index i and reduce size.
targets[i] = targets[len(targets)-1]
targets = targets[:len(targets)-1]
} else {
snapshot.RemoveWorkload(targets[i])
snapshot.RemoveWorkload(targets[i].WorkloadInfo)
}
}
return targets
}

func restoreSnapshot(snapshot *cache.Snapshot, targets []*workload.Info) {
func restoreSnapshot(snapshot *cache.Snapshot, targets []*Target) {
for _, t := range targets {
snapshot.AddWorkload(t)
snapshot.AddWorkload(t.WorkloadInfo)
}
}
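
The fill-back pass implemented in fillBackWorkloads above can be read in isolation: walk the targets in reverse order, skip the most recently added one, and drop any target that turns out to be unnecessary. A self-contained sketch with a stand-in fits predicate (illustrative only; the real code adds and removes workloads on the cache snapshot):

// fillBack drops targets that are no longer needed. fits reports whether the
// incoming workload would fit if exactly the given set were preempted.
func fillBack(targets []string, fits func(preempted []string) bool) []string {
	// Skip the last target: it is the one that made the incoming workload fit.
	for i := len(targets) - 2; i >= 0; i-- {
		// Tentatively keep targets[i], i.e. do not preempt it.
		tentative := append(append([]string{}, targets[:i]...), targets[i+1:]...)
		if fits(tentative) {
			// Still fits without preempting it: O(1) swap-delete, as in the code above.
			targets[i] = targets[len(targets)-1]
			targets = targets[:len(targets)-1]
		}
	}
	return targets
}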

@@ -309,15 +336,15 @@ func parseStrategies(s []config.PreemptionStrategy) []fsStrategy {
return strategies
}

func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowingBelowPriority *int32) []*workload.Info {
func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowingBelowPriority *int32) []*Target {
if logV := log.V(5); logV.Enabled() {
logV.Info("Simulating fair preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", resPerFlv, "allowBorrowingBelowPriority", allowBorrowingBelowPriority)
}
cqHeap := cqHeapFromCandidates(candidates, false, snapshot)
nominatedCQ := snapshot.ClusterQueues[wl.ClusterQueue]
wlReq := assignment.TotalRequestsFor(wl)
newNominatedShareValue, _ := nominatedCQ.DominantResourceShareWith(wlReq)
var targets []*workload.Info
var targets []*Target
fits := false
var retryCandidates []*workload.Info
for cqHeap.Len() > 0 && !fits {
@@ -326,7 +353,10 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
if candCQ.cq == nominatedCQ {
candWl := candCQ.workloads[0]
snapshot.RemoveWorkload(candWl)
targets = append(targets, candWl)
targets = append(targets, &Target{
WorkloadInfo: candWl,
Reason: InClusterQueueReason,
})
if workloadFits(wlReq, nominatedCQ, true) {
fits = true
break
@@ -343,9 +373,18 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
for i, candWl := range candCQ.workloads {
belowThreshold := allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) < *allowBorrowingBelowPriority
newCandShareVal, _ := candCQ.cq.DominantResourceShareWithout(candWl.FlavorResourceUsage())
if belowThreshold || p.fsStrategies[0](newNominatedShareValue, candCQ.share, newCandShareVal) {
strategy := p.fsStrategies[0](newNominatedShareValue, candCQ.share, newCandShareVal)
if belowThreshold || strategy {
snapshot.RemoveWorkload(candWl)
targets = append(targets, candWl)
reason := InCohortFairSharingReason
if !strategy {
reason = InCohortReclaimWhileBorrowingReason
}

targets = append(targets, &Target{
WorkloadInfo: candWl,
Reason: reason,
})
if workloadFits(wlReq, nominatedCQ, true) {
fits = true
break
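
The reason chosen for a cohort candidate therefore depends on which condition let it through: the fair-sharing strategy, or the borrowWithinCohort priority threshold. A hypothetical helper, not part of the PR, that mirrors the selection above using the reason constants this PR introduces:

// cohortPreemptionReason returns the preemption reason for a cohort candidate,
// or false if the candidate may not be preempted at all.
func cohortPreemptionReason(strategyAllows, belowThreshold bool) (string, bool) {
	switch {
	case strategyAllows:
		return InCohortFairSharingReason, true
	case belowThreshold:
		return InCohortReclaimWhileBorrowingReason, true
	default:
		return "", false
	}
}
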
@@ -374,7 +413,10 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
// The criterion doesn't depend on the preempted workload, so just preempt the first candidate.
candWl := candCQ.workloads[0]
snapshot.RemoveWorkload(candWl)
targets = append(targets, candWl)
targets = append(targets, &Target{
WorkloadInfo: candWl,
Reason: InCohortFairSharingReason,
})
if workloadFits(wlReq, nominatedCQ, true) {
fits = true
}