Flatten several usages of FlavorResourceQuantities #2551

Merged · 1 commit · Jul 8, 2024
10 changes: 5 additions & 5 deletions pkg/cache/clusterqueue.go
@@ -664,12 +664,12 @@ func (c *ClusterQueueSnapshot) DominantResourceShare() (int, corev1.ResourceName
return dominantResourceShare(c, nil, 0)
}

func (c *ClusterQueueSnapshot) DominantResourceShareWith(wlReq resources.FlavorResourceQuantities) (int, corev1.ResourceName) {
func (c *ClusterQueueSnapshot) DominantResourceShareWith(wlReq resources.FlavorResourceQuantitiesFlat) (int, corev1.ResourceName) {
return dominantResourceShare(c, wlReq, 1)
}

func (c *ClusterQueueSnapshot) DominantResourceShareWithout(w *workload.Info) (int, corev1.ResourceName) {
return dominantResourceShare(c, w.FlavorResourceUsage(), -1)
func (c *ClusterQueueSnapshot) DominantResourceShareWithout(wlReq resources.FlavorResourceQuantitiesFlat) (int, corev1.ResourceName) {
return dominantResourceShare(c, wlReq, -1)
}

type dominantResourceShareNode interface {
@@ -680,7 +680,7 @@ type dominantResourceShareNode interface {
netQuotaNode
}

func dominantResourceShare(node dominantResourceShareNode, wlReq resources.FlavorResourceQuantities, m int64) (int, corev1.ResourceName) {
func dominantResourceShare(node dominantResourceShareNode, wlReq resources.FlavorResourceQuantitiesFlat, m int64) (int, corev1.ResourceName) {
if !node.hasCohort() {
return 0, ""
}
@@ -690,7 +690,7 @@ func dominantResourceShare(node dominantResourceShareNode, wlReq resources.Flavo

borrowing := make(map[corev1.ResourceName]int64)
for fr, quota := range remainingQuota(node) {
b := m*wlReq[fr.Flavor][fr.Resource] - quota
b := m*wlReq[fr] - quota
if b > 0 {
borrowing[fr.Resource] += b
}
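The nested and flat quantity types live in pkg/resources and are not shown in this diff. For reference, here is a minimal sketch of the shapes the changes above assume, inferred from their usage in this PR; the actual definitions in the repository may differ.

// Sketch only: inferred from usage in this PR, not copied from pkg/resources.
package resources

import (
	corev1 "k8s.io/api/core/v1"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// Requests maps a resource name to a quantity (milli-units for CPU).
type Requests map[corev1.ResourceName]int64

// FlavorResourceQuantities is the nested form: flavor -> resource -> quantity.
type FlavorResourceQuantities map[kueue.ResourceFlavorReference]Requests

// FlavorResource is a single (flavor, resource) key.
type FlavorResource struct {
	Flavor   kueue.ResourceFlavorReference
	Resource corev1.ResourceName
}

// FlavorResourceQuantitiesFlat is the flat form this PR switches callers to:
// one map keyed directly by the (flavor, resource) pair, so a single lookup
// wlReq[fr] replaces the two-level wlReq[fr.Flavor][fr.Resource].
type FlavorResourceQuantitiesFlat map[FlavorResource]int64

// Unflatten converts back to the nested form where callers still expect it
// (the test file below previously relied on this to populate nested fields).
func (f FlavorResourceQuantitiesFlat) Unflatten() FlavorResourceQuantities {
	out := make(FlavorResourceQuantities, len(f))
	for fr, q := range f {
		if out[fr.Flavor] == nil {
			out[fr.Flavor] = make(Requests)
		}
		out[fr.Flavor][fr.Resource] += q
	}
	return out
}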
8 changes: 4 additions & 4 deletions pkg/cache/clusterqueue_test.go
@@ -716,7 +716,7 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
func TestDominantResourceShare(t *testing.T) {
cases := map[string]struct {
cq ClusterQueueSnapshot
flvResQ resources.FlavorResourceQuantities
flvResQ resources.FlavorResourceQuantitiesFlat
wantDRValue int
wantDRName corev1.ResourceName
}{
@@ -885,7 +885,7 @@ func TestDominantResourceShare(t *testing.T) {
flvResQ: resources.FlavorResourceQuantitiesFlat{
{Flavor: "default", Resource: corev1.ResourceCPU}: 4_000,
{Flavor: "default", Resource: "example.com/gpu"}: 4,
}.Unflatten(),
},
wantDRName: corev1.ResourceCPU,
wantDRValue: 300, // (1+4-2)*1000/10
},
@@ -925,7 +925,7 @@ func TestDominantResourceShare(t *testing.T) {
flvResQ: resources.FlavorResourceQuantitiesFlat{
{Flavor: "default", Resource: corev1.ResourceCPU}: 4_000,
{Flavor: "default", Resource: "example.com/gpu"}: 4,
}.Unflatten(),
},
wantDRName: corev1.ResourceCPU,
wantDRValue: 300, // (1+4-2)*1000/10
},
@@ -967,7 +967,7 @@ func TestDominantResourceShare(t *testing.T) {
},
flvResQ: resources.FlavorResourceQuantitiesFlat{
{Flavor: "on-demand", Resource: corev1.ResourceCPU}: 10_000,
}.Unflatten(),
},
wantDRName: corev1.ResourceCPU,
wantDRValue: 25, // ((15+10-20)+0)*1000/200 (spot under nominal)
},
11 changes: 3 additions & 8 deletions pkg/scheduler/flavorassigner/flavorassigner.go
@@ -106,17 +106,12 @@ func (a *Assignment) ToAPI() []kueue.PodSetAssignment {
return psFlavors
}

func (a *Assignment) TotalRequestsFor(wl *workload.Info) resources.FlavorResourceQuantities {
usage := make(resources.FlavorResourceQuantities)
func (a *Assignment) TotalRequestsFor(wl *workload.Info) resources.FlavorResourceQuantitiesFlat {
usage := make(resources.FlavorResourceQuantitiesFlat)
for i, ps := range wl.TotalRequests {
for res, q := range ps.Requests {
flv := a.PodSets[i].Flavors[res].Name
resUsage := usage[flv]
if resUsage == nil {
resUsage = make(map[corev1.ResourceName]int64)
usage[flv] = resUsage
}
resUsage[res] += q
usage[resources.FlavorResource{Flavor: flv, Resource: res}] += q
}
}
return usage
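The flat key is what lets TotalRequestsFor drop the nil check and inner-map allocation. A self-contained sketch of the two write patterns, using simplified stand-in types rather than the repository's:

// Nested vs. flat accumulation, with stand-in types for illustration.
package sketch

type flavorResource struct {
	Flavor, Resource string
}

// Nested form (flavor -> resource -> quantity): every write must first make
// sure the inner map exists.
func addNested(usage map[string]map[string]int64, flv, res string, q int64) {
	if usage[flv] == nil {
		usage[flv] = make(map[string]int64)
	}
	usage[flv][res] += q
}

// Flat form ((flavor, resource) -> quantity): a single increment.
func addFlat(usage map[flavorResource]int64, flv, res string, q int64) {
	usage[flavorResource{Flavor: flv, Resource: res}] += q
}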
21 changes: 10 additions & 11 deletions pkg/scheduler/preemption/preemption.go
@@ -214,7 +214,7 @@ func (p *Preemptor) applyPreemptionWithSSA(ctx context.Context, w *kueue.Workloa
// Once the Workload fits, the heuristic tries to add Workloads back, in the
// reverse order in which they were removed, while the incoming Workload still
// fits.
func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info {
func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info {
if logV := log.V(5); logV.Enabled() {
logV.Info("Simulating preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", resPerFlv, "allowBorrowing", allowBorrowing, "allowBorrowingBelowPriority", allowBorrowingBelowPriority)
}
@@ -257,7 +257,7 @@ func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitie
return targets
}

func fillBackWorkloads(targets []*workload.Info, wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, allowBorrowing bool) []*workload.Info {
func fillBackWorkloads(targets []*workload.Info, wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, allowBorrowing bool) []*workload.Info {
// In the reverse order, check if any of the workloads can be added back.
for i := len(targets) - 2; i >= 0; i-- {
snapshot.AddWorkload(targets[i])
@@ -342,7 +342,7 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme

for i, candWl := range candCQ.workloads {
belowThreshold := allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) < *allowBorrowingBelowPriority
newCandShareVal, _ := candCQ.cq.DominantResourceShareWithout(candWl)
newCandShareVal, _ := candCQ.cq.DominantResourceShareWithout(candWl.FlavorResourceUsage())
if belowThreshold || p.fsStrategies[0](newNominatedShareValue, candCQ.share, newCandShareVal) {
snapshot.RemoveWorkload(candWl)
targets = append(targets, candWl)
@@ -528,17 +528,16 @@ func workloadUsesResources(wl *workload.Info, resPerFlv resourcesPerFlavor) bool
// workloadFits determines if the workload requests would fit given the
// requestable resources and simulated usage of the ClusterQueue and its cohort,
// if it belongs to one.
func workloadFits(wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueueSnapshot, allowBorrowing bool) bool {
func workloadFits(wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, allowBorrowing bool) bool {
for _, rg := range cq.ResourceGroups {
for _, flvQuotas := range rg.Flavors {
flvReq, found := wlReq[flvQuotas.Name]
if !found {
// Workload doesn't request this flavor.
continue
}
cqResUsage := cq.Usage[flvQuotas.Name]
for rName, rReq := range flvReq {
resource := flvQuotas.Resources[rName]
for rName, resource := range flvQuotas.Resources {
rReq, found := wlReq[resources.FlavorResource{Flavor: flvQuotas.Name, Resource: rName}]
if !found {
// Workload doesn't request this FlavorResource.
continue
}

if cq.Cohort == nil || !allowBorrowing {
if cqResUsage[rName]+rReq > resource.Nominal {
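Because the old and new bodies of workloadFits interleave in the rendered diff above, here is a readability-only reconstruction of the new loop, assembled from the added lines; the cohort/borrowing branch and the final return are cut off in the diff and are assumed here:

func workloadFits(wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, allowBorrowing bool) bool {
	for _, rg := range cq.ResourceGroups {
		for _, flvQuotas := range rg.Flavors {
			cqResUsage := cq.Usage[flvQuotas.Name]
			for rName, resource := range flvQuotas.Resources {
				// One flat lookup replaces the old per-flavor lookup plus the
				// per-resource lookup into the nested map.
				rReq, found := wlReq[resources.FlavorResource{Flavor: flvQuotas.Name, Resource: rName}]
				if !found {
					// Workload doesn't request this FlavorResource.
					continue
				}
				if cq.Cohort == nil || !allowBorrowing {
					if cqResUsage[rName]+rReq > resource.Nominal {
						return false
					}
				}
				// Cohort/borrowing check elided: not visible in this diff.
			}
		}
	}
	return true
}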
16 changes: 5 additions & 11 deletions pkg/workload/workload.go
@@ -206,21 +206,15 @@ func (i *Info) CanBePartiallyAdmitted() bool {

// FlavorResourceUsage returns the total resource usage for the workload,
// per flavor (if assigned, otherwise flavor shows as empty string), per resource.
func (i *Info) FlavorResourceUsage() resources.FlavorResourceQuantities {
if i == nil || len(i.TotalRequests) == 0 {
return nil
func (i *Info) FlavorResourceUsage() resources.FlavorResourceQuantitiesFlat {
total := make(resources.FlavorResourceQuantitiesFlat)
if i == nil {
return total
}
total := make(resources.FlavorResourceQuantities)
for _, psReqs := range i.TotalRequests {
for res, q := range psReqs.Requests {
flv := psReqs.Flavors[res]
if requests, found := total[flv]; found {
requests[res] += q
} else {
total[flv] = resources.Requests{
res: q,
}
}
total[resources.FlavorResource{Flavor: flv, Resource: res}] += q
}
}
return total
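One behavioral detail, captured by the updated "nil" test case below: FlavorResourceUsage now always returns an initialized map, even for a nil *Info, where the old code returned nil. A usage sketch, assuming the import paths and the PodSetResources field types match what the code above suggests:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
	"sigs.k8s.io/kueue/pkg/resources"
	"sigs.k8s.io/kueue/pkg/workload"
)

func main() {
	// Nil receiver: an empty, non-nil map (previously nil).
	var missing *workload.Info
	fmt.Println(missing.FlavorResourceUsage() == nil) // false

	// Each (flavor, resource) pair holds the quantity summed across pod sets;
	// CPU is in milli-units, as in the tests.
	info := &workload.Info{
		TotalRequests: []workload.PodSetResources{{
			Requests: resources.Requests{corev1.ResourceCPU: 2_000},
			Flavors:  map[corev1.ResourceName]kueue.ResourceFlavorReference{corev1.ResourceCPU: "default"},
		}},
	}
	usage := info.FlavorResourceUsage()
	fmt.Println(usage[resources.FlavorResource{Flavor: "default", Resource: corev1.ResourceCPU}]) // 2000
}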
40 changes: 15 additions & 25 deletions pkg/workload/workload_test.go
@@ -593,9 +593,11 @@ func TestIsEvictedByPodsReadyTimeout(t *testing.T) {
func TestFlavorResourceUsage(t *testing.T) {
cases := map[string]struct {
info *Info
want resources.FlavorResourceQuantities
want resources.FlavorResourceQuantitiesFlat
}{
"nil": {},
"nil": {
want: resources.FlavorResourceQuantitiesFlat{},
},
"one podset, no flavors": {
info: &Info{
TotalRequests: []PodSetResources{{
@@ -605,11 +607,9 @@ func TestFlavorResourceUsage(t *testing.T) {
},
}},
},
want: map[kueue.ResourceFlavorReference]resources.Requests{
"": {
corev1.ResourceCPU: 1_000,
"example.com/gpu": 3,
},
want: resources.FlavorResourceQuantitiesFlat{
{Flavor: "", Resource: "cpu"}: 1_000,
{Flavor: "", Resource: "example.com/gpu"}: 3,
},
},
"one podset, multiple flavors": {
@@ -625,13 +625,9 @@ func TestFlavorResourceUsage(t *testing.T) {
},
}},
},
want: map[kueue.ResourceFlavorReference]resources.Requests{
"default": {
corev1.ResourceCPU: 1_000,
},
"gpu": {
"example.com/gpu": 3,
},
want: resources.FlavorResourceQuantitiesFlat{
{Flavor: "default", Resource: "cpu"}: 1_000,
{Flavor: "gpu", Resource: "example.com/gpu"}: 3,
},
},
"multiple podsets, multiple flavors": {
@@ -667,17 +663,11 @@ func TestFlavorResourceUsage(t *testing.T) {
},
},
},
want: map[kueue.ResourceFlavorReference]resources.Requests{
"default": {
corev1.ResourceCPU: 3_000,
corev1.ResourceMemory: 2 * utiltesting.Gi,
},
"model_a": {
"example.com/gpu": 3,
},
"model_b": {
"example.com/gpu": 1,
},
want: resources.FlavorResourceQuantitiesFlat{
{Flavor: "default", Resource: "cpu"}: 3_000,
{Flavor: "default", Resource: "memory"}: 2 * utiltesting.Gi,
{Flavor: "model_a", Resource: "example.com/gpu"}: 3,
{Flavor: "model_b", Resource: "example.com/gpu"}: 1,
},
},
}