Define ClusterQueueSnapshot and CohortSnapshot types #2519

Merged · 1 commit · Jul 8, 2024
2 changes: 1 addition & 1 deletion pkg/cache/cache.go
@@ -608,7 +608,7 @@ func (c *Cache) Usage(cqObj *kueue.ClusterQueue) (*ClusterQueueUsageStats, error
}

if c.fairSharingEnabled {
-weightedShare, _ := cq.DominantResourceShare()
+weightedShare, _ := dominantResourceShare(cq, nil, 0)
stats.WeightedShare = int64(weightedShare)
}

12 changes: 1 addition & 11 deletions pkg/cache/cache_test.go
@@ -1251,22 +1251,12 @@ func TestCacheClusterQueueOperations(t *testing.T) {
t.Errorf("Unexpected error during test operation: %s", err)
}
if diff := cmp.Diff(tc.wantClusterQueues, cache.clusterQueues,
cmpopts.IgnoreFields(ClusterQueue{}, "Cohort", "RGByResource", "ResourceGroups"),
cmpopts.IgnoreFields(ClusterQueue{}, "Cohort", "ResourceGroups"),
cmpopts.IgnoreFields(workload.Info{}, "Obj", "LastAssignment"),
cmpopts.IgnoreUnexported(ClusterQueue{}),
cmpopts.EquateEmpty()); diff != "" {
t.Errorf("Unexpected clusterQueues (-want,+got):\n%s", diff)
}
-for _, cq := range cache.clusterQueues {
-for i := range cq.ResourceGroups {
-rg := &cq.ResourceGroups[i]
-for rName := range rg.CoveredResources {
-if cq.RGByResource[rName] != rg {
-t.Errorf("RGByResource[%s] does not point to its resource group", rName)
-}
-}
-}
-}

gotCohorts := make(map[string]sets.Set[string], len(cache.cohorts))
for name, cohort := range cache.cohorts {
103 changes: 51 additions & 52 deletions pkg/cache/clusterqueue.go
@@ -49,7 +49,6 @@ type ClusterQueue struct {
Name string
Cohort *Cohort
ResourceGroups []ResourceGroup
-RGByResource map[corev1.ResourceName]*ResourceGroup
Usage resources.FlavorResourceQuantities
Workloads map[string]*workload.Info
WorkloadsNotReady sets.Set[string]
@@ -72,8 +71,6 @@ type ClusterQueue struct {
// Lendable holds the total lendable quota for the resources of the ClusterQueue, independent of the flavor.
Lendable map[corev1.ResourceName]int64

-// The following fields are not populated in a snapshot.

AdmittedUsage resources.FlavorResourceQuantities
Review comment from a Contributor: Are all of the fields still needed in this struct?

// localQueues by (namespace/name).
localQueues map[string]*queue
@@ -91,16 +88,6 @@ type ClusterQueue struct {
type Cohort struct {
Name string
Members sets.Set[*ClusterQueue]

-// The next fields are only populated for a snapshot.
-
-// RequestableResources equals to the sum of LendingLimit when feature LendingLimit enabled.
-RequestableResources resources.FlavorResourceQuantities
-Usage resources.FlavorResourceQuantities
-Lendable map[corev1.ResourceName]int64
-// AllocatableResourceGeneration equals to
-// the sum of allocatable generation among its members.
-AllocatableResourceGeneration int64
}
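The Cohort fields deleted above move to the new CohortSnapshot type that gives this PR its title. CohortSnapshot itself is not visible in this diff; the sketch below reconstructs it from the deleted fields and from the accesses later in the diff (c.Cohort.RequestableResources, c.Cohort.Usage, c.Cohort.Lendable), with the Members element type being an assumption:

```go
package cache

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"

	"sigs.k8s.io/kueue/pkg/resources"
)

// Sketch only: inferred from this diff, not the PR's verbatim definition.
type CohortSnapshot struct {
	Name    string
	Members sets.Set[*ClusterQueueSnapshot] // assumed element type

	// RequestableResources equals the sum of the members' LendingLimits
	// when the LendingLimit feature is enabled.
	RequestableResources resources.FlavorResourceQuantities
	Usage                resources.FlavorResourceQuantities
	Lendable             map[corev1.ResourceName]int64
	// AllocatableResourceGeneration equals the sum of the allocatable
	// generations among the members.
	AllocatableResourceGeneration int64
}
```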

type ResourceGroup struct {
@@ -150,7 +137,7 @@ func (c *Cohort) CalculateLendable() map[corev1.ResourceName]int64 {
return lendable
}

-func (c *ClusterQueue) FitInCohort(q resources.FlavorResourceQuantities) bool {
+func (c *ClusterQueueSnapshot) FitInCohort(q resources.FlavorResourceQuantities) bool {
for flavor, qResources := range q {
if _, flavorFound := c.Cohort.RequestableResources[flavor]; flavorFound {
for resource, value := range qResources {
@@ -313,17 +300,6 @@ func (c *ClusterQueue) updateResourceGroups(in []kueue.ResourceGroup) {
if c.AllocatableResourceGeneration == 0 || !equality.Semantic.DeepEqual(oldRG, c.ResourceGroups) {
c.AllocatableResourceGeneration++
}
-c.UpdateRGByResource()
}

-func (c *ClusterQueue) UpdateRGByResource() {
-c.RGByResource = make(map[corev1.ResourceName]*ResourceGroup)
-for i := range c.ResourceGroups {
-rg := &c.ResourceGroups[i]
-for rName := range rg.CoveredResources {
-c.RGByResource[rName] = rg
-}
-}
-}

func (c *ClusterQueue) updateQueueStatus() {
@@ -530,7 +506,7 @@ func updateFlavorUsage(wi *workload.Info, flvUsage resources.FlavorResourceQuant
}
}

-func updateCohortUsage(wi *workload.Info, cq *ClusterQueue, m int64) {
+func updateCohortUsage(wi *workload.Info, cq *ClusterQueueSnapshot, m int64) {
for _, ps := range wi.TotalRequests {
for wlRes, wlResFlv := range ps.Flavors {
v, wlResExist := ps.Requests[wlRes]
@@ -626,7 +602,7 @@ func workloadBelongsToLocalQueue(wl *kueue.Workload, q *kueue.LocalQueue) bool {
// The LendingLimit is also counted here when the LendingLimit feature is enabled.
// Note that the requestable quota differs between clusterQueues, so it must be
// calculated dynamically.
-func (c *ClusterQueue) RequestableCohortQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) {
+func (c *ClusterQueueSnapshot) RequestableCohortQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) {
if c.Cohort.RequestableResources == nil || c.Cohort.RequestableResources[fName] == nil {
return 0
}
@@ -639,7 +615,7 @@ func (c *ClusterQueue) RequestableCohortQuota(fName kueue.ResourceFlavorReferenc
return requestableCohortQuota
}

-func (c *ClusterQueue) guaranteedQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) {
+func (c *ClusterQueueSnapshot) guaranteedQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) {
if !features.Enabled(features.LendingLimit) {
return 0
}
@@ -652,7 +628,7 @@ func (c *ClusterQueue) guaranteedQuota(fName kueue.ResourceFlavorReference, rNam
// UsedCohortQuota returns the quota used in the cohort for the given flavor and resource name.
// Note that when LendingLimit is enabled, the returned usage is not the total used quota but
// the total minus the guaranteed resources; it is only used to judge whether workloads fit in the cohort.
-func (c *ClusterQueue) UsedCohortQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) {
+func (c *ClusterQueueSnapshot) UsedCohortQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) {
if c.Cohort.Usage == nil || c.Cohort.Usage[fName] == nil {
return 0
}
@@ -674,41 +650,68 @@ func (c *ClusterQueue) UsedCohortQuota(fName kueue.ResourceFlavorReferenc
return cohortUsage
}
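Before the receiver changes below, it is worth making the guaranteed/requestable quota split concrete. A hedged, runnable sketch of the arithmetic (the queue names and numbers are invented; only the relationships come from the functions above):

```go
package main

import "fmt"

// Invented scenario: cq-a has nominal quota 10 for a resource with
// lendingLimit 4; its only cohort peer, cq-b, lends 6 to the cohort.
func main() {
	nominal, lendingLimit := int64(10), int64(4)
	peerLent := int64(6)

	// guaranteedQuota: the part of the nominal quota cq-a never lends out.
	guaranteed := nominal - lendingLimit // 6

	// RequestableCohortQuota: the cohort's pooled lendable quota plus the
	// queue's own guaranteed quota.
	requestable := (lendingLimit + peerLent) + guaranteed // 16

	fmt.Println(guaranteed, requestable) // 6 16
}
```

UsedCohortQuota mirrors this split: usage that fits within a member's own guaranteed quota is excluded, so the comparison against RequestableCohortQuota only sees pressure on the shared, lendable pool.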

+// The methods below implement several interfaces. See
+// dominantResourceShareNode, resourceGroupNode, and netQuotaNode.
+
+func (c *ClusterQueue) hasCohort() bool {
+return c.Cohort != nil
+}
+
+func (c *ClusterQueue) fairWeight() *resource.Quantity {
+return &c.FairWeight
+}
+
+func (c *ClusterQueue) lendableResourcesInCohort() map[corev1.ResourceName]int64 {
+return c.Cohort.CalculateLendable()
+}
+
+func (c *ClusterQueue) usageFor(fr resources.FlavorResource) int64 {
+return c.Usage.For(fr)
+}
+
+func (c *ClusterQueue) resourceGroups() []ResourceGroup {
+return c.ResourceGroups
+}

// DominantResourceShare returns a value from 0 to 1,000,000 representing the maximum of the ratios
// of usage above nominal quota to the lendable resources in the cohort, among all the resources
// provided by the ClusterQueue, and divided by the weight.
// If zero, it means that the usage of the ClusterQueue is below the nominal quota.
// The function also returns the resource name that yielded this value.
// For a weight of zero, this will return math.MaxInt (9223372036854775807).
-func (c *ClusterQueue) DominantResourceShare() (int, corev1.ResourceName) {
-return c.dominantResourceShare(nil, 0)
+func (c *ClusterQueueSnapshot) DominantResourceShare() (int, corev1.ResourceName) {
+return dominantResourceShare(c, nil, 0)
}

-func (c *ClusterQueue) DominantResourceShareWith(wlReq resources.FlavorResourceQuantities) (int, corev1.ResourceName) {
-return c.dominantResourceShare(wlReq, 1)
+func (c *ClusterQueueSnapshot) DominantResourceShareWith(wlReq resources.FlavorResourceQuantities) (int, corev1.ResourceName) {
+return dominantResourceShare(c, wlReq, 1)
}

-func (c *ClusterQueue) DominantResourceShareWithout(w *workload.Info) (int, corev1.ResourceName) {
-return c.dominantResourceShare(w.FlavorResourceUsage(), -1)
+func (c *ClusterQueueSnapshot) DominantResourceShareWithout(w *workload.Info) (int, corev1.ResourceName) {
+return dominantResourceShare(c, w.FlavorResourceUsage(), -1)
}
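A hedged numeric walk-through of the doc comment above (all inputs are invented): a queue borrowing 2 units of its dominant resource, in a cohort with 10 lendable units of that resource and a fair weight of 2, ends up with a weighted share of 100:

```go
package main

import "fmt"

func main() {
	// Invented inputs for the formula described in DominantResourceShare's
	// doc comment.
	borrowed := int64(2)       // usage above nominal for the dominant resource
	lendable := int64(10)      // cohort-wide lendable quota for that resource
	weightMilli := int64(2000) // FairWeight of 2, expressed in milli-units

	drs := borrowed * 1000 / lendable // ratio scaled to [0, 1000]: 200
	dws := drs * 1000 / weightMilli   // divided by the weight: 100
	fmt.Println(dws)                  // 100
}
```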

-func (c *ClusterQueue) dominantResourceShare(wlReq resources.FlavorResourceQuantities, m int64) (int, corev1.ResourceName) {
-if c.Cohort == nil {
+type dominantResourceShareNode interface {
+hasCohort() bool
+fairWeight() *resource.Quantity
+lendableResourcesInCohort() map[corev1.ResourceName]int64
+
+netQuotaNode
+}
+
+func dominantResourceShare(node dominantResourceShareNode, wlReq resources.FlavorResourceQuantities, m int64) (int, corev1.ResourceName) {
+if !node.hasCohort() {
return 0, ""
}
-if c.FairWeight.IsZero() {
+if node.fairWeight().IsZero() {
return math.MaxInt, ""
}

borrowing := make(map[corev1.ResourceName]int64)
-for _, rg := range c.ResourceGroups {
-for _, flv := range rg.Flavors {
-for rName, quotas := range flv.Resources {
-b := c.Usage[flv.Name][rName] + m*wlReq[flv.Name][rName] - quotas.Nominal
-if b > 0 {
-borrowing[rName] += b
-}
-}
-}
-}
+for fr, quota := range remainingQuota(node) {
+b := m*wlReq[fr.Flavor][fr.Resource] - quota
+if b > 0 {
+borrowing[fr.Resource] += b
+}
+}
if len(borrowing) == 0 {
@@ -718,11 +721,7 @@ func (c *ClusterQueue) dominantResourceShare(wlReq resources.FlavorResourceQuant
var drs int64 = -1
var dRes corev1.ResourceName

-// If we are running from snapshot the c.Cohort.Lendable should be pre-calculated.
-lendable := c.Cohort.Lendable
-if lendable == nil {
-lendable = c.Cohort.CalculateLendable()
-}
+lendable := node.lendableResourcesInCohort()
for rName, b := range borrowing {
if lr := lendable[rName]; lr > 0 {
ratio := b * 1000 / lr
@@ -733,6 +732,6 @@ func (c *ClusterQueue) dominantResourceShare(wlReq resources.FlavorResourceQuant
}
}
}
-dws := drs * 1000 / c.FairWeight.MilliValue()
+dws := drs * 1000 / node.fairWeight().MilliValue()
return int(dws), dRes
}
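The borrowing computation above now goes through remainingQuota and the netQuotaNode interface, both defined elsewhere in this PR. A sketch of the intended semantics, reconstructed from the deleted loop (the names appear in the PR, but this body is an assumption):

```go
package cache

import "sigs.k8s.io/kueue/pkg/resources"

// Sketch: netQuotaNode as implied by the usageFor/resourceGroups methods above.
type netQuotaNode interface {
	usageFor(resources.FlavorResource) int64
	resourceGroups() []ResourceGroup
}

// remainingQuota returns, per (flavor, resource), the nominal quota minus the
// current usage; a negative value means the node is already borrowing. With
// this definition, b := m*wlReq[...] - quota in dominantResourceShare matches
// the old usage + m*wlReq - nominal computation exactly.
func remainingQuota(node netQuotaNode) map[resources.FlavorResource]int64 {
	remaining := make(map[resources.FlavorResource]int64)
	for _, rg := range node.resourceGroups() {
		for _, flv := range rg.Flavors {
			for rName, quotas := range flv.Resources {
				fr := resources.FlavorResource{Flavor: flv.Name, Resource: rName}
				remaining[fr] = quotas.Nominal - node.usageFor(fr)
			}
		}
	}
	return remaining
}
```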
72 changes: 72 additions & 0 deletions pkg/cache/clusterqueue_snapshot.go
@@ -0,0 +1,72 @@
package cache

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/resources"
"sigs.k8s.io/kueue/pkg/workload"
)

type ClusterQueueSnapshot struct {
Name string
Cohort *CohortSnapshot
ResourceGroups []ResourceGroup
Usage resources.FlavorResourceQuantities
Workloads map[string]*workload.Info
WorkloadsNotReady sets.Set[string]
NamespaceSelector labels.Selector
Preemption kueue.ClusterQueuePreemption
FairWeight resource.Quantity
FlavorFungibility kueue.FlavorFungibility
// Aggregates AdmissionChecks from both .spec.AdmissionChecks and .spec.AdmissionCheckStrategy
// Each set holds the ResourceFlavors to which an AdmissionCheck should apply.
// If a set is empty, the AdmissionCheck applies to all ResourceFlavors.
AdmissionChecks map[string]sets.Set[kueue.ResourceFlavorReference]
Status metrics.ClusterQueueStatus
// GuaranteedQuota records how much resource quota the ClusterQueue reserved
// when feature LendingLimit is enabled and flavor's lendingLimit is not nil.
GuaranteedQuota resources.FlavorResourceQuantities
// AllocatableResourceGeneration will be increased when some admitted workloads are
// deleted, or the resource groups are changed.
AllocatableResourceGeneration int64

// Lendable holds the total lendable quota for the resources of the ClusterQueue, independent of the flavor.
Lendable map[corev1.ResourceName]int64
}

// RGByResource returns the ResourceGroup which contains capacity
// for the resource, or nil if the CQ doesn't provide this resource.
func (c *ClusterQueueSnapshot) RGByResource(resource corev1.ResourceName) *ResourceGroup {
for i := range c.ResourceGroups {
if c.ResourceGroups[i].CoveredResources.Has(resource) {
return &c.ResourceGroups[i]
}
}
return nil
}
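This replaces the precomputed RGByResource map with a linear scan over what is typically a handful of resource groups, and it removes derived state that had to be kept in sync after every updateResourceGroups call (exactly what the deleted assertions in cache_test.go were verifying). A hedged usage sketch:

```go
package cache

import corev1 "k8s.io/api/core/v1"

// Hypothetical caller: check whether a snapshot provides a resource at all.
func providesCPU(cq *ClusterQueueSnapshot) bool {
	return cq.RGByResource(corev1.ResourceCPU) != nil
}
```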

// The methods below implement several interfaces. See
// dominantResourceShareNode, resourceGroupNode, and netQuotaNode.

func (c *ClusterQueueSnapshot) hasCohort() bool {
return c.Cohort != nil
}
func (c *ClusterQueueSnapshot) fairWeight() *resource.Quantity {
return &c.FairWeight
}
func (c *ClusterQueueSnapshot) lendableResourcesInCohort() map[corev1.ResourceName]int64 {
return c.Cohort.Lendable
}

func (c *ClusterQueueSnapshot) usageFor(fr resources.FlavorResource) int64 {
return c.Usage.For(fr)
}

func (c *ClusterQueueSnapshot) resourceGroups() []ResourceGroup {
return c.ResourceGroups
}
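Taken together, a hedged sketch of how a scheduling cycle might consume the new types, assuming the cache's Snapshot() method (updated elsewhere in this PR) now hands out ClusterQueueSnapshot values keyed by name:

```go
package cache

import "sigs.k8s.io/kueue/pkg/resources"

// Sketch only: Snapshot()'s post-PR shape is assumed, and workloadRequests is
// a precomputed set of flavor/resource quantities.
func scheduleSketch(c *Cache, workloadRequests resources.FlavorResourceQuantities) {
	snap := c.Snapshot()
	cq := snap.ClusterQueues["team-a"] // *ClusterQueueSnapshot; key assumed

	share, dominant := cq.DominantResourceShare() // fair-sharing ordering input
	fits := cq.FitInCohort(workloadRequests)      // admission feasibility check
	_, _, _ = share, dominant, fits
}
```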