cherry-pick #2504 to 0.7 #2510

Merged: 1 commit, Jul 1, 2024
2 changes: 1 addition & 1 deletion pkg/cache/snapshot.go
@@ -68,8 +68,8 @@ func (s *Snapshot) Log(log logr.Logger) {
 	for name, cq := range s.ClusterQueues {
 		cohortName := "<none>"
 		if cq.Cohort != nil {
-			cohorts[cq.Name] = cq.Cohort
 			cohortName = cq.Cohort.Name
+			cohorts[cohortName] = cq.Cohort
 		}
 
 		log.Info("Found ClusterQueue",
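The fix above re-keys the cohorts map that Snapshot.Log builds, so it ends up with one entry per cohort rather than one per ClusterQueue. A minimal standalone sketch of why the key matters (hypothetical names, plain strings in place of Kueue's types):

```go
package main

import "fmt"

func main() {
	// Three ClusterQueues that all belong to the same cohort.
	clusterQueues := map[string]string{"cq-a": "shared", "cq-b": "shared", "cq-c": "shared"}

	byCQName := map[string]string{}     // old behavior: keyed by ClusterQueue name
	byCohortName := map[string]string{} // new behavior: keyed by cohort name
	for cqName, cohortName := range clusterQueues {
		byCQName[cqName] = cohortName
		byCohortName[cohortName] = cohortName
	}
	// One entry per ClusterQueue vs. one entry per cohort.
	fmt.Println(len(byCQName), len(byCohortName)) // 3 1
}
```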
4 changes: 3 additions & 1 deletion pkg/scheduler/logging.go
@@ -21,6 +21,7 @@ import (
 	"k8s.io/klog/v2"
 
 	"sigs.k8s.io/kueue/pkg/cache"
+	"sigs.k8s.io/kueue/pkg/workload"
 )
 
 func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) {
@@ -34,8 +35,9 @@ func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) {
 		"status", e.status,
 		"reason", e.inadmissibleMsg,
 	}
-	if log.V(6).Enabled() {
+	if log.V(4).Enabled() {
 		args = append(args, "nominatedAssignment", e.assignment)
+		args = append(args, "preempted", workload.References(e.preemptionTargets))
 	}
 	logV.Info("Workload evaluated for admission", args...)
 }
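The V(4) check above gates the extra, potentially expensive log fields behind a higher verbosity. A minimal sketch of the same pattern using go-logr's funcr logger (the names and values here are hypothetical, not Kueue code):

```go
package main

import (
	"fmt"

	"github.com/go-logr/logr/funcr"
)

func main() {
	// Logger configured with verbosity 3: V(3) lines are emitted, V(4) checks fail,
	// so the extra fields below are only assembled when the sink's verbosity is 4 or higher.
	log := funcr.New(func(prefix, args string) { fmt.Println(args) }, funcr.Options{Verbosity: 3})

	logV := log.V(3)
	if !logV.Enabled() {
		return
	}
	args := []any{"status", "skipped", "reason", "insufficient quota"}
	if log.V(4).Enabled() { // false at verbosity 3; the detail is skipped cheaply
		args = append(args, "nominatedAssignment", "expensive-to-build detail")
	}
	logV.Info("Workload evaluated for admission", args...)
}
```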
23 changes: 15 additions & 8 deletions pkg/scheduler/preemption/preemption.go
@@ -23,6 +23,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/go-logr/logr"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -95,7 +96,7 @@ func candidatesFromCQOrUnderThreshold(candidates []*workload.Info, clusterQueue
 }
 
 // GetTargets returns the list of workloads that should be evicted in order to make room for wl.
-func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*workload.Info {
+func (p *Preemptor) GetTargets(log logr.Logger, wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*workload.Info {
 	resPerFlv := resourcesRequiringPreemption(assignment)
 	cq := snapshot.ClusterQueues[wl.ClusterQueue]
 
@@ -116,12 +117,12 @@
 	if len(sameQueueCandidates) == len(candidates) {
 		// There is no possible preemption of workloads from other queues,
 		// so we'll try borrowing.
-		return minimalPreemptions(wlReq, cq, snapshot, resPerFlv, candidates, true, nil)
+		return minimalPreemptions(log, wlReq, cq, snapshot, resPerFlv, candidates, true, nil)
 	}
 
 	borrowWithinCohort, thresholdPrio := canBorrowWithinCohort(cq, wl.Obj)
 	if p.enableFairSharing {
-		return p.fairPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, thresholdPrio)
+		return p.fairPreemptions(log, &wl, assignment, snapshot, resPerFlv, candidates, thresholdPrio)
 	}
 	// There is a potential of preemption of workloads from the other queue in the
 	// cohort. We proceed with borrowing only if the dedicated policy
@@ -133,20 +134,20 @@ func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assig
 			// It can only preempt workloads from another CQ if they are strictly under allowBorrowingBelowPriority.
 			candidates = candidatesFromCQOrUnderThreshold(candidates, wl.ClusterQueue, *thresholdPrio)
 		}
-		return minimalPreemptions(wlReq, cq, snapshot, resPerFlv, candidates, true, thresholdPrio)
+		return minimalPreemptions(log, wlReq, cq, snapshot, resPerFlv, candidates, true, thresholdPrio)
 	}
 
 	// Only try preemptions in the cohort, without borrowing, if the target clusterqueue is still
 	// under nominal quota for all resources.
 	if queueUnderNominalInAllRequestedResources(wlReq, cq) {
-		if targets := minimalPreemptions(wlReq, cq, snapshot, resPerFlv, candidates, false, nil); len(targets) > 0 {
+		if targets := minimalPreemptions(log, wlReq, cq, snapshot, resPerFlv, candidates, false, nil); len(targets) > 0 {
 			return targets
 		}
 	}
 
 	// Final attempt. This time only candidates from the same queue, but
 	// with borrowing.
-	return minimalPreemptions(wlReq, cq, snapshot, resPerFlv, sameQueueCandidates, true, nil)
+	return minimalPreemptions(log, wlReq, cq, snapshot, resPerFlv, sameQueueCandidates, true, nil)
 }
 
 // canBorrowWithinCohort returns whether the behavior is enabled for the ClusterQueue and the threshold priority to use.
@@ -212,7 +213,10 @@ func (p *Preemptor) applyPreemptionWithSSA(ctx context.Context, w *kueue.Workloa
 // Once the Workload fits, the heuristic tries to add Workloads back, in the
 // reverse order in which they were removed, while the incoming Workload still
 // fits.
-func minimalPreemptions(wlReq cache.FlavorResourceQuantities, cq *cache.ClusterQueue, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info {
+func minimalPreemptions(log logr.Logger, wlReq cache.FlavorResourceQuantities, cq *cache.ClusterQueue, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info {
+	if logV := log.V(5); logV.Enabled() {
+		logV.Info("Simulating preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", resPerFlv, "allowBorrowing", allowBorrowing, "allowBorrowingBelowPriority", allowBorrowingBelowPriority)
+	}
 	// Simulate removing all candidates from the ClusterQueue and cohort.
 	var targets []*workload.Info
 	fits := false
@@ -304,7 +308,10 @@ func parseStrategies(s []config.PreemptionStrategy) []fsStrategy {
 	return strategies
 }
 
-func (p *Preemptor) fairPreemptions(wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowingBelowPriority *int32) []*workload.Info {
+func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowingBelowPriority *int32) []*workload.Info {
+	if logV := log.V(5); logV.Enabled() {
+		logV.Info("Simulating fair preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", resPerFlv, "allowBorrowingBelowPriority", allowBorrowingBelowPriority)
+	}
 	cqHeap := cqHeapFromCandidates(candidates, false, snapshot)
 	nominatedCQ := snapshot.ClusterQueues[wl.ClusterQueue]
 	wlReq := assignment.TotalRequestsFor(wl)
8 changes: 4 additions & 4 deletions pkg/scheduler/preemption/preemption_test.go
@@ -1384,7 +1384,7 @@ func TestPreemption(t *testing.T) {
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
 			defer features.SetFeatureGateDuringTest(t, features.LendingLimit, tc.enableLendingLimit)()
-			ctx, _ := utiltesting.ContextWithLog(t)
+			ctx, log := utiltesting.ContextWithLog(t)
 			cl := utiltesting.NewClientBuilder().
 				WithLists(&kueue.WorkloadList{Items: tc.admitted}).
 				Build()
@@ -1418,7 +1418,7 @@ func TestPreemption(t *testing.T) {
 			wlInfo := workload.NewInfo(tc.incoming)
 			wlInfo.ClusterQueue = tc.targetCQ
 			targetClusterQueue := snapshot.ClusterQueues[wlInfo.ClusterQueue]
-			targets := preemptor.GetTargets(*wlInfo, tc.assignment, &snapshot)
+			targets := preemptor.GetTargets(log, *wlInfo, tc.assignment, &snapshot)
 			preempted, err := preemptor.IssuePreemptions(ctx, wlInfo, targets, targetClusterQueue)
 			if err != nil {
 				t.Fatalf("Failed doing preemption")
@@ -1875,7 +1875,7 @@ func TestFairPreemptions(t *testing.T) {
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
-			ctx, _ := utiltesting.ContextWithLog(t)
+			ctx, log := utiltesting.ContextWithLog(t)
 			// Set name as UID so that candidates sorting is predictable.
 			for i := range tc.admitted {
 				tc.admitted[i].UID = types.UID(tc.admitted[i].Name)
@@ -1904,7 +1904,7 @@ func TestFairPreemptions(t *testing.T) {
 			snapshot := cqCache.Snapshot()
 			wlInfo := workload.NewInfo(tc.incoming)
 			wlInfo.ClusterQueue = tc.targetCQ
-			targets := preemptor.GetTargets(*wlInfo, singlePodSetAssignment(
+			targets := preemptor.GetTargets(log, *wlInfo, singlePodSetAssignment(
 				flavorassigner.ResourceAssignment{
 					corev1.ResourceCPU: &flavorassigner.FlavorAssignment{
 						Name: "default", Mode: flavorassigner.Preempt,
13 changes: 9 additions & 4 deletions pkg/scheduler/scheduler.go
@@ -68,6 +68,9 @@ type Scheduler struct {
 	workloadOrdering workload.Ordering
 	fairSharing config.FairSharing
 
+	// attemptCount identifies the number of scheduling attempt in logs, from the last restart.
+	attemptCount int64
+
 	// Stubs.
 	applyAdmission func(context.Context, *kueue.Workload) error
 }
@@ -182,7 +185,9 @@ func (cu *cohortsUsage) hasCommonFlavorResources(cohort string, assignment cache
 }
 
 func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal {
-	log := ctrl.LoggerFrom(ctx)
+	s.attemptCount++
+	log := ctrl.LoggerFrom(ctx).WithValues("attemptCount", s.attemptCount)
+	ctx = ctrl.LoggerInto(ctx, log)
 
 	// 1. Get the heads from the queues, including their desired clusterQueue.
 	// This operation blocks while the queues are empty.
@@ -418,7 +423,7 @@ func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cac
 	}
 
 	if arm == flavorassigner.Preempt {
-		faPreemtionTargets = s.preemptor.GetTargets(*wl, fullAssignment, snap)
+		faPreemtionTargets = s.preemptor.GetTargets(log, *wl, fullAssignment, snap)
 	}
 
 	// if the feature gate is not enabled or we can preempt
@@ -432,7 +437,7 @@ func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cac
 			if assignment.RepresentativeMode() == flavorassigner.Fit {
 				return &partialAssignment{assignment: assignment}, true
 			}
-			preemptionTargets := s.preemptor.GetTargets(*wl, assignment, snap)
+			preemptionTargets := s.preemptor.GetTargets(log, *wl, assignment, snap)
 			if len(preemptionTargets) > 0 {
 				return &partialAssignment{assignment: assignment, preemptionTargets: preemptionTargets}, true
 			}
@@ -616,7 +621,7 @@ func (s *Scheduler) requeueAndUpdate(log logr.Logger, ctx context.Context, e ent
 		e.requeueReason = queue.RequeueReasonFailedAfterNomination
 	}
 	added := s.queues.RequeueWorkload(ctx, &e.Info, e.requeueReason)
-	log.V(2).Info("Workload re-queued", "workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue), "queue", klog.KRef(e.Obj.Namespace, e.Obj.Spec.QueueName), "requeueReason", e.requeueReason, "added", added)
+	log.V(2).Info("Workload re-queued", "workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue), "queue", klog.KRef(e.Obj.Namespace, e.Obj.Spec.QueueName), "requeueReason", e.requeueReason, "added", added, "status", e.status)
 
 	if e.status == notNominated || e.status == skipped {
 		if workload.UnsetQuotaReservationWithCondition(e.Obj, "Pending", e.inadmissibleMsg) {
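The schedule() change tags the logger with an attempt counter and stores it back into the context, so code that later retrieves the logger from ctx inherits "attemptCount" in every line of that scheduling cycle. A minimal sketch of that propagation, using go-logr's context helpers in place of controller-runtime's ctrl.LoggerFrom/ctrl.LoggerInto (function names here are hypothetical):

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-logr/logr"
	"github.com/go-logr/logr/funcr"
)

// schedule tags the context logger with the attempt number before calling further down.
func schedule(ctx context.Context, attempt int64) {
	log := logr.FromContextOrDiscard(ctx).WithValues("attemptCount", attempt)
	ctx = logr.NewContext(ctx, log)
	evaluate(ctx)
}

// evaluate pulls the logger back out of ctx and automatically carries attemptCount.
func evaluate(ctx context.Context) {
	logr.FromContextOrDiscard(ctx).Info("Workload evaluated for admission", "status", "assumed")
}

func main() {
	base := funcr.New(func(prefix, args string) { fmt.Println(args) }, funcr.Options{})
	schedule(logr.NewContext(context.Background(), base), 1)
	// Prints something like:
	// "level"=0 "msg"="Workload evaluated for admission" "attemptCount"=1 "status"="assumed"
}
```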
11 changes: 11 additions & 0 deletions pkg/workload/workload.go
@@ -682,3 +682,14 @@ func AdmissionChecksForWorkload(log logr.Logger, wl *kueue.Workload, admissionCh
 	}
 	return acNames
 }
+
+func References(wls []*Info) []klog.ObjectRef {
+	if len(wls) == 0 {
+		return nil
+	}
+	keys := make([]klog.ObjectRef, len(wls))
+	for i, wl := range wls {
+		keys[i] = klog.KObj(wl.Obj)
+	}
+	return keys
+}
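References produces klog.ObjectRef values, which render as namespace/name in structured log output (used above for the new "preempted" and "candidates" fields). A minimal sketch of that rendering; the local obj type is hypothetical and only exists to satisfy klog's KMetadata interface:

```go
package main

import (
	"fmt"

	"k8s.io/klog/v2"
)

type obj struct{ namespace, name string }

func (o obj) GetName() string      { return o.name }
func (o obj) GetNamespace() string { return o.namespace }

func main() {
	refs := []klog.ObjectRef{
		klog.KObj(obj{namespace: "team-a", name: "job-1"}),
		klog.KObj(obj{namespace: "team-b", name: "job-2"}),
	}
	fmt.Println(refs) // [team-a/job-1 team-b/job-2]
}
```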