Distinguish between Preemption due to reclamation and fair sharing
* Dedicated GetWorkloadReferences function
* Extract HumanReadablePreemptionReasons
* !strategy && belowThreshold -> InCohortReclaimWhileBorrowingReason
* ExpectPreemptedCondition test util
vladikkuzn committed Jul 9, 2024
1 parent 50768c4 commit 2f68cca
Showing 5 changed files with 83 additions and 79 deletions.
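
The net effect of the change: a preemption Target now carries only a Reason, and the user-facing eviction message is derived from a reason-to-text map rather than a ClusterQueue/Cohort scope. Below is a minimal, self-contained sketch of that mapping; the constants, map entries, and message wording are copied from the diff that follows, while the standalone package main and the preemptionMessage helper are illustrative only and not part of the commit.

package main

import "fmt"

// Preemption reasons and their human-readable forms, copied from
// pkg/scheduler/preemption in the diff below.
const (
	InClusterQueueReason                = "InClusterQueue"
	InCohortReclamationReason           = "InCohortReclamation"
	InCohortFairSharingReason           = "InCohortFairSharing"
	InCohortReclaimWhileBorrowingReason = "InCohortReclaimWhileBorrowing"
)

var HumanReadablePreemptionReasons = map[string]string{
	InClusterQueueReason:                "prioritization in the ClusterQueue",
	InCohortReclamationReason:           "reclamation within the cohort",
	InCohortFairSharingReason:           "fair sharing within the cohort",
	InCohortReclaimWhileBorrowingReason: "reclamation within the cohort while borrowing",
}

// preemptionMessage builds the eviction message the same way IssuePreemptions
// does after this change (illustrative helper, not part of the commit).
func preemptionMessage(preemptorUID, reason string) string {
	return fmt.Sprintf("Preempted to accommodate a workload (UID: %s) due to %s",
		preemptorUID, HumanReadablePreemptionReasons[reason])
}

func main() {
	// Prints: Preempted to accommodate a workload (UID: abc-123) due to fair sharing within the cohort
	fmt.Println(preemptionMessage("abc-123", InCohortFairSharingReason))
}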
6 changes: 1 addition & 5 deletions pkg/scheduler/logging.go
@@ -22,8 +22,6 @@ import (

"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/scheduler/preemption"
"sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/workload"
)

func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) {
@@ -39,9 +37,7 @@ func logAdmissionAttemptIfVerbose(log logr.Logger, e *entry) {
}
if log.V(4).Enabled() {
args = append(args, "nominatedAssignment", e.assignment)
args = append(args, "preempted", workload.References(
slices.Map(e.preemptionTargets, func(p **preemption.Target) *workload.Info { return (*p).Wl }),
))
args = append(args, "preempted", preemption.GetWorkloadReferences(e.preemptionTargets))
}
logV.Info("Workload evaluated for admission", args...)
}
56 changes: 38 additions & 18 deletions pkg/scheduler/preemption/preemption.go
@@ -99,7 +99,17 @@ func candidatesFromCQOrUnderThreshold(candidates []*workload.Info, clusterQueue
type Target struct {
Wl *workload.Info
Reason string
Scope string // ClusterQueue/Cohort
}

func GetWorkloadReferences(targets []*Target) []klog.ObjectRef {
if len(targets) == 0 {
return nil
}
keys := make([]klog.ObjectRef, len(targets))
for i, t := range targets {
keys[i] = klog.KObj(t.Wl.Obj)
}
return keys
}

// GetTargets returns the list of workloads that should be evicted in order to make room for wl.
@@ -175,14 +185,27 @@ const (
ClusterQueueOrigin = "ClusterQueue"
// CohortOrigin indicates that preemption originated from cohort
CohortOrigin = "Cohort"
)

InClusterQueueReason = "InClusterQueue"
// In cluster queue preemption reasons
const (
InClusterQueueReason string = "InClusterQueue"
)

InCohortReclamationReason = "InCohortReclamation"
InCohortFairSharingReason = "InCohortFairSharing"
InCohortReclaimWhileBorrowingReason = "InCohortReclaimWhileBorrowing"
// In cohort preemption reasons
const (
InCohortReclamationReason string = "InCohortReclamation"
InCohortFairSharingReason string = "InCohortFairSharing"
InCohortReclaimWhileBorrowingReason string = "InCohortReclaimWhileBorrowing"
)

var HumanReadablePreemptionReasons = map[string]string{
InClusterQueueReason: "prioritization in the ClusterQueue",
InCohortReclamationReason: "reclamation within the cohort",
InCohortFairSharingReason: "fair sharing within the cohort",
InCohortReclaimWhileBorrowingReason: "reclamation within the cohort while borrowing",
}

// IssuePreemptions marks the target workloads as evicted.
func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.Info, targets []*workload.Info, cq *cache.ClusterQueueSnapshot) (int, error) {
log := ctrl.LoggerFrom(ctx)
@@ -193,7 +216,7 @@ func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.In
workqueue.ParallelizeUntil(ctx, parallelPreemptions, len(targets), func(i int) {
target := targets[i]
if !meta.IsStatusConditionTrue(target.Wl.Obj.Status.Conditions, kueue.WorkloadEvicted) {
message := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the %s", preemptor.Obj.UID, target.Scope)
message := fmt.Sprintf("Preempted to accommodate a workload (UID: %s) due to %s", preemptor.Obj.UID, HumanReadablePreemptionReasons[target.Reason])
err := p.applyPreemption(ctx, target.Wl.Obj, target.Reason, message)
if err != nil {
errCh.SendErrorWithCancel(err, cancel)
@@ -236,13 +259,12 @@ func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitie
for _, candWl := range candidates {
candCQ := snapshot.ClusterQueues[candWl.ClusterQueue]
sameCq := cq == candCQ
scope := ClusterQueueOrigin
reason := InClusterQueueReason
if !sameCq && !cqIsBorrowing(candCQ, resPerFlv) {
continue
}
if !sameCq {
scope = CohortOrigin
reason = InCohortReclamationReason
if allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) >= *allowBorrowingBelowPriority {
// We set allowBorrowing=false if there is a candidate with priority
// exceeding allowBorrowingBelowPriority added to targets.
@@ -262,14 +284,9 @@ func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitie
}
}
snapshot.RemoveWorkload(candWl)
if !sameCq && !allowBorrowing {
scope = CohortOrigin
reason = InCohortReclamationReason
}
targets = append(targets, &Target{
Wl: candWl,
Reason: reason,
Scope: scope,
})
if workloadFits(wlReq, cq, allowBorrowing) {
fits = true
@@ -357,7 +374,6 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
targets = append(targets, &Target{
Wl: candWl,
Reason: InClusterQueueReason,
Scope: ClusterQueueOrigin,
})
if workloadFits(wlReq, nominatedCQ, true) {
fits = true
@@ -375,12 +391,17 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
for i, candWl := range candCQ.workloads {
belowThreshold := allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) < *allowBorrowingBelowPriority
newCandShareVal, _ := candCQ.cq.DominantResourceShareWithout(candWl.FlavorResourceUsage())
if belowThreshold || p.fsStrategies[0](newNominatedShareValue, candCQ.share, newCandShareVal) {
strategy := p.fsStrategies[0](newNominatedShareValue, candCQ.share, newCandShareVal)
if belowThreshold || strategy {
snapshot.RemoveWorkload(candWl)
reason := InCohortFairSharingReason
if !strategy && belowThreshold {
reason = InCohortReclaimWhileBorrowingReason
}

targets = append(targets, &Target{
Wl: candWl,
Reason: InCohortFairSharingReason,
Scope: CohortOrigin,
Reason: reason,
})
if workloadFits(wlReq, nominatedCQ, true) {
fits = true
@@ -413,7 +434,6 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme
targets = append(targets, &Target{
Wl: candWl,
Reason: InCohortFairSharingReason,
Scope: CohortOrigin,
})
if workloadFits(wlReq, nominatedCQ, true) {
fits = true
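
The new InCohortReclaimWhileBorrowing reason is assigned exactly when the priority threshold, not the fair-sharing strategy, is what allows the candidate to be preempted. A sketch of that decision as a standalone helper (cohortPreemptionReason is a hypothetical name assumed to sit next to the reason constants above; it is not part of the commit):

// Summarizes the reason selection in fairPreemptions' borrowing path:
// a candidate taken only because its priority is below
// allowBorrowingBelowPriority, rather than because the fair-sharing strategy
// picked it, is labeled as reclamation while borrowing.
func cohortPreemptionReason(strategyAllows, belowThreshold bool) string {
	if !strategyAllows && belowThreshold {
		return InCohortReclaimWhileBorrowingReason
	}
	return InCohortFairSharingReason
}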
54 changes: 27 additions & 27 deletions pkg/scheduler/preemption/preemption_test.go
@@ -1552,7 +1552,7 @@ func TestFairPreemptions(t *testing.T) {
},
incoming: unitWl.Clone().Name("c_incoming").Obj(),
targetCQ: "c",
wantPreempted: sets.New(targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin)),
wantPreempted: sets.New(targetKeyReasonScope("/b1", InCohortFairSharingReason)),
},
"can reclaim from queue using less, if taking the latest workload from user using the most isn't enough": {
clusterQueues: baseCQs,
@@ -1564,7 +1564,7 @@ func TestFairPreemptions(t *testing.T) {
},
incoming: utiltesting.MakeWorkload("c_incoming", "").Request(corev1.ResourceCPU, "3").SimpleReserveQuota("a", "default", now).Obj(),
targetCQ: "c",
wantPreempted: sets.New(targetKeyReasonScope("/a1", InCohortFairSharingReason, CohortOrigin)), // attempts to preempt b1, but it's not enough.
wantPreempted: sets.New(targetKeyReasonScope("/a1", InCohortFairSharingReason)), // attempts to preempt b1, but it's not enough.
},
"reclaim borrowable quota from user using the most": {
clusterQueues: baseCQs,
@@ -1581,7 +1581,7 @@ func TestFairPreemptions(t *testing.T) {
},
incoming: unitWl.Clone().Name("a_incoming").Obj(),
targetCQ: "a",
wantPreempted: sets.New(targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin)),
wantPreempted: sets.New(targetKeyReasonScope("/b1", InCohortFairSharingReason)),
},
"preempt one from each CQ borrowing": {
clusterQueues: baseCQs,
@@ -1596,8 +1596,8 @@ func TestFairPreemptions(t *testing.T) {
incoming: utiltesting.MakeWorkload("c_incoming", "").Request(corev1.ResourceCPU, "2").Obj(),
targetCQ: "c",
wantPreempted: sets.New(
targetKeyReasonScope("/a1", InCohortFairSharingReason, CohortOrigin),
targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin),
targetKeyReasonScope("/a1", InCohortFairSharingReason),
targetKeyReasonScope("/b1", InCohortFairSharingReason),
),
},
"can't preempt when everyone under nominal": {
@@ -1647,8 +1647,8 @@ func TestFairPreemptions(t *testing.T) {
incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(),
targetCQ: "a",
wantPreempted: sets.New(
targetKeyReasonScope("/a1_low", InClusterQueueReason, ClusterQueueOrigin),
targetKeyReasonScope("/a2_low", InClusterQueueReason, ClusterQueueOrigin),
targetKeyReasonScope("/a1_low", InClusterQueueReason),
targetKeyReasonScope("/a2_low", InClusterQueueReason),
),
},
"can preempt a combination of same CQ and highest user": {
@@ -1667,8 +1667,8 @@ func TestFairPreemptions(t *testing.T) {
incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(),
targetCQ: "a",
wantPreempted: sets.New(
targetKeyReasonScope("/a_low", InClusterQueueReason, ClusterQueueOrigin),
targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin),
targetKeyReasonScope("/a_low", InClusterQueueReason),
targetKeyReasonScope("/b1", InCohortFairSharingReason),
),
},
"preempt huge workload if there is no other option, as long as the target CQ gets a lower share": {
@@ -1678,7 +1678,7 @@ func TestFairPreemptions(t *testing.T) {
},
incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(),
targetCQ: "a",
wantPreempted: sets.New(targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin)),
wantPreempted: sets.New(targetKeyReasonScope("/b1", InCohortFairSharingReason)),
},
"can't preempt huge workload if the incoming is also huge": {
clusterQueues: baseCQs,
@@ -1710,8 +1710,8 @@ func TestFairPreemptions(t *testing.T) {
incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "4").Obj(),
targetCQ: "a",
wantPreempted: sets.New(
targetKeyReasonScope("/a1_low", InClusterQueueReason, ClusterQueueOrigin),
targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin),
targetKeyReasonScope("/a1_low", InClusterQueueReason),
targetKeyReasonScope("/b1", InCohortFairSharingReason),
),
},
"prefer to preempt workloads that don't make the target CQ have the biggest share": {
@@ -1725,7 +1725,7 @@ func TestFairPreemptions(t *testing.T) {
incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "3.5").Obj(),
targetCQ: "a",
// It would have been possible to preempt "/b1" under rule S2-b, but S2-a was possible first.
wantPreempted: sets.New(targetKeyReasonScope("/b2", InCohortFairSharingReason, CohortOrigin)),
wantPreempted: sets.New(targetKeyReasonScope("/b2", InCohortFairSharingReason)),
},
"preempt from different cluster queues if the end result has a smaller max share": {
clusterQueues: baseCQs,
@@ -1738,8 +1738,8 @@ func TestFairPreemptions(t *testing.T) {
incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "3.5").Obj(),
targetCQ: "a",
wantPreempted: sets.New(
targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin),
targetKeyReasonScope("/c1", InCohortFairSharingReason, CohortOrigin),
targetKeyReasonScope("/b1", InCohortFairSharingReason),
targetKeyReasonScope("/c1", InCohortFairSharingReason),
),
},
"scenario above does not flap": {
@@ -1778,8 +1778,8 @@ func TestFairPreemptions(t *testing.T) {
incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(),
targetCQ: "a",
wantPreempted: sets.New(
targetKeyReasonScope("/preemptible1", InCohortFairSharingReason, CohortOrigin),
targetKeyReasonScope("/preemptible2", InCohortFairSharingReason, CohortOrigin),
targetKeyReasonScope("/preemptible1", InCohortFairSharingReason),
targetKeyReasonScope("/preemptible2", InCohortReclaimWhileBorrowingReason),
),
},
"preempt lower priority first, even if big": {
@@ -1792,7 +1792,7 @@ func TestFairPreemptions(t *testing.T) {
},
incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "1").Obj(),
targetCQ: "a",
wantPreempted: sets.New(targetKeyReasonScope("/b_low", InCohortFairSharingReason, CohortOrigin)),
wantPreempted: sets.New(targetKeyReasonScope("/b_low", InCohortFairSharingReason)),
},
"preempt workload that doesn't transfer the imbalance, even if high priority": {
clusterQueues: baseCQs,
@@ -1804,7 +1804,7 @@ func TestFairPreemptions(t *testing.T) {
},
incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "1").Obj(),
targetCQ: "a",
wantPreempted: sets.New(targetKeyReasonScope("/b_high", InCohortFairSharingReason, CohortOrigin)),
wantPreempted: sets.New(targetKeyReasonScope("/b_high", InCohortFairSharingReason)),
},
"CQ with higher weight can preempt more": {
clusterQueues: []*kueue.ClusterQueue{
@@ -1851,8 +1851,8 @@ func TestFairPreemptions(t *testing.T) {
incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(),
targetCQ: "a",
wantPreempted: sets.New(
targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin),
targetKeyReasonScope("/b2", InCohortFairSharingReason, CohortOrigin),
targetKeyReasonScope("/b1", InCohortFairSharingReason),
targetKeyReasonScope("/b2", InCohortFairSharingReason),
),
},
"can preempt anything borrowing from CQ with 0 weight": {
@@ -1900,9 +1900,9 @@ func TestFairPreemptions(t *testing.T) {
incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "3").Obj(),
targetCQ: "a",
wantPreempted: sets.New(
targetKeyReasonScope("/b1", InCohortFairSharingReason, CohortOrigin),
targetKeyReasonScope("/b2", InCohortFairSharingReason, CohortOrigin),
targetKeyReasonScope("/b3", InCohortFairSharingReason, CohortOrigin),
targetKeyReasonScope("/b1", InCohortFairSharingReason),
targetKeyReasonScope("/b2", InCohortFairSharingReason),
targetKeyReasonScope("/b3", InCohortFairSharingReason),
),
},
"can't preempt nominal from CQ with 0 weight": {
@@ -1978,7 +1978,7 @@ func TestFairPreemptions(t *testing.T) {
},
), &snapshot)
gotTargets := sets.New(slices.Map(targets, func(t **Target) string {
return targetKeyReasonScope(workload.Key((*t).Wl.Obj), (*t).Reason, (*t).Scope)
return targetKeyReasonScope(workload.Key((*t).Wl.Obj), (*t).Reason)
})...)
if diff := cmp.Diff(tc.wantPreempted, gotTargets, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("Issued preemptions (-want,+got):\n%s", diff)
@@ -1987,8 +1987,8 @@ func TestFairPreemptions(t *testing.T) {
}
}

func targetKeyReasonScope(key, reason, scope string) string {
return fmt.Sprintf("%s:%s:%s", key, reason, scope)
func targetKeyReasonScope(key, reason string) string {
return fmt.Sprintf("%s:%s", key, reason)
}

func TestCandidatesOrdering(t *testing.T) {
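
With the Scope argument dropped, expected preemptions in the table above are keyed as "<workload-key>:<reason>". A self-contained illustration of the helper's output (the key and reason values are borrowed from the test cases; the standalone package main is illustrative only):

package main

import "fmt"

// Mirrors the updated test helper; the scope component is gone from the key.
func targetKeyReasonScope(key, reason string) string {
	return fmt.Sprintf("%s:%s", key, reason)
}

func main() {
	// Prints "/b1:InCohortFairSharing", the format wantPreempted entries now use.
	fmt.Println(targetKeyReasonScope("/b1", "InCohortFairSharing"))
}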
32 changes: 3 additions & 29 deletions test/integration/scheduler/preemption_test.go
@@ -281,25 +281,8 @@ var _ = ginkgo.Describe("Preemption", func() {

conditionCmpOpts := cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")
ginkgo.By("Verify the Preempted condition", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(alphaLowWl), alphaLowWl)).To(gomega.Succeed())
g.Expect(apimeta.FindStatusCondition(alphaLowWl.Status.Conditions, kueue.WorkloadPreempted)).To(gomega.BeComparableTo(&metav1.Condition{
Type: kueue.WorkloadPreempted,
Status: metav1.ConditionTrue,
Reason: preemption.InClusterQueueReason,
Message: fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the %s", alphaMidWl.UID, preemption.ClusterQueueOrigin),
}, conditionCmpOpts))
}, util.Timeout, util.Interval).Should(gomega.Succeed())

gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(betaMidWl), betaMidWl)).To(gomega.Succeed())
g.Expect(apimeta.FindStatusCondition(betaMidWl.Status.Conditions, kueue.WorkloadPreempted)).To(gomega.BeComparableTo(&metav1.Condition{
Type: kueue.WorkloadPreempted,
Status: metav1.ConditionTrue,
Reason: preemption.InCohortReclamationReason,
Message: fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the %s", alphaMidWl.UID, preemption.CohortOrigin),
}, conditionCmpOpts))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
util.ExpectPreemptedCondition(ctx, k8sClient, preemption.InClusterQueueReason, metav1.ConditionTrue, alphaLowWl, alphaMidWl)
util.ExpectPreemptedCondition(ctx, k8sClient, preemption.InCohortReclamationReason, metav1.ConditionTrue, betaMidWl, alphaMidWl)
})

ginkgo.By("Verify the Preempted condition on re-admission, as the preemptor is finished", func() {
@@ -705,16 +688,7 @@ var _ = ginkgo.Describe("Preemption", func() {
Obj()
gomega.Expect(k8sClient.Create(ctx, aStandardVeryHighWl)).To(gomega.Succeed())

conditionCmpOpts := cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime", "ObservedGeneration")
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(aBestEffortLowWl), aBestEffortLowWl)).To(gomega.Succeed())
g.Expect(aBestEffortLowWl.Status.Conditions).To(gomega.ContainElements(gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadPreempted,
Status: metav1.ConditionTrue,
Reason: preemption.InCohortReclaimWhileBorrowingReason,
Message: fmt.Sprintf("Preempted to accommodate a workload (UID: %s) in the %s", aStandardVeryHighWl.UID, preemption.CohortOrigin),
}, conditionCmpOpts)))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
util.ExpectPreemptedCondition(ctx, k8sClient, preemption.InCohortReclaimWhileBorrowingReason, metav1.ConditionTrue, aBestEffortLowWl, aStandardVeryHighWl)

ginkgo.By("Finish eviction fo the a-best-effort-low workload")
util.FinishEvictionForWorkloads(ctx, k8sClient, aBestEffortLowWl)
14 changes: 14 additions & 0 deletions test/util/util.go
@@ -49,6 +49,7 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/jobs/pod"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/scheduler/preemption"
"sigs.k8s.io/kueue/pkg/util/testing"
"sigs.k8s.io/kueue/pkg/workload"
)
@@ -792,6 +793,19 @@ readCh:
gomega.ExpectWithOffset(1, gotObjs).To(gomega.Equal(objs))
}

func ExpectPreemptedCondition(ctx context.Context, k8sClient client.Client, reason string, status metav1.ConditionStatus, preemptedWl, preempteeWl *kueue.Workload) {
conditionCmpOpts := cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime", "ObservedGeneration")
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(preemptedWl), preemptedWl)).To(gomega.Succeed())
g.Expect(preemptedWl.Status.Conditions).To(gomega.ContainElements(gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadPreempted,
Status: status,
Reason: reason,
Message: fmt.Sprintf("Preempted to accommodate a workload (UID: %s) due to %s", preempteeWl.UID, preemption.HumanReadablePreemptionReasons[reason]),
}, conditionCmpOpts)))
}, Timeout, Interval).Should(gomega.Succeed())
}

func NewTestingLogger(writer io.Writer, level int) logr.Logger {
opts := func(o *zap.Options) {
o.TimeEncoder = zapcore.RFC3339NanoTimeEncoder
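
For reference, a hypothetical call to the new helper from an integration test (a fragment that assumes the usual ctx, k8sClient, and workload variables from the spec above; note that the last parameter, although named preempteeWl in the signature, is the preempting workload whose UID appears in the condition message):

// Asserts that betaMidWl has WorkloadPreempted=True with the in-cohort
// reclamation reason and a message referencing alphaMidWl's UID.
util.ExpectPreemptedCondition(ctx, k8sClient,
	preemption.InCohortReclamationReason, metav1.ConditionTrue,
	betaMidWl,  // workload expected to carry the Preempted condition
	alphaMidWl, // preemptor referenced in the condition message
)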
