
Commit

cherry-pick #2504 to 0.7 (#2510)
Co-authored-by: Aldo Culquicondor <1299064+alculquicondor@users.noreply.github.com>
gabesaba and alculquicondor authored Jul 1, 2024
1 parent 9dd29fc commit 667df7b
Showing 6 changed files with 43 additions and 18 deletions.
pkg/cache/snapshot.go (2 changes: 1 addition & 1 deletion)
@@ -68,8 +68,8 @@ func (s *Snapshot) Log(log logr.Logger) {
for name, cq := range s.ClusterQueues {
cohortName := "<none>"
if cq.Cohort != nil {
- cohorts[cq.Name] = cq.Cohort
cohortName = cq.Cohort.Name
+ cohorts[cohortName] = cq.Cohort
}

log.Info("Found ClusterQueue",
pkg/scheduler/logging.go (4 changes: 3 additions & 1 deletion)
@@ -21,6 +21,7 @@ import (
"k8s.io/klog/v2"

"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/workload"
)

func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) {
@@ -34,8 +35,9 @@ func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) {
"status", e.status,
"reason", e.inadmissibleMsg,
}
- if log.V(6).Enabled() {
+ if log.V(4).Enabled() {
args = append(args, "nominatedAssignment", e.assignment)
+ args = append(args, "preempted", workload.References(e.preemptionTargets))
}
logV.Info("Workload evaluated for admission", args...)
}
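The guarded block above follows the usual logr pattern: cheap key/value pairs are always attached, while costlier ones (here, the list of preempted workload references) are only built when the verbosity level is actually enabled. A minimal standalone sketch of that pattern, not part of this commit, using klog's logr backend and a hypothetical buildCandidateRefs helper in place of workload.References:

package main

import (
	"flag"

	"k8s.io/klog/v2"
)

// buildCandidateRefs is a hypothetical stand-in for workload.References: it is
// only called when the verbose level is enabled, so the allocation is skipped
// on the common, non-verbose path.
func buildCandidateRefs() []string {
	return []string{"team-a/wl-1", "team-b/wl-2"}
}

func main() {
	klog.InitFlags(nil)
	_ = flag.Set("v", "4") // enable verbosity 4 for the demo
	flag.Parse()

	log := klog.Background()

	// Same shape as logAdmissionAttemptIfVerbose: cheap keys always,
	// costly keys only behind the Enabled() check.
	args := []any{"status", "assumed"}
	if log.V(4).Enabled() {
		args = append(args, "preempted", buildCandidateRefs())
	}
	log.V(2).Info("Workload evaluated for admission", args...)
}

Run with verbosity 4 or higher (set programmatically above) and the "preempted" key appears; at lower verbosity the slice is never allocated.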
pkg/scheduler/preemption/preemption.go (23 changes: 15 additions & 8 deletions)
@@ -23,6 +23,7 @@ import (
"sync/atomic"
"time"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -95,7 +96,7 @@ func candidatesFromCQOrUnderThreshold(candidates []*workload.Info, clusterQueue
}

// GetTargets returns the list of workloads that should be evicted in order to make room for wl.
- func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*workload.Info {
+ func (p *Preemptor) GetTargets(log logr.Logger, wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*workload.Info {
resPerFlv := resourcesRequiringPreemption(assignment)
cq := snapshot.ClusterQueues[wl.ClusterQueue]

@@ -116,12 +117,12 @@ func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assig
if len(sameQueueCandidates) == len(candidates) {
// There is no possible preemption of workloads from other queues,
// so we'll try borrowing.
- return minimalPreemptions(wlReq, cq, snapshot, resPerFlv, candidates, true, nil)
+ return minimalPreemptions(log, wlReq, cq, snapshot, resPerFlv, candidates, true, nil)
}

borrowWithinCohort, thresholdPrio := canBorrowWithinCohort(cq, wl.Obj)
if p.enableFairSharing {
return p.fairPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, thresholdPrio)
return p.fairPreemptions(log, &wl, assignment, snapshot, resPerFlv, candidates, thresholdPrio)
}
// There is a potential of preemption of workloads from the other queue in the
// cohort. We proceed with borrowing only if the dedicated policy
@@ -133,20 +134,20 @@ func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assig
// It can only preempt workloads from another CQ if they are strictly under allowBorrowingBelowPriority.
candidates = candidatesFromCQOrUnderThreshold(candidates, wl.ClusterQueue, *thresholdPrio)
}
- return minimalPreemptions(wlReq, cq, snapshot, resPerFlv, candidates, true, thresholdPrio)
+ return minimalPreemptions(log, wlReq, cq, snapshot, resPerFlv, candidates, true, thresholdPrio)
}

// Only try preemptions in the cohort, without borrowing, if the target clusterqueue is still
// under nominal quota for all resources.
if queueUnderNominalInAllRequestedResources(wlReq, cq) {
- if targets := minimalPreemptions(wlReq, cq, snapshot, resPerFlv, candidates, false, nil); len(targets) > 0 {
+ if targets := minimalPreemptions(log, wlReq, cq, snapshot, resPerFlv, candidates, false, nil); len(targets) > 0 {
return targets
}
}

// Final attempt. This time only candidates from the same queue, but
// with borrowing.
- return minimalPreemptions(wlReq, cq, snapshot, resPerFlv, sameQueueCandidates, true, nil)
+ return minimalPreemptions(log, wlReq, cq, snapshot, resPerFlv, sameQueueCandidates, true, nil)
}

// canBorrowWithinCohort returns whether the behavior is enabled for the ClusterQueue and the threshold priority to use.
@@ -212,7 +213,10 @@ func (p *Preemptor) applyPreemptionWithSSA(ctx context.Context, w *kueue.Workloa
// Once the Workload fits, the heuristic tries to add Workloads back, in the
// reverse order in which they were removed, while the incoming Workload still
// fits.
- func minimalPreemptions(wlReq cache.FlavorResourceQuantities, cq *cache.ClusterQueue, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info {
+ func minimalPreemptions(log logr.Logger, wlReq cache.FlavorResourceQuantities, cq *cache.ClusterQueue, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info {
+ if logV := log.V(5); logV.Enabled() {
+ logV.Info("Simulating preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", resPerFlv, "allowBorrowing", allowBorrowing, "allowBorrowingBelowPriority", allowBorrowingBelowPriority)
+ }
// Simulate removing all candidates from the ClusterQueue and cohort.
var targets []*workload.Info
fits := false
@@ -304,7 +308,10 @@ func parseStrategies(s []config.PreemptionStrategy) []fsStrategy {
return strategies
}

- func (p *Preemptor) fairPreemptions(wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowingBelowPriority *int32) []*workload.Info {
+ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowingBelowPriority *int32) []*workload.Info {
+ if logV := log.V(5); logV.Enabled() {
+ logV.Info("Simulating fair preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", resPerFlv, "allowBorrowingBelowPriority", allowBorrowingBelowPriority)
+ }
cqHeap := cqHeapFromCandidates(candidates, false, snapshot)
nominatedCQ := snapshot.ClusterQueues[wl.ClusterQueue]
wlReq := assignment.TotalRequestsFor(wl)
pkg/scheduler/preemption/preemption_test.go (8 changes: 4 additions & 4 deletions)
@@ -1384,7 +1384,7 @@ func TestPreemption(t *testing.T) {
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
defer features.SetFeatureGateDuringTest(t, features.LendingLimit, tc.enableLendingLimit)()
- ctx, _ := utiltesting.ContextWithLog(t)
+ ctx, log := utiltesting.ContextWithLog(t)
cl := utiltesting.NewClientBuilder().
WithLists(&kueue.WorkloadList{Items: tc.admitted}).
Build()
@@ -1418,7 +1418,7 @@ func TestPreemption(t *testing.T) {
wlInfo := workload.NewInfo(tc.incoming)
wlInfo.ClusterQueue = tc.targetCQ
targetClusterQueue := snapshot.ClusterQueues[wlInfo.ClusterQueue]
- targets := preemptor.GetTargets(*wlInfo, tc.assignment, &snapshot)
+ targets := preemptor.GetTargets(log, *wlInfo, tc.assignment, &snapshot)
preempted, err := preemptor.IssuePreemptions(ctx, wlInfo, targets, targetClusterQueue)
if err != nil {
t.Fatalf("Failed doing preemption")
@@ -1875,7 +1875,7 @@ func TestFairPreemptions(t *testing.T) {
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
- ctx, _ := utiltesting.ContextWithLog(t)
+ ctx, log := utiltesting.ContextWithLog(t)
// Set name as UID so that candidates sorting is predictable.
for i := range tc.admitted {
tc.admitted[i].UID = types.UID(tc.admitted[i].Name)
@@ -1904,7 +1904,7 @@ func TestFairPreemptions(t *testing.T) {
snapshot := cqCache.Snapshot()
wlInfo := workload.NewInfo(tc.incoming)
wlInfo.ClusterQueue = tc.targetCQ
- targets := preemptor.GetTargets(*wlInfo, singlePodSetAssignment(
+ targets := preemptor.GetTargets(log, *wlInfo, singlePodSetAssignment(
flavorassigner.ResourceAssignment{
corev1.ResourceCPU: &flavorassigner.FlavorAssignment{
Name: "default", Mode: flavorassigner.Preempt,
pkg/scheduler/scheduler.go (13 changes: 9 additions & 4 deletions)
@@ -68,6 +68,9 @@ type Scheduler struct {
workloadOrdering workload.Ordering
fairSharing config.FairSharing

+ // attemptCount identifies the number of the scheduling attempt in logs, since the last restart.
+ attemptCount int64

// Stubs.
applyAdmission func(context.Context, *kueue.Workload) error
}
@@ -182,7 +185,9 @@ func (cu *cohortsUsage) hasCommonFlavorResources(cohort string, assignment cache
}

func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal {
- log := ctrl.LoggerFrom(ctx)
+ s.attemptCount++
+ log := ctrl.LoggerFrom(ctx).WithValues("attemptCount", s.attemptCount)
+ ctx = ctrl.LoggerInto(ctx, log)

// 1. Get the heads from the queues, including their desired clusterQueue.
// This operation blocks while the queues are empty.
@@ -418,7 +423,7 @@ func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cac
}

if arm == flavorassigner.Preempt {
- faPreemtionTargets = s.preemptor.GetTargets(*wl, fullAssignment, snap)
+ faPreemtionTargets = s.preemptor.GetTargets(log, *wl, fullAssignment, snap)
}

// if the feature gate is not enabled or we can preempt
@@ -432,7 +437,7 @@ func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cac
if assignment.RepresentativeMode() == flavorassigner.Fit {
return &partialAssignment{assignment: assignment}, true
}
- preemptionTargets := s.preemptor.GetTargets(*wl, assignment, snap)
+ preemptionTargets := s.preemptor.GetTargets(log, *wl, assignment, snap)
if len(preemptionTargets) > 0 {
return &partialAssignment{assignment: assignment, preemptionTargets: preemptionTargets}, true
}
@@ -616,7 +621,7 @@ func (s *Scheduler) requeueAndUpdate(log logr.Logger, ctx context.Context, e ent
e.requeueReason = queue.RequeueReasonFailedAfterNomination
}
added := s.queues.RequeueWorkload(ctx, &e.Info, e.requeueReason)
log.V(2).Info("Workload re-queued", "workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue), "queue", klog.KRef(e.Obj.Namespace, e.Obj.Spec.QueueName), "requeueReason", e.requeueReason, "added", added)
log.V(2).Info("Workload re-queued", "workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue), "queue", klog.KRef(e.Obj.Namespace, e.Obj.Spec.QueueName), "requeueReason", e.requeueReason, "added", added, "status", e.status)

if e.status == notNominated || e.status == skipped {
if workload.UnsetQuotaReservationWithCondition(e.Obj, "Pending", e.inadmissibleMsg) {
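The new attemptCount field and the WithValues/LoggerInto lines above make every log line of a single scheduling pass carry the same attempt identifier, because callees retrieve the enriched logger from the context. A rough standalone sketch of that context-scoped logger pattern, not part of this commit, written directly against logr (controller-runtime's LoggerFrom/LoggerInto wrap the same mechanism); the scheduler and evaluate names here are illustrative only:

package main

import (
	"context"
	"flag"

	"github.com/go-logr/logr"
	"k8s.io/klog/v2"
)

type scheduler struct {
	attemptCount int64
}

// schedule tags the logger with the current attempt number and stores it back
// into the context, so every callee that pulls the logger from the context
// reports the same attemptCount.
func (s *scheduler) schedule(ctx context.Context) {
	s.attemptCount++
	log := logr.FromContextOrDiscard(ctx).WithValues("attemptCount", s.attemptCount)
	ctx = logr.NewContext(ctx, log)

	evaluate(ctx)
}

func evaluate(ctx context.Context) {
	logr.FromContextOrDiscard(ctx).V(2).Info("Workload evaluated for admission")
}

func main() {
	klog.InitFlags(nil)
	_ = flag.Set("v", "2")
	flag.Parse()

	s := &scheduler{}
	ctx := logr.NewContext(context.Background(), klog.Background())
	s.schedule(ctx) // logs attemptCount=1
	s.schedule(ctx) // logs attemptCount=2
}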
pkg/workload/workload.go (11 changes: 11 additions & 0 deletions)
@@ -682,3 +682,14 @@ func AdmissionChecksForWorkload(log logr.Logger, wl *kueue.Workload, admissionCh
}
return acNames
}

+ func References(wls []*Info) []klog.ObjectRef {
+     if len(wls) == 0 {
+         return nil
+     }
+     keys := make([]klog.ObjectRef, len(wls))
+     for i, wl := range wls {
+         keys[i] = klog.KObj(wl.Obj)
+     }
+     return keys
+ }
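References turns a slice of workload Infos into compact klog.ObjectRef values (rendered as "namespace/name"), which is what the new "preempted" and "candidates" log keys in this commit rely on. A self-contained sketch of the same conversion, not from the Kueue tree, shown with core Pod objects so it builds without Kueue's internal packages:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"
)

// references mirrors workload.References: nil for an empty slice, otherwise
// one compact klog.ObjectRef per object.
func references(pods []*corev1.Pod) []klog.ObjectRef {
	if len(pods) == 0 {
		return nil
	}
	refs := make([]klog.ObjectRef, len(pods))
	for i, p := range pods {
		refs[i] = klog.KObj(p)
	}
	return refs
}

func main() {
	pods := []*corev1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Namespace: "team-a", Name: "wl-1"}},
		{ObjectMeta: metav1.ObjectMeta{Namespace: "team-b", Name: "wl-2"}},
	}
	// Each ObjectRef renders as "namespace/name" in structured log output.
	fmt.Println(references(pods))
}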

