Skip to content

Commit

Permalink
Flatten several usages of FlavorResourceQuantities (#2551)
Browse files Browse the repository at this point in the history
  • Loading branch information
gabesaba authored Jul 8, 2024
1 parent e1e6c59 commit 6facb75
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 64 deletions.
10 changes: 5 additions & 5 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,12 +664,12 @@ func (c *ClusterQueueSnapshot) DominantResourceShare() (int, corev1.ResourceName
return dominantResourceShare(c, nil, 0)
}

func (c *ClusterQueueSnapshot) DominantResourceShareWith(wlReq resources.FlavorResourceQuantities) (int, corev1.ResourceName) {
func (c *ClusterQueueSnapshot) DominantResourceShareWith(wlReq resources.FlavorResourceQuantitiesFlat) (int, corev1.ResourceName) {
return dominantResourceShare(c, wlReq, 1)
}

func (c *ClusterQueueSnapshot) DominantResourceShareWithout(w *workload.Info) (int, corev1.ResourceName) {
return dominantResourceShare(c, w.FlavorResourceUsage(), -1)
func (c *ClusterQueueSnapshot) DominantResourceShareWithout(wlReq resources.FlavorResourceQuantitiesFlat) (int, corev1.ResourceName) {
return dominantResourceShare(c, wlReq, -1)
}

type dominantResourceShareNode interface {
Expand All @@ -680,7 +680,7 @@ type dominantResourceShareNode interface {
netQuotaNode
}

func dominantResourceShare(node dominantResourceShareNode, wlReq resources.FlavorResourceQuantities, m int64) (int, corev1.ResourceName) {
func dominantResourceShare(node dominantResourceShareNode, wlReq resources.FlavorResourceQuantitiesFlat, m int64) (int, corev1.ResourceName) {
if !node.hasCohort() {
return 0, ""
}
Expand All @@ -690,7 +690,7 @@ func dominantResourceShare(node dominantResourceShareNode, wlReq resources.Flavo

borrowing := make(map[corev1.ResourceName]int64)
for fr, quota := range remainingQuota(node) {
b := m*wlReq[fr.Flavor][fr.Resource] - quota
b := m*wlReq[fr] - quota
if b > 0 {
borrowing[fr.Resource] += b
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/clusterqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
func TestDominantResourceShare(t *testing.T) {
cases := map[string]struct {
cq ClusterQueueSnapshot
flvResQ resources.FlavorResourceQuantities
flvResQ resources.FlavorResourceQuantitiesFlat
wantDRValue int
wantDRName corev1.ResourceName
}{
Expand Down Expand Up @@ -885,7 +885,7 @@ func TestDominantResourceShare(t *testing.T) {
flvResQ: resources.FlavorResourceQuantitiesFlat{
{Flavor: "default", Resource: corev1.ResourceCPU}: 4_000,
{Flavor: "default", Resource: "example.com/gpu"}: 4,
}.Unflatten(),
},
wantDRName: corev1.ResourceCPU,
wantDRValue: 300, // (1+4-2)*1000/10
},
Expand Down Expand Up @@ -925,7 +925,7 @@ func TestDominantResourceShare(t *testing.T) {
flvResQ: resources.FlavorResourceQuantitiesFlat{
{Flavor: "default", Resource: corev1.ResourceCPU}: 4_000,
{Flavor: "default", Resource: "example.com/gpu"}: 4,
}.Unflatten(),
},
wantDRName: corev1.ResourceCPU,
wantDRValue: 300, // (1+4-2)*1000/10
},
Expand Down Expand Up @@ -967,7 +967,7 @@ func TestDominantResourceShare(t *testing.T) {
},
flvResQ: resources.FlavorResourceQuantitiesFlat{
{Flavor: "on-demand", Resource: corev1.ResourceCPU}: 10_000,
}.Unflatten(),
},
wantDRName: corev1.ResourceCPU,
wantDRValue: 25, // ((15+10-20)+0)*1000/200 (spot under nominal)
},
Expand Down
11 changes: 3 additions & 8 deletions pkg/scheduler/flavorassigner/flavorassigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,12 @@ func (a *Assignment) ToAPI() []kueue.PodSetAssignment {
return psFlavors
}

func (a *Assignment) TotalRequestsFor(wl *workload.Info) resources.FlavorResourceQuantities {
usage := make(resources.FlavorResourceQuantities)
func (a *Assignment) TotalRequestsFor(wl *workload.Info) resources.FlavorResourceQuantitiesFlat {
usage := make(resources.FlavorResourceQuantitiesFlat)
for i, ps := range wl.TotalRequests {
for res, q := range ps.Requests {
flv := a.PodSets[i].Flavors[res].Name
resUsage := usage[flv]
if resUsage == nil {
resUsage = make(map[corev1.ResourceName]int64)
usage[flv] = resUsage
}
resUsage[res] += q
usage[resources.FlavorResource{Flavor: flv, Resource: res}] += q
}
}
return usage
Expand Down
21 changes: 10 additions & 11 deletions pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (p *Preemptor) applyPreemptionWithSSA(ctx context.Context, w *kueue.Workloa
// Once the Workload fits, the heuristic tries to add Workloads back, in the
// reverse order in which they were removed, while the incoming Workload still
// fits.
func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info {
func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info {
if logV := log.V(5); logV.Enabled() {
logV.Info("Simulating preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", resPerFlv, "allowBorrowing", allowBorrowing, "allowBorrowingBelowPriority", allowBorrowingBelowPriority)
}
Expand Down Expand Up @@ -257,7 +257,7 @@ func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitie
return targets
}

func fillBackWorkloads(targets []*workload.Info, wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, allowBorrowing bool) []*workload.Info {
func fillBackWorkloads(targets []*workload.Info, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, allowBorrowing bool) []*workload.Info {
// In the reverse order, check if any of the workloads can be added back.
for i := len(targets) - 2; i >= 0; i-- {
snapshot.AddWorkload(targets[i])
Expand Down Expand Up @@ -342,7 +342,7 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme

for i, candWl := range candCQ.workloads {
belowThreshold := allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) < *allowBorrowingBelowPriority
newCandShareVal, _ := candCQ.cq.DominantResourceShareWithout(candWl)
newCandShareVal, _ := candCQ.cq.DominantResourceShareWithout(candWl.FlavorResourceUsage())
if belowThreshold || p.fsStrategies[0](newNominatedShareValue, candCQ.share, newCandShareVal) {
snapshot.RemoveWorkload(candWl)
targets = append(targets, candWl)
Expand Down Expand Up @@ -528,17 +528,16 @@ func workloadUsesResources(wl *workload.Info, resPerFlv resourcesPerFlavor) bool
// workloadFits determines if the workload requests would fit given the
// requestable resources and simulated usage of the ClusterQueue and its cohort,
// if it belongs to one.
func workloadFits(wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueueSnapshot, allowBorrowing bool) bool {
func workloadFits(wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, allowBorrowing bool) bool {
for _, rg := range cq.ResourceGroups {
for _, flvQuotas := range rg.Flavors {
flvReq, found := wlReq[flvQuotas.Name]
if !found {
// Workload doesn't request this flavor.
continue
}
cqResUsage := cq.Usage[flvQuotas.Name]
for rName, rReq := range flvReq {
resource := flvQuotas.Resources[rName]
for rName, resource := range flvQuotas.Resources {
rReq, found := wlReq[resources.FlavorResource{Flavor: flvQuotas.Name, Resource: rName}]
if !found {
// Workload doesn't request this FlavorResource.
continue
}

if cq.Cohort == nil || !allowBorrowing {
if cqResUsage[rName]+rReq > resource.Nominal {
Expand Down
16 changes: 5 additions & 11 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,21 +206,15 @@ func (i *Info) CanBePartiallyAdmitted() bool {

// FlavorResourceUsage returns the total resource usage for the workload,
// keyed by (flavor, resource) pair; a resource with no assigned flavor is
// keyed under the empty-string flavor.
func (i *Info) FlavorResourceUsage() resources.FlavorResourceQuantities {
if i == nil || len(i.TotalRequests) == 0 {
return nil
func (i *Info) FlavorResourceUsage() resources.FlavorResourceQuantitiesFlat {
total := make(resources.FlavorResourceQuantitiesFlat)
if i == nil {
return total
}
total := make(resources.FlavorResourceQuantities)
for _, psReqs := range i.TotalRequests {
for res, q := range psReqs.Requests {
flv := psReqs.Flavors[res]
if requests, found := total[flv]; found {
requests[res] += q
} else {
total[flv] = resources.Requests{
res: q,
}
}
total[resources.FlavorResource{Flavor: flv, Resource: res}] += q
}
}
return total
Expand Down
40 changes: 15 additions & 25 deletions pkg/workload/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,9 +593,11 @@ func TestIsEvictedByPodsReadyTimeout(t *testing.T) {
func TestFlavorResourceUsage(t *testing.T) {
cases := map[string]struct {
info *Info
want resources.FlavorResourceQuantities
want resources.FlavorResourceQuantitiesFlat
}{
"nil": {},
"nil": {
want: resources.FlavorResourceQuantitiesFlat{},
},
"one podset, no flavors": {
info: &Info{
TotalRequests: []PodSetResources{{
Expand All @@ -605,11 +607,9 @@ func TestFlavorResourceUsage(t *testing.T) {
},
}},
},
want: map[kueue.ResourceFlavorReference]resources.Requests{
"": {
corev1.ResourceCPU: 1_000,
"example.com/gpu": 3,
},
want: resources.FlavorResourceQuantitiesFlat{
{Flavor: "", Resource: "cpu"}: 1_000,
{Flavor: "", Resource: "example.com/gpu"}: 3,
},
},
"one podset, multiple flavors": {
Expand All @@ -625,13 +625,9 @@ func TestFlavorResourceUsage(t *testing.T) {
},
}},
},
want: map[kueue.ResourceFlavorReference]resources.Requests{
"default": {
corev1.ResourceCPU: 1_000,
},
"gpu": {
"example.com/gpu": 3,
},
want: resources.FlavorResourceQuantitiesFlat{
{Flavor: "default", Resource: "cpu"}: 1_000,
{Flavor: "gpu", Resource: "example.com/gpu"}: 3,
},
},
"multiple podsets, multiple flavors": {
Expand Down Expand Up @@ -667,17 +663,11 @@ func TestFlavorResourceUsage(t *testing.T) {
},
},
},
want: map[kueue.ResourceFlavorReference]resources.Requests{
"default": {
corev1.ResourceCPU: 3_000,
corev1.ResourceMemory: 2 * utiltesting.Gi,
},
"model_a": {
"example.com/gpu": 3,
},
"model_b": {
"example.com/gpu": 1,
},
want: resources.FlavorResourceQuantitiesFlat{
{Flavor: "default", Resource: "cpu"}: 3_000,
{Flavor: "default", Resource: "memory"}: 2 * utiltesting.Gi,
{Flavor: "model_a", Resource: "example.com/gpu"}: 3,
{Flavor: "model_b", Resource: "example.com/gpu"}: 1,
},
},
}
Expand Down

0 comments on commit 6facb75

Please sign in to comment.