Skip to content

Commit

Permalink
Assign the same flavor to resources that have the same set of flavors
Browse files Browse the repository at this point in the history
Change-Id: I495a05c1ff5044e82996842c88f05f09eaa4605e
  • Loading branch information
alculquicondor committed Jul 29, 2022
1 parent 4fca39d commit 4a90d5a
Show file tree
Hide file tree
Showing 6 changed files with 394 additions and 261 deletions.
76 changes: 58 additions & 18 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,16 @@ func New(client client.Client) *Cache {
}
}

type Resources map[corev1.ResourceName]map[string]int64
type ResourceQuantities map[corev1.ResourceName]map[string]int64

// Cohort is a set of ClusterQueues that can borrow resources from each other.
type Cohort struct {
Name string
members map[*ClusterQueue]struct{}

// These fields are only populated for a snapshot.
RequestableResources Resources
UsedResources Resources
RequestableResources ResourceQuantities
UsedResources ResourceQuantities
}

func newCohort(name string, size int) *Cohort {
Expand Down Expand Up @@ -105,8 +105,8 @@ const (
type ClusterQueue struct {
Name string
Cohort *Cohort
RequestableResources map[corev1.ResourceName][]FlavorLimits
UsedResources Resources
RequestableResources map[corev1.ResourceName]*Resource
UsedResources ResourceQuantities
Workloads map[string]*workload.Info
NamespaceSelector labels.Selector
// The set of key labels from all flavors of a resource.
Expand All @@ -120,6 +120,23 @@ type ClusterQueue struct {
admittedWorkloadsPerQueue map[string]int
}

type Resource struct {
CodependentResources sets.String
Flavors []FlavorLimits
}

func (r *Resource) matchesFlavors(other *Resource) bool {
if len(r.Flavors) != len(other.Flavors) {
return false
}
for i := range r.Flavors {
if r.Flavors[i].Name != other.Flavors[i].Name {
return false
}
}
return true
}

// FlavorLimits holds a processed ClusterQueue flavor quota.
type FlavorLimits struct {
Name string
Expand All @@ -145,14 +162,15 @@ func (c *ClusterQueue) Active() bool {
}

func (c *ClusterQueue) update(in *kueue.ClusterQueue, resourceFlavors map[string]*kueue.ResourceFlavor) error {
c.RequestableResources = resourceLimitsByName(in.Spec.Resources)
c.RequestableResources = resourcesByName(in.Spec.Resources)
c.UpdateCodependentResources()
nsSelector, err := metav1.LabelSelectorAsSelector(in.Spec.NamespaceSelector)
if err != nil {
return err
}
c.NamespaceSelector = nsSelector

usedResources := make(Resources, len(in.Spec.Resources))
usedResources := make(ResourceQuantities, len(in.Spec.Resources))
for _, r := range in.Spec.Resources {
if len(r.Flavors) == 0 {
continue
Expand All @@ -170,6 +188,26 @@ func (c *ClusterQueue) update(in *kueue.ClusterQueue, resourceFlavors map[string
return nil
}

func (c *ClusterQueue) UpdateCodependentResources() {
for iName, iRes := range c.RequestableResources {
if len(iRes.CodependentResources) > 0 {
// Already matched with other resources.
continue
}
codep := sets.NewString()
for jName, jRes := range c.RequestableResources {
if iName == jName || iRes.matchesFlavors(jRes) {
codep.Insert(string(jName))
}
}
if len(codep) > 1 {
for name := range codep {
c.RequestableResources[corev1.ResourceName(name)].CodependentResources = codep
}
}
}
}

// UpdateWithFlavors updates a ClusterQueue based on the passed ResourceFlavors set.
// Exported only for testing.
func (c *ClusterQueue) UpdateWithFlavors(flavors map[string]*kueue.ResourceFlavor) {
Expand All @@ -186,13 +224,13 @@ func (c *ClusterQueue) UpdateWithFlavors(flavors map[string]*kueue.ResourceFlavo
func (c *ClusterQueue) updateLabelKeys(flavors map[string]*kueue.ResourceFlavor) bool {
var flavorNotFound bool
labelKeys := map[corev1.ResourceName]sets.String{}
for rName, flvLimits := range c.RequestableResources {
if len(flvLimits) == 0 {
for rName, res := range c.RequestableResources {
if len(res.Flavors) == 0 {
continue
}
resKeys := sets.NewString()
for _, l := range flvLimits {
if flv, exist := flavors[l.Name]; exist {
for _, rf := range res.Flavors {
if flv, exist := flavors[rf.Name]; exist {
for k := range flv.Labels {
resKeys.Insert(k)
}
Expand Down Expand Up @@ -278,9 +316,9 @@ func (c *ClusterQueue) deleteQueue(q *kueue.Queue) {
}

func (c *ClusterQueue) flavorInUse(flavor string) bool {
for _, flvLimits := range c.RequestableResources {
for _, l := range flvLimits {
if flavor == l.Name {
for _, r := range c.RequestableResources {
for _, f := range r.Flavors {
if flavor == f.Name {
return true
}
}
Expand Down Expand Up @@ -602,7 +640,7 @@ func (c *Cache) Usage(cqObj *kueue.ClusterQueue) (kueue.UsedResources, int, erro
for rName, usedRes := range cq.UsedResources {
rUsage := make(map[string]kueue.Usage)
requestable := cq.RequestableResources[rName]
for _, flavor := range requestable {
for _, flavor := range requestable.Flavors {
used := usedRes[flavor.Name]
fUsage := kueue.Usage{
Total: pointer.Quantity(workload.ResourceQuantity(rName, used)),
Expand Down Expand Up @@ -683,8 +721,8 @@ func (c *Cache) MatchingClusterQueues(nsLabels map[string]string) sets.String {
return cqs
}

func resourceLimitsByName(in []kueue.Resource) map[corev1.ResourceName][]FlavorLimits {
out := make(map[corev1.ResourceName][]FlavorLimits, len(in))
func resourcesByName(in []kueue.Resource) map[corev1.ResourceName]*Resource {
out := make(map[corev1.ResourceName]*Resource, len(in))
for _, r := range in {
flavors := make([]FlavorLimits, len(r.Flavors))
for i := range flavors {
Expand All @@ -699,7 +737,9 @@ func resourceLimitsByName(in []kueue.Resource) map[corev1.ResourceName][]FlavorL
flavors[i] = fLimits

}
out[r.Name] = flavors
out[r.Name] = &Resource{
Flavors: flavors,
}
}
return out
}
Expand Down
Loading

0 comments on commit 4a90d5a

Please sign in to comment.