diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go index d73b96f8a9..ea730cbb12 100644 --- a/pkg/cache/clusterqueue.go +++ b/pkg/cache/clusterqueue.go @@ -664,12 +664,12 @@ func (c *ClusterQueueSnapshot) DominantResourceShare() (int, corev1.ResourceName return dominantResourceShare(c, nil, 0) } -func (c *ClusterQueueSnapshot) DominantResourceShareWith(wlReq resources.FlavorResourceQuantities) (int, corev1.ResourceName) { +func (c *ClusterQueueSnapshot) DominantResourceShareWith(wlReq resources.FlavorResourceQuantitiesFlat) (int, corev1.ResourceName) { return dominantResourceShare(c, wlReq, 1) } -func (c *ClusterQueueSnapshot) DominantResourceShareWithout(w *workload.Info) (int, corev1.ResourceName) { - return dominantResourceShare(c, w.FlavorResourceUsage(), -1) +func (c *ClusterQueueSnapshot) DominantResourceShareWithout(wlReq resources.FlavorResourceQuantitiesFlat) (int, corev1.ResourceName) { + return dominantResourceShare(c, wlReq, -1) } type dominantResourceShareNode interface { @@ -680,7 +680,7 @@ type dominantResourceShareNode interface { netQuotaNode } -func dominantResourceShare(node dominantResourceShareNode, wlReq resources.FlavorResourceQuantities, m int64) (int, corev1.ResourceName) { +func dominantResourceShare(node dominantResourceShareNode, wlReq resources.FlavorResourceQuantitiesFlat, m int64) (int, corev1.ResourceName) { if !node.hasCohort() { return 0, "" } @@ -690,7 +690,7 @@ func dominantResourceShare(node dominantResourceShareNode, wlReq resources.Flavo borrowing := make(map[corev1.ResourceName]int64) for fr, quota := range remainingQuota(node) { - b := m*wlReq[fr.Flavor][fr.Resource] - quota + b := m*wlReq[fr] - quota if b > 0 { borrowing[fr.Resource] += b } diff --git a/pkg/cache/clusterqueue_test.go b/pkg/cache/clusterqueue_test.go index 122a2060d3..80e2b75767 100644 --- a/pkg/cache/clusterqueue_test.go +++ b/pkg/cache/clusterqueue_test.go @@ -716,7 +716,7 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) { func TestDominantResourceShare(t *testing.T) { cases := map[string]struct { cq ClusterQueueSnapshot - flvResQ resources.FlavorResourceQuantities + flvResQ resources.FlavorResourceQuantitiesFlat wantDRValue int wantDRName corev1.ResourceName }{ @@ -885,7 +885,7 @@ func TestDominantResourceShare(t *testing.T) { flvResQ: resources.FlavorResourceQuantitiesFlat{ {Flavor: "default", Resource: corev1.ResourceCPU}: 4_000, {Flavor: "default", Resource: "example.com/gpu"}: 4, - }.Unflatten(), + }, wantDRName: corev1.ResourceCPU, wantDRValue: 300, // (1+4-2)*1000/10 }, @@ -925,7 +925,7 @@ func TestDominantResourceShare(t *testing.T) { flvResQ: resources.FlavorResourceQuantitiesFlat{ {Flavor: "default", Resource: corev1.ResourceCPU}: 4_000, {Flavor: "default", Resource: "example.com/gpu"}: 4, - }.Unflatten(), + }, wantDRName: corev1.ResourceCPU, wantDRValue: 300, // (1+4-2)*1000/10 }, @@ -967,7 +967,7 @@ func TestDominantResourceShare(t *testing.T) { }, flvResQ: resources.FlavorResourceQuantitiesFlat{ {Flavor: "on-demand", Resource: corev1.ResourceCPU}: 10_000, - }.Unflatten(), + }, wantDRName: corev1.ResourceCPU, wantDRValue: 25, // ((15+10-20)+0)*1000/200 (spot under nominal) }, diff --git a/pkg/scheduler/flavorassigner/flavorassigner.go b/pkg/scheduler/flavorassigner/flavorassigner.go index 018403e0e6..fbc78d56e3 100644 --- a/pkg/scheduler/flavorassigner/flavorassigner.go +++ b/pkg/scheduler/flavorassigner/flavorassigner.go @@ -106,17 +106,12 @@ func (a *Assignment) ToAPI() []kueue.PodSetAssignment { return psFlavors } -func (a *Assignment) TotalRequestsFor(wl *workload.Info) resources.FlavorResourceQuantities { - usage := make(resources.FlavorResourceQuantities) +func (a *Assignment) TotalRequestsFor(wl *workload.Info) resources.FlavorResourceQuantitiesFlat { + usage := make(resources.FlavorResourceQuantitiesFlat) for i, ps := range wl.TotalRequests { for res, q := range ps.Requests { flv := a.PodSets[i].Flavors[res].Name - resUsage := usage[flv] - if resUsage == nil { - resUsage = make(map[corev1.ResourceName]int64) - usage[flv] = resUsage - } - resUsage[res] += q + usage[resources.FlavorResource{Flavor: flv, Resource: res}] += q } } return usage diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go index 9f63e11afd..887e65810c 100644 --- a/pkg/scheduler/preemption/preemption.go +++ b/pkg/scheduler/preemption/preemption.go @@ -214,7 +214,7 @@ func (p *Preemptor) applyPreemptionWithSSA(ctx context.Context, w *kueue.Workloa // Once the Workload fits, the heuristic tries to add Workloads back, in the // reverse order in which they were removed, while the incoming Workload still // fits. -func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info { +func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info { if logV := log.V(5); logV.Enabled() { logV.Info("Simulating preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", resPerFlv, "allowBorrowing", allowBorrowing, "allowBorrowingBelowPriority", allowBorrowingBelowPriority) } @@ -257,7 +257,7 @@ func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitie return targets } -func fillBackWorkloads(targets []*workload.Info, wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, allowBorrowing bool) []*workload.Info { +func fillBackWorkloads(targets []*workload.Info, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, allowBorrowing bool) []*workload.Info { // In the reverse order, check if any of the workloads can be added back. for i := len(targets) - 2; i >= 0; i-- { snapshot.AddWorkload(targets[i]) @@ -342,7 +342,7 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme for i, candWl := range candCQ.workloads { belowThreshold := allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) < *allowBorrowingBelowPriority - newCandShareVal, _ := candCQ.cq.DominantResourceShareWithout(candWl) + newCandShareVal, _ := candCQ.cq.DominantResourceShareWithout(candWl.FlavorResourceUsage()) if belowThreshold || p.fsStrategies[0](newNominatedShareValue, candCQ.share, newCandShareVal) { snapshot.RemoveWorkload(candWl) targets = append(targets, candWl) @@ -528,17 +528,16 @@ func workloadUsesResources(wl *workload.Info, resPerFlv resourcesPerFlavor) bool // workloadFits determines if the workload requests would fit given the // requestable resources and simulated usage of the ClusterQueue and its cohort, // if it belongs to one. -func workloadFits(wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueueSnapshot, allowBorrowing bool) bool { +func workloadFits(wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, allowBorrowing bool) bool { for _, rg := range cq.ResourceGroups { for _, flvQuotas := range rg.Flavors { - flvReq, found := wlReq[flvQuotas.Name] - if !found { - // Workload doesn't request this flavor. - continue - } cqResUsage := cq.Usage[flvQuotas.Name] - for rName, rReq := range flvReq { - resource := flvQuotas.Resources[rName] + for rName, resource := range flvQuotas.Resources { + rReq, found := wlReq[resources.FlavorResource{Flavor: flvQuotas.Name, Resource: rName}] + if !found { + // Workload doesn't request this FlavorResource. + continue + } if cq.Cohort == nil || !allowBorrowing { if cqResUsage[rName]+rReq > resource.Nominal { diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index 5d0df2cb2f..9dd9da29d9 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -206,21 +206,15 @@ func (i *Info) CanBePartiallyAdmitted() bool { // FlavorResourceUsage returns the total resource usage for the workload, // per flavor (if assigned, otherwise flavor shows as empty string), per resource. -func (i *Info) FlavorResourceUsage() resources.FlavorResourceQuantities { - if i == nil || len(i.TotalRequests) == 0 { - return nil +func (i *Info) FlavorResourceUsage() resources.FlavorResourceQuantitiesFlat { + total := make(resources.FlavorResourceQuantitiesFlat) + if i == nil { + return total } - total := make(resources.FlavorResourceQuantities) for _, psReqs := range i.TotalRequests { for res, q := range psReqs.Requests { flv := psReqs.Flavors[res] - if requests, found := total[flv]; found { - requests[res] += q - } else { - total[flv] = resources.Requests{ - res: q, - } - } + total[resources.FlavorResource{Flavor: flv, Resource: res}] += q } } return total diff --git a/pkg/workload/workload_test.go b/pkg/workload/workload_test.go index 0b6cf7238b..b74a90048c 100644 --- a/pkg/workload/workload_test.go +++ b/pkg/workload/workload_test.go @@ -593,9 +593,11 @@ func TestIsEvictedByPodsReadyTimeout(t *testing.T) { func TestFlavorResourceUsage(t *testing.T) { cases := map[string]struct { info *Info - want resources.FlavorResourceQuantities + want resources.FlavorResourceQuantitiesFlat }{ - "nil": {}, + "nil": { + want: resources.FlavorResourceQuantitiesFlat{}, + }, "one podset, no flavors": { info: &Info{ TotalRequests: []PodSetResources{{ @@ -605,11 +607,9 @@ func TestFlavorResourceUsage(t *testing.T) { }, }}, }, - want: map[kueue.ResourceFlavorReference]resources.Requests{ - "": { - corev1.ResourceCPU: 1_000, - "example.com/gpu": 3, - }, + want: resources.FlavorResourceQuantitiesFlat{ + {Flavor: "", Resource: "cpu"}: 1_000, + {Flavor: "", Resource: "example.com/gpu"}: 3, }, }, "one podset, multiple flavors": { @@ -625,13 +625,9 @@ func TestFlavorResourceUsage(t *testing.T) { }, }}, }, - want: map[kueue.ResourceFlavorReference]resources.Requests{ - "default": { - corev1.ResourceCPU: 1_000, - }, - "gpu": { - "example.com/gpu": 3, - }, + want: resources.FlavorResourceQuantitiesFlat{ + {Flavor: "default", Resource: "cpu"}: 1_000, + {Flavor: "gpu", Resource: "example.com/gpu"}: 3, }, }, "multiple podsets, multiple flavors": { @@ -667,17 +663,11 @@ func TestFlavorResourceUsage(t *testing.T) { }, }, }, - want: map[kueue.ResourceFlavorReference]resources.Requests{ - "default": { - corev1.ResourceCPU: 3_000, - corev1.ResourceMemory: 2 * utiltesting.Gi, - }, - "model_a": { - "example.com/gpu": 3, - }, - "model_b": { - "example.com/gpu": 1, - }, + want: resources.FlavorResourceQuantitiesFlat{ + {Flavor: "default", Resource: "cpu"}: 3_000, + {Flavor: "default", Resource: "memory"}: 2 * utiltesting.Gi, + {Flavor: "model_a", Resource: "example.com/gpu"}: 3, + {Flavor: "model_b", Resource: "example.com/gpu"}: 1, }, }, }