Skip to content

Commit

Permalink
Distinguish between Preemption due to reclamation and fair sharing
Browse files Browse the repository at this point in the history
* Rename WL -> WorkloadInfo
* Move getWorkloadReferences to logging
  • Loading branch information
vladikkuzn committed Jul 10, 2024
1 parent cad0de8 commit 5977741
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 39 deletions.
13 changes: 12 additions & 1 deletion pkg/scheduler/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) {
}
if log.V(4).Enabled() {
args = append(args, "nominatedAssignment", e.assignment)
args = append(args, "preempted", preemption.GetWorkloadReferences(e.preemptionTargets))
args = append(args, "preempted", getWorkloadReferences(e.preemptionTargets))
}
logV.Info("Workload evaluated for admission", args...)
}
Expand All @@ -47,3 +47,14 @@ func logSnapshotIfVerbose(log logr.Logger, s *cache.Snapshot) {
s.Log(logV)
}
}

func getWorkloadReferences(targets []*preemption.Target) []klog.ObjectRef {
if len(targets) == 0 {
return nil
}
keys := make([]klog.ObjectRef, len(targets))
for i, t := range targets {
keys[i] = klog.KObj(t.WorkloadInfo.Obj)
}
return keys
}
58 changes: 20 additions & 38 deletions pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,8 @@ func candidatesFromCQOrUnderThreshold(candidates []*workload.Info, clusterQueue
}

type Target struct {
Wl *workload.Info
Reason string
}

func GetWorkloadReferences(targets []*Target) []klog.ObjectRef {
if len(targets) == 0 {
return nil
}
keys := make([]klog.ObjectRef, len(targets))
for i, t := range targets {
keys[i] = klog.KObj(t.Wl.Obj)
}
return keys
WorkloadInfo *workload.Info
Reason string
}

// GetTargets returns the list of workloads that should be evicted in order to make room for wl.
Expand Down Expand Up @@ -180,13 +169,6 @@ func canBorrowWithinCohort(cq *cache.ClusterQueueSnapshot, wl *kueue.Workload) (
return true, &threshold
}

const (
// ClusterQueueOrigin indicates that preemption originated from cluster queue
ClusterQueueOrigin = "ClusterQueue"
// CohortOrigin indicates that preemption originated from cohort
CohortOrigin = "Cohort"
)

// In cluster queue preemption reasons
const (
InClusterQueueReason string = "InClusterQueue"
Expand Down Expand Up @@ -215,19 +197,19 @@ func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.In
defer cancel()
workqueue.ParallelizeUntil(ctx, parallelPreemptions, len(targets), func(i int) {
target := targets[i]
if !meta.IsStatusConditionTrue(target.Wl.Obj.Status.Conditions, kueue.WorkloadEvicted) {
if !meta.IsStatusConditionTrue(target.WorkloadInfo.Obj.Status.Conditions, kueue.WorkloadEvicted) {
message := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) due to %s", preemptor.Obj.UID, HumanReadablePreemptionReasons[target.Reason])
err := p.applyPreemption(ctx, target.Wl.Obj, target.Reason, message)
err := p.applyPreemption(ctx, target.WorkloadInfo.Obj, target.Reason, message)
if err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}

log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.Wl.Obj), "reason", target.Reason, "message", message)
p.recorder.Eventf(target.Wl.Obj, corev1.EventTypeNormal, "Preempted", message)
metrics.ReportEvictedWorkloads(target.Wl.ClusterQueue, kueue.WorkloadEvictedByPreemption)
log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj), "reason", target.Reason, "message", message)
p.recorder.Eventf(target.WorkloadInfo.Obj, corev1.EventTypeNormal, "Preempted", message)
metrics.ReportEvictedWorkloads(target.WorkloadInfo.ClusterQueue, kueue.WorkloadEvictedByPreemption)
} else {
log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.Wl.Obj))
log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj))
}
atomic.AddInt64(&successfullyPreempted, 1)
})
Expand Down Expand Up @@ -285,8 +267,8 @@ func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitie
}
snapshot.RemoveWorkload(candWl)
targets = append(targets, &Target{
Wl: candWl,
Reason: reason,
WorkloadInfo: candWl,
Reason: reason,
})
if workloadFits(wlReq, cq, allowBorrowing) {
fits = true
Expand All @@ -305,21 +287,21 @@ func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitie
func fillBackWorkloads(targets []*Target, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, allowBorrowing bool) []*Target {
// In the reverse order, check if any of the workloads can be added back.
for i := len(targets) - 2; i >= 0; i-- {
snapshot.AddWorkload(targets[i].Wl)
snapshot.AddWorkload(targets[i].WorkloadInfo)
if workloadFits(wlReq, cq, allowBorrowing) {
// O(1) deletion: copy the last element into index i and reduce size.
targets[i] = targets[len(targets)-1]
targets = targets[:len(targets)-1]
} else {
snapshot.RemoveWorkload(targets[i].Wl)
snapshot.RemoveWorkload(targets[i].WorkloadInfo)
}
}
return targets
}

func restoreSnapshot(snapshot *cache.Snapshot, targets []*Target) {
for _, t := range targets {
snapshot.AddWorkload(t.Wl)
snapshot.AddWorkload(t.WorkloadInfo)
}
}

Expand Down Expand Up @@ -372,8 +354,8 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
candWl := candCQ.workloads[0]
snapshot.RemoveWorkload(candWl)
targets = append(targets, &Target{
Wl: candWl,
Reason: InClusterQueueReason,
WorkloadInfo: candWl,
Reason: InClusterQueueReason,
})
if workloadFits(wlReq, nominatedCQ, true) {
fits = true
Expand All @@ -395,13 +377,13 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
if belowThreshold || strategy {
snapshot.RemoveWorkload(candWl)
reason := InCohortFairSharingReason
if !strategy && belowThreshold {
if !strategy {
reason = InCohortReclaimWhileBorrowingReason
}

targets = append(targets, &Target{
Wl: candWl,
Reason: reason,
WorkloadInfo: candWl,
Reason: reason,
})
if workloadFits(wlReq, nominatedCQ, true) {
fits = true
Expand Down Expand Up @@ -432,8 +414,8 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
candWl := candCQ.workloads[0]
snapshot.RemoveWorkload(candWl)
targets = append(targets, &Target{
Wl: candWl,
Reason: InCohortFairSharingReason,
WorkloadInfo: candWl,
Reason: InCohortFairSharingReason,
})
if workloadFits(wlReq, nominatedCQ, true) {
fits = true
Expand Down

0 comments on commit 5977741

Please sign in to comment.