From b7ecfe7b40381eb8e3500988bf0d6ce0192577ec Mon Sep 17 00:00:00 2001 From: Gabe <15304068+gabesaba@users.noreply.github.com> Date: Thu, 27 Jun 2024 15:06:23 +0000 Subject: [PATCH] Define ClusterQueueSnapshot and CohortSnapshot types --- pkg/cache/cache.go | 2 +- pkg/cache/cache_test.go | 12 +- pkg/cache/clusterqueue.go | 103 +++++++++--------- pkg/cache/clusterqueue_snapshot.go | 72 ++++++++++++ pkg/cache/clusterqueue_test.go | 84 +++++++------- pkg/cache/cohort_snapshot.go | 22 ++++ pkg/cache/resource.go | 47 ++++++++ pkg/cache/snapshot.go | 46 ++++---- pkg/cache/snapshot_test.go | 69 ++++++------ pkg/resources/resource.go | 8 ++ .../flavorassigner/flavorassigner.go | 12 +- .../flavorassigner/flavorassigner_test.go | 12 +- pkg/scheduler/preemption/preemption.go | 18 +-- pkg/scheduler/preemption/preemption_test.go | 8 +- pkg/scheduler/scheduler.go | 6 +- 15 files changed, 336 insertions(+), 185 deletions(-) create mode 100644 pkg/cache/clusterqueue_snapshot.go create mode 100644 pkg/cache/cohort_snapshot.go create mode 100644 pkg/cache/resource.go diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 59d44172e2..3574128bcb 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -608,7 +608,7 @@ func (c *Cache) Usage(cqObj *kueue.ClusterQueue) (*ClusterQueueUsageStats, error } if c.fairSharingEnabled { - weightedShare, _ := cq.DominantResourceShare() + weightedShare, _ := dominantResourceShare(cq, nil, 0) stats.WeightedShare = int64(weightedShare) } diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index f13bead307..731efcc63d 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -1251,22 +1251,12 @@ func TestCacheClusterQueueOperations(t *testing.T) { t.Errorf("Unexpected error during test operation: %s", err) } if diff := cmp.Diff(tc.wantClusterQueues, cache.clusterQueues, - cmpopts.IgnoreFields(ClusterQueue{}, "Cohort", "RGByResource", "ResourceGroups"), + cmpopts.IgnoreFields(ClusterQueue{}, "Cohort", "ResourceGroups"), cmpopts.IgnoreFields(workload.Info{}, "Obj", "LastAssignment"), cmpopts.IgnoreUnexported(ClusterQueue{}), cmpopts.EquateEmpty()); diff != "" { t.Errorf("Unexpected clusterQueues (-want,+got):\n%s", diff) } - for _, cq := range cache.clusterQueues { - for i := range cq.ResourceGroups { - rg := &cq.ResourceGroups[i] - for rName := range rg.CoveredResources { - if cq.RGByResource[rName] != rg { - t.Errorf("RGByResource[%s] does not point to its resource group", rName) - } - } - } - } gotCohorts := make(map[string]sets.Set[string], len(cache.cohorts)) for name, cohort := range cache.cohorts { diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go index 23713533f1..a270424473 100644 --- a/pkg/cache/clusterqueue.go +++ b/pkg/cache/clusterqueue.go @@ -49,7 +49,6 @@ type ClusterQueue struct { Name string Cohort *Cohort ResourceGroups []ResourceGroup - RGByResource map[corev1.ResourceName]*ResourceGroup Usage resources.FlavorResourceQuantities Workloads map[string]*workload.Info WorkloadsNotReady sets.Set[string] @@ -72,8 +71,6 @@ type ClusterQueue struct { // Lendable holds the total lendable quota for the resources of the ClusterQueue, independent of the flavor. Lendable map[corev1.ResourceName]int64 - // The following fields are not populated in a snapshot. - AdmittedUsage resources.FlavorResourceQuantities // localQueues by (namespace/name). 
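For context on the cache.go hunk above: DominantResourceShare is no longer a method on the live ClusterQueue; both the live type and the new ClusterQueueSnapshot satisfy a small dominantResourceShareNode interface (defined further down), so one package-level helper serves both callers. A standalone miniature of that pattern, using illustrative types that are not part of this patch:

```go
package main

import "fmt"

// Miniature of the sharing pattern this patch introduces: one package-level
// function over a narrow interface, implemented by both the live type and
// its snapshot. liveQueue and queueSnapshot are hypothetical stand-ins.
type node interface {
	hasCohort() bool
}

type liveQueue struct{ cohortName string }

func (q *liveQueue) hasCohort() bool { return q.cohortName != "" }

type queueSnapshot struct{ cohortName string }

func (q *queueSnapshot) hasCohort() bool { return q.cohortName != "" }

// share stands in for dominantResourceShare: a queue outside any cohort
// cannot borrow, so its share is 0.
func share(n node) int {
	if !n.hasCohort() {
		return 0
	}
	return 42 // placeholder for the real borrowing/lendable arithmetic
}

func main() {
	fmt.Println(share(&liveQueue{"team-a"}), share(&queueSnapshot{""})) // 42 0
}
```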
localQueues map[string]*queue @@ -91,16 +88,6 @@ type ClusterQueue struct { type Cohort struct { Name string Members sets.Set[*ClusterQueue] - - // The next fields are only populated for a snapshot. - - // RequestableResources equals to the sum of LendingLimit when feature LendingLimit enabled. - RequestableResources resources.FlavorResourceQuantities - Usage resources.FlavorResourceQuantities - Lendable map[corev1.ResourceName]int64 - // AllocatableResourceGeneration equals to - // the sum of allocatable generation among its members. - AllocatableResourceGeneration int64 } type ResourceGroup struct { @@ -150,7 +137,7 @@ func (c *Cohort) CalculateLendable() map[corev1.ResourceName]int64 { return lendable } -func (c *ClusterQueue) FitInCohort(q resources.FlavorResourceQuantities) bool { +func (c *ClusterQueueSnapshot) FitInCohort(q resources.FlavorResourceQuantities) bool { for flavor, qResources := range q { if _, flavorFound := c.Cohort.RequestableResources[flavor]; flavorFound { for resource, value := range qResources { @@ -313,17 +300,6 @@ func (c *ClusterQueue) updateResourceGroups(in []kueue.ResourceGroup) { if c.AllocatableResourceGeneration == 0 || !equality.Semantic.DeepEqual(oldRG, c.ResourceGroups) { c.AllocatableResourceGeneration++ } - c.UpdateRGByResource() -} - -func (c *ClusterQueue) UpdateRGByResource() { - c.RGByResource = make(map[corev1.ResourceName]*ResourceGroup) - for i := range c.ResourceGroups { - rg := &c.ResourceGroups[i] - for rName := range rg.CoveredResources { - c.RGByResource[rName] = rg - } - } } func (c *ClusterQueue) updateQueueStatus() { @@ -530,7 +506,7 @@ func updateFlavorUsage(wi *workload.Info, flvUsage resources.FlavorResourceQuant } } -func updateCohortUsage(wi *workload.Info, cq *ClusterQueue, m int64) { +func updateCohortUsage(wi *workload.Info, cq *ClusterQueueSnapshot, m int64) { for _, ps := range wi.TotalRequests { for wlRes, wlResFlv := range ps.Flavors { v, wlResExist := ps.Requests[wlRes] @@ -626,7 +602,7 @@ func workloadBelongsToLocalQueue(wl *kueue.Workload, q *kueue.LocalQueue) bool { // LendingLimit will also be counted here if feature LendingLimit enabled. // Please note that for different clusterQueues, the requestable quota is different, // they should be calculated dynamically. -func (c *ClusterQueue) RequestableCohortQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) { +func (c *ClusterQueueSnapshot) RequestableCohortQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) { if c.Cohort.RequestableResources == nil || c.Cohort.RequestableResources[fName] == nil { return 0 } @@ -639,7 +615,7 @@ func (c *ClusterQueue) RequestableCohortQuota(fName kueue.ResourceFlavorReferenc return requestableCohortQuota } -func (c *ClusterQueue) guaranteedQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) { +func (c *ClusterQueueSnapshot) guaranteedQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) { if !features.Enabled(features.LendingLimit) { return 0 } @@ -652,7 +628,7 @@ func (c *ClusterQueue) guaranteedQuota(fName kueue.ResourceFlavorReference, rNam // UsedCohortQuota returns the used quota by the flavor and resource name in the cohort. // Note that when LendingLimit enabled, the usage is not equal to the total used quota but the one // minus the guaranteed resources, this is only for judging whether workloads fit in the cohort. 
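The three cohort-quota accessors move to ClusterQueueSnapshot with their semantics unchanged. As a reference point, a self-contained sketch of the per-queue arithmetic with assumed numbers: the guaranteed quota is nominal minus lendingLimit, the queue's contribution to the cohort's requestable pool is the lendingLimit, and only usage above the guaranteed part is charged against the cohort.

```go
package main

import "fmt"

// Assumed-numbers sketch of the per-queue quota arithmetic behind
// RequestableCohortQuota / guaranteedQuota / UsedCohortQuota when the
// LendingLimit feature gate is enabled.
func main() {
	nominal := int64(10)     // the queue's nominal quota for one flavor/resource
	lendingLimit := int64(4) // the portion of nominal lent to the cohort
	usage := int64(7)        // the queue's current usage

	guaranteed := nominal - lendingLimit // 6: reserved for the queue itself
	lentToCohort := lendingLimit         // 4: the queue's share of cohort RequestableResources

	chargedToCohort := usage - guaranteed // only usage above the guaranteed part counts
	if chargedToCohort < 0 {
		chargedToCohort = 0
	}
	fmt.Println(lentToCohort, chargedToCohort) // 4 1
}
```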
-func (c *ClusterQueue) UsedCohortQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) { +func (c *ClusterQueueSnapshot) UsedCohortQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) { if c.Cohort.Usage == nil || c.Cohort.Usage[fName] == nil { return 0 } @@ -674,41 +650,68 @@ func (c *ClusterQueue) UsedCohortQuota(fName kueue.ResourceFlavorReference, rNam return cohortUsage } +// The methods below implement several interfaces. See +// dominantResourceShareNode, resourceGroupNode, and netQuotaNode. + +func (c *ClusterQueue) hasCohort() bool { + return c.Cohort != nil +} + +func (c *ClusterQueue) fairWeight() *resource.Quantity { + return &c.FairWeight +} + +func (c *ClusterQueue) lendableResourcesInCohort() map[corev1.ResourceName]int64 { + return c.Cohort.CalculateLendable() +} + +func (c *ClusterQueue) usageFor(fr resources.FlavorResource) int64 { + return c.Usage.For(fr) +} + +func (c *ClusterQueue) resourceGroups() []ResourceGroup { + return c.ResourceGroups +} + // DominantResourceShare returns a value from 0 to 1,000,000 representing the maximum of the ratios // of usage above nominal quota to the lendable resources in the cohort, among all the resources // provided by the ClusterQueue, and divided by the weight. // If zero, it means that the usage of the ClusterQueue is below the nominal quota. // The function also returns the resource name that yielded this value. // Also for a weight of zero, this will return 9223372036854775807. -func (c *ClusterQueue) DominantResourceShare() (int, corev1.ResourceName) { - return c.dominantResourceShare(nil, 0) +func (c *ClusterQueueSnapshot) DominantResourceShare() (int, corev1.ResourceName) { + return dominantResourceShare(c, nil, 0) } -func (c *ClusterQueue) DominantResourceShareWith(wlReq resources.FlavorResourceQuantities) (int, corev1.ResourceName) { - return c.dominantResourceShare(wlReq, 1) +func (c *ClusterQueueSnapshot) DominantResourceShareWith(wlReq resources.FlavorResourceQuantities) (int, corev1.ResourceName) { + return dominantResourceShare(c, wlReq, 1) } -func (c *ClusterQueue) DominantResourceShareWithout(w *workload.Info) (int, corev1.ResourceName) { - return c.dominantResourceShare(w.FlavorResourceUsage(), -1) +func (c *ClusterQueueSnapshot) DominantResourceShareWithout(w *workload.Info) (int, corev1.ResourceName) { + return dominantResourceShare(c, w.FlavorResourceUsage(), -1) } -func (c *ClusterQueue) dominantResourceShare(wlReq resources.FlavorResourceQuantities, m int64) (int, corev1.ResourceName) { - if c.Cohort == nil { +type dominantResourceShareNode interface { + hasCohort() bool + fairWeight() *resource.Quantity + lendableResourcesInCohort() map[corev1.ResourceName]int64 + + netQuotaNode +} + +func dominantResourceShare(node dominantResourceShareNode, wlReq resources.FlavorResourceQuantities, m int64) (int, corev1.ResourceName) { + if !node.hasCohort() { return 0, "" } - if c.FairWeight.IsZero() { + if node.fairWeight().IsZero() { return math.MaxInt, "" } borrowing := make(map[corev1.ResourceName]int64) - for _, rg := range c.ResourceGroups { - for _, flv := range rg.Flavors { - for rName, quotas := range flv.Resources { - b := c.Usage[flv.Name][rName] + m*wlReq[flv.Name][rName] - quotas.Nominal - if b > 0 { - borrowing[rName] += b - } - } + for fr, quota := range remainingQuota(node) { + b := m*wlReq[fr.Flavor][fr.Resource] - quota + if b > 0 { + borrowing[fr.Resource] += b } } if len(borrowing) == 0 { @@ -718,11 +721,7 @@ func (c *ClusterQueue) 
dominantResourceShare(wlReq resources.FlavorResourceQuant var drs int64 = -1 var dRes corev1.ResourceName - // If we are running from snapshot the c.Cohort.Lendable should be pre-calculated. - lendable := c.Cohort.Lendable - if lendable == nil { - lendable = c.Cohort.CalculateLendable() - } + lendable := node.lendableResourcesInCohort() for rName, b := range borrowing { if lr := lendable[rName]; lr > 0 { ratio := b * 1000 / lr @@ -733,6 +732,6 @@ func (c *ClusterQueue) dominantResourceShare(wlReq resources.FlavorResourceQuant } } } - dws := drs * 1000 / c.FairWeight.MilliValue() + dws := drs * 1000 / node.fairWeight().MilliValue() return int(dws), dRes } diff --git a/pkg/cache/clusterqueue_snapshot.go b/pkg/cache/clusterqueue_snapshot.go new file mode 100644 index 0000000000..8c6c8e1fa5 --- /dev/null +++ b/pkg/cache/clusterqueue_snapshot.go @@ -0,0 +1,72 @@ +package cache + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/metrics" + "sigs.k8s.io/kueue/pkg/resources" + "sigs.k8s.io/kueue/pkg/workload" +) + +type ClusterQueueSnapshot struct { + Name string + Cohort *CohortSnapshot + ResourceGroups []ResourceGroup + Usage resources.FlavorResourceQuantities + Workloads map[string]*workload.Info + WorkloadsNotReady sets.Set[string] + NamespaceSelector labels.Selector + Preemption kueue.ClusterQueuePreemption + FairWeight resource.Quantity + FlavorFungibility kueue.FlavorFungibility + // Aggregates AdmissionChecks from both .spec.AdmissionChecks and .spec.AdmissionCheckStrategy + // Sets hold ResourceFlavors to which an AdmissionCheck should apply. + // In case it's empty, it means an AdmissionCheck should apply to all ResourceFlavors + AdmissionChecks map[string]sets.Set[kueue.ResourceFlavorReference] + Status metrics.ClusterQueueStatus + // GuaranteedQuota records how much resource quota the ClusterQueue reserved + // when feature LendingLimit is enabled and flavor's lendingLimit is not nil. + GuaranteedQuota resources.FlavorResourceQuantities + // AllocatableResourceGeneration will be increased when some admitted workloads are + // deleted, or the resource groups are changed. + AllocatableResourceGeneration int64 + + // Lendable holds the total lendable quota for the resources of the ClusterQueue, independent of the flavor. + Lendable map[corev1.ResourceName]int64 +} + +// RGByResource returns the ResourceGroup which contains capacity +// for the resource, or nil if the CQ doesn't provide this resource. +func (c *ClusterQueueSnapshot) RGByResource(resource corev1.ResourceName) *ResourceGroup { + for i := range c.ResourceGroups { + if c.ResourceGroups[i].CoveredResources.Has(resource) { + return &c.ResourceGroups[i] + } + } + return nil +} + +// The methods below implement several interfaces. See +// dominantResourceShareNode, resourceGroupNode, and netQuotaNode.
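The refactor above keeps the dominant-resource-share formula intact: borrowing above nominal quota, scaled by 1000 against the cohort's lendable total, then divided by the fair weight in milli units. A worked standalone example, reusing the numbers from the "above nominal with integer weight" test case further down:

```go
package main

import "fmt"

// Worked example of the dominant-resource-share arithmetic, mirroring the
// "above nominal with integer weight" test case with plain ints in place
// of the kueue types.
func main() {
	usage := int64(7)          // example.com/gpu currently in use
	nominal := int64(5)        // nominal quota
	lendable := int64(10)      // cohort-wide lendable total
	weightMilli := int64(2000) // FairWeight "2" expressed in milli units

	borrowed := usage - nominal // 2 above nominal
	if borrowed < 0 {
		borrowed = 0
	}
	drs := borrowed * 1000 / lendable // 200
	dws := drs * 1000 / weightMilli   // 100, matching wantDRValue
	fmt.Println(dws)
}
```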
+ +func (c *ClusterQueueSnapshot) hasCohort() bool { + return c.Cohort != nil +} +func (c *ClusterQueueSnapshot) fairWeight() *resource.Quantity { + return &c.FairWeight +} +func (c *ClusterQueueSnapshot) lendableResourcesInCohort() map[corev1.ResourceName]int64 { + return c.Cohort.Lendable +} + +func (c *ClusterQueueSnapshot) usageFor(fr resources.FlavorResource) int64 { + return c.Usage.For(fr) +} + +func (c *ClusterQueueSnapshot) resourceGroups() []ResourceGroup { + return c.ResourceGroups +} diff --git a/pkg/cache/clusterqueue_test.go b/pkg/cache/clusterqueue_test.go index f06a3bdd9b..e0aea8eae1 100644 --- a/pkg/cache/clusterqueue_test.go +++ b/pkg/cache/clusterqueue_test.go @@ -97,15 +97,15 @@ func TestFitInCohort(t *testing.T) { cases := map[string]struct { request resources.FlavorResourceQuantities wantFit bool - cq *ClusterQueue + cq *ClusterQueueSnapshot enableLendingLimit bool }{ "full cohort, empty request": { request: resources.FlavorResourceQuantities{}, wantFit: true, - cq: &ClusterQueue{ + cq: &ClusterQueueSnapshot{ Name: "CQ", - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Name: "C", RequestableResources: resources.FlavorResourceQuantitiesFlat{ {Flavor: "f1", Resource: corev1.ResourceCPU}: 5, @@ -129,9 +129,9 @@ func TestFitInCohort(t *testing.T) { {Flavor: "f2", Resource: corev1.ResourceMemory}: 1, }.Unflatten(), wantFit: true, - cq: &ClusterQueue{ + cq: &ClusterQueueSnapshot{ Name: "CQ", - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Name: "C", RequestableResources: resources.FlavorResourceQuantitiesFlat{ {Flavor: "f1", Resource: corev1.ResourceCPU}: 5, @@ -157,9 +157,9 @@ func TestFitInCohort(t *testing.T) { {Flavor: "f2", Resource: corev1.ResourceMemory}: 1, }.Unflatten(), wantFit: false, - cq: &ClusterQueue{ + cq: &ClusterQueueSnapshot{ Name: "CQ", - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Name: "C", RequestableResources: resources.FlavorResourceQuantitiesFlat{ {Flavor: "f1", Resource: corev1.ResourceCPU}: 5, @@ -185,9 +185,9 @@ func TestFitInCohort(t *testing.T) { {Flavor: "f2", Resource: corev1.ResourceMemory}: 1, }.Unflatten(), wantFit: false, - cq: &ClusterQueue{ + cq: &ClusterQueueSnapshot{ Name: "CQ", - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Name: "C", RequestableResources: resources.FlavorResourceQuantitiesFlat{ {Flavor: "f1", Resource: corev1.ResourceCPU}: 5, @@ -211,9 +211,9 @@ func TestFitInCohort(t *testing.T) { {Flavor: "f2", Resource: corev1.ResourceMemory}: 1, }.Unflatten(), wantFit: false, - cq: &ClusterQueue{ + cq: &ClusterQueueSnapshot{ Name: "CQ", - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Name: "C", RequestableResources: resources.FlavorResourceQuantitiesFlat{ {Flavor: "f1", Resource: corev1.ResourceCPU}: 5, @@ -233,9 +233,9 @@ func TestFitInCohort(t *testing.T) { {Flavor: "f1", Resource: corev1.ResourceMemory}: 1, }.Unflatten(), wantFit: false, - cq: &ClusterQueue{ + cq: &ClusterQueueSnapshot{ Name: "CQ", - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Name: "C", RequestableResources: resources.FlavorResourceQuantitiesFlat{ {Flavor: "f1", Resource: corev1.ResourceCPU}: 5, @@ -252,9 +252,9 @@ func TestFitInCohort(t *testing.T) { {Flavor: "f1", Resource: corev1.ResourceCPU}: 3, }.Unflatten(), wantFit: false, - cq: &ClusterQueue{ + cq: &ClusterQueueSnapshot{ Name: "CQ-A", - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Name: "C", RequestableResources: resources.FlavorResourceQuantitiesFlat{ {Flavor: "f1", Resource: @@ -277,9 +277,9 @@ func TestFitInCohort(t *testing.T) { {Flavor: "f1", Resource: corev1.ResourceCPU}: 3, }.Unflatten(), wantFit: 
true, - cq: &ClusterQueue{ + cq: &ClusterQueueSnapshot{ Name: "CQ-A", - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Name: "C", RequestableResources: resources.FlavorResourceQuantitiesFlat{ {Flavor: "f1", Resource: @@ -715,13 +715,13 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) { func TestDominantResourceShare(t *testing.T) { cases := map[string]struct { - cq ClusterQueue + cq ClusterQueueSnapshot flvResQ resources.FlavorResourceQuantities wantDRValue int wantDRName corev1.ResourceName }{ "no cohort": { - cq: ClusterQueue{ + cq: ClusterQueueSnapshot{ FairWeight: oneQuantity, Usage: resources.FlavorResourceQuantitiesFlat{ {Flavor: "default", Resource: corev1.ResourceCPU}: 1_000, @@ -729,6 +729,7 @@ func TestDominantResourceShare(t *testing.T) { }.Unflatten(), ResourceGroups: []ResourceGroup{ { + CoveredResources: sets.New(corev1.ResourceCPU, "example.com/gpu"), Flavors: []FlavorQuotas{ { Name: "default", @@ -747,7 +748,7 @@ func TestDominantResourceShare(t *testing.T) { }, }, "usage below nominal": { - cq: ClusterQueue{ + cq: ClusterQueueSnapshot{ FairWeight: oneQuantity, Usage: resources.FlavorResourceQuantitiesFlat{ {Flavor: "default", Resource: corev1.ResourceCPU}: 1_000, @@ -755,6 +756,7 @@ func TestDominantResourceShare(t *testing.T) { }.Unflatten(), ResourceGroups: []ResourceGroup{ { + CoveredResources: sets.New(corev1.ResourceCPU, "example.com/gpu"), Flavors: []FlavorQuotas{ { Name: "default", @@ -770,7 +772,7 @@ func TestDominantResourceShare(t *testing.T) { }, }, }, - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Lendable: map[corev1.ResourceName]int64{ corev1.ResourceCPU: 10_000, "example.com/gpu": 10, @@ -779,7 +781,7 @@ func TestDominantResourceShare(t *testing.T) { }, }, "usage above nominal": { - cq: ClusterQueue{ + cq: ClusterQueueSnapshot{ FairWeight: oneQuantity, Usage: resources.FlavorResourceQuantitiesFlat{ {Flavor: "default", Resource: corev1.ResourceCPU}: 3_000, @@ -787,6 +789,7 @@ func TestDominantResourceShare(t *testing.T) { }.Unflatten(), ResourceGroups: []ResourceGroup{ { + CoveredResources: sets.New(corev1.ResourceCPU, "example.com/gpu"), Flavors: []FlavorQuotas{ { Name: "default", @@ -802,7 +805,7 @@ func TestDominantResourceShare(t *testing.T) { }, }, }, - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Lendable: map[corev1.ResourceName]int64{ corev1.ResourceCPU: 10_000, "example.com/gpu": 10, @@ -813,7 +816,7 @@ func TestDominantResourceShare(t *testing.T) { wantDRValue: 200, // (7-5)*1000/10 }, "one resource above nominal": { - cq: ClusterQueue{ + cq: ClusterQueueSnapshot{ FairWeight: oneQuantity, Usage: resources.FlavorResourceQuantitiesFlat{ {Flavor: "default", Resource: corev1.ResourceCPU}: 3_000, @@ -821,6 +824,7 @@ func TestDominantResourceShare(t *testing.T) { }.Unflatten(), ResourceGroups: []ResourceGroup{ { + CoveredResources: sets.New(corev1.ResourceCPU, "example.com/gpu"), Flavors: []FlavorQuotas{ { Name: "default", @@ -836,7 +840,7 @@ func TestDominantResourceShare(t *testing.T) { }, }, }, - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Lendable: map[corev1.ResourceName]int64{ corev1.ResourceCPU: 10_000, "example.com/gpu": 10, @@ -847,7 +851,7 @@ func TestDominantResourceShare(t *testing.T) { wantDRValue: 100, // (3-2)*1000/10 }, "usage with workload above nominal": { - cq: ClusterQueue{ + cq: ClusterQueueSnapshot{ FairWeight: oneQuantity, Usage: resources.FlavorResourceQuantitiesFlat{ {Flavor: "default", Resource: corev1.ResourceCPU}: 1_000, @@ -855,6 +859,7 @@ func TestDominantResourceShare(t *testing.T) { }.Unflatten(), ResourceGroups: 
[]ResourceGroup{ { + CoveredResources: sets.New(corev1.ResourceCPU, "example.com/gpu"), Flavors: []FlavorQuotas{ { Name: "default", @@ -870,7 +875,7 @@ func TestDominantResourceShare(t *testing.T) { }, }, }, - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Lendable: map[corev1.ResourceName]int64{ corev1.ResourceCPU: 10_000, "example.com/gpu": 10, @@ -885,7 +890,7 @@ func TestDominantResourceShare(t *testing.T) { wantDRValue: 300, // (1+4-2)*1000/10 }, "A resource with zero lendable": { - cq: ClusterQueue{ + cq: ClusterQueueSnapshot{ FairWeight: oneQuantity, Usage: resources.FlavorResourceQuantitiesFlat{ {Flavor: "default", Resource: corev1.ResourceCPU}: 1_000, @@ -893,6 +898,7 @@ func TestDominantResourceShare(t *testing.T) { }.Unflatten(), ResourceGroups: []ResourceGroup{ { + CoveredResources: sets.New(corev1.ResourceCPU, "example.com/gpu"), Flavors: []FlavorQuotas{ { Name: "default", @@ -909,7 +915,7 @@ func TestDominantResourceShare(t *testing.T) { }, }, }, - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Lendable: map[corev1.ResourceName]int64{ corev1.ResourceCPU: 10_000, "example.com/gpu": 0, @@ -924,7 +930,7 @@ func TestDominantResourceShare(t *testing.T) { wantDRValue: 300, // (1+4-2)*1000/10 }, "multiple flavors": { - cq: ClusterQueue{ + cq: ClusterQueueSnapshot{ FairWeight: oneQuantity, Usage: resources.FlavorResourceQuantitiesFlat{ {Flavor: "on-demand", Resource: corev1.ResourceCPU}: 15_000, @@ -932,6 +938,7 @@ func TestDominantResourceShare(t *testing.T) { }.Unflatten(), ResourceGroups: []ResourceGroup{ { + CoveredResources: sets.New(corev1.ResourceCPU, "example.com/gpu"), Flavors: []FlavorQuotas{ { Name: "on-demand", @@ -952,7 +959,7 @@ func TestDominantResourceShare(t *testing.T) { }, }, }, - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Lendable: map[corev1.ResourceName]int64{ corev1.ResourceCPU: 200_000, }, @@ -965,13 +972,14 @@ func TestDominantResourceShare(t *testing.T) { wantDRValue: 25, // ((15+10-20)+0)*1000/200 (spot under nominal) }, "above nominal with integer weight": { - cq: ClusterQueue{ + cq: ClusterQueueSnapshot{ FairWeight: resource.MustParse("2"), Usage: resources.FlavorResourceQuantitiesFlat{ {Flavor: "default", Resource: "example.com/gpu"}: 7, }.Unflatten(), ResourceGroups: []ResourceGroup{ { + CoveredResources: sets.New(corev1.ResourceCPU, "example.com/gpu"), Flavors: []FlavorQuotas{ { Name: "default", @@ -984,7 +992,7 @@ func TestDominantResourceShare(t *testing.T) { }, }, }, - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Lendable: map[corev1.ResourceName]int64{ "example.com/gpu": 10, }, @@ -994,13 +1002,14 @@ func TestDominantResourceShare(t *testing.T) { wantDRValue: 100, // ((7-5)*1000/10)/2 }, "above nominal with decimal weight": { - cq: ClusterQueue{ + cq: ClusterQueueSnapshot{ FairWeight: resource.MustParse("0.5"), Usage: resources.FlavorResourceQuantitiesFlat{ {Flavor: "default", Resource: "example.com/gpu"}: 7, }.Unflatten(), ResourceGroups: []ResourceGroup{ { + CoveredResources: sets.New(corev1.ResourceCPU, "example.com/gpu"), Flavors: []FlavorQuotas{ { Name: "default", @@ -1013,7 +1022,7 @@ func TestDominantResourceShare(t *testing.T) { }, }, }, - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Lendable: map[corev1.ResourceName]int64{ "example.com/gpu": 10, }, @@ -1023,12 +1032,13 @@ func TestDominantResourceShare(t *testing.T) { wantDRValue: 400, // ((7-5)*1000/10)/(1/2) }, "above nominal with zero weight": { - cq: ClusterQueue{ + cq: ClusterQueueSnapshot{ Usage: resources.FlavorResourceQuantitiesFlat{ {Flavor: "default", Resource: "example.com/gpu"}: 7, 
}.Unflatten(), ResourceGroups: []ResourceGroup{ { + CoveredResources: sets.New(corev1.ResourceCPU, "example.com/gpu"), Flavors: []FlavorQuotas{ { Name: "default", @@ -1041,7 +1051,7 @@ func TestDominantResourceShare(t *testing.T) { }, }, }, - Cohort: &Cohort{ + Cohort: &CohortSnapshot{ Lendable: map[corev1.ResourceName]int64{ "example.com/gpu": 10, }, diff --git a/pkg/cache/cohort_snapshot.go b/pkg/cache/cohort_snapshot.go new file mode 100644 index 0000000000..4fea62709d --- /dev/null +++ b/pkg/cache/cohort_snapshot.go @@ -0,0 +1,22 @@ +package cache + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + + "sigs.k8s.io/kueue/pkg/resources" +) + +type CohortSnapshot struct { + Name string + Members sets.Set[*ClusterQueueSnapshot] + + // RequestableResources equals the sum of the LendingLimits when the LendingLimit feature is enabled. + RequestableResources resources.FlavorResourceQuantities + Usage resources.FlavorResourceQuantities + Lendable map[corev1.ResourceName]int64 + + // AllocatableResourceGeneration equals + // the sum of the allocatable generations among its members. + AllocatableResourceGeneration int64 +} diff --git a/pkg/cache/resource.go b/pkg/cache/resource.go new file mode 100644 index 0000000000..28ff6d1223 --- /dev/null +++ b/pkg/cache/resource.go @@ -0,0 +1,47 @@ +package cache + +import ( + "sigs.k8s.io/kueue/pkg/resources" +) + +type resourceGroupNode interface { + resourceGroups() []ResourceGroup +} + +type flavorResourceQuota struct { + fr resources.FlavorResource + quota *ResourceQuota +} + +// flavorResourceQuotas returns all of the FlavorResources defined in the given +// node, along with their corresponding quotas. +func flavorResourceQuotas(node resourceGroupNode) (flavorResources []flavorResourceQuota) { + for _, rg := range node.resourceGroups() { + for _, flavor := range rg.Flavors { + for resourceName, resource := range flavor.Resources { + flavorResources = append(flavorResources, + flavorResourceQuota{ + fr: resources.FlavorResource{Flavor: flavor.Name, Resource: resourceName}, + quota: resource, + }, + ) + } + } + } + return +} + +type netQuotaNode interface { + usageFor(resources.FlavorResource) int64 + resourceGroups() []ResourceGroup +} + +// remainingQuota computes the remaining quota for each FlavorResource. A +// negative value implies that the node is borrowing.
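remainingQuota (implemented just below) replaces the inline resource-group iteration that dominantResourceShare previously performed: nominal minus usage per (flavor, resource) pair, so a negative entry means the node is borrowing. A toy version of the sign convention with made-up values:

```go
package main

import "fmt"

// Toy model of the remainingQuota sign convention: nominal minus usage per
// (flavor, resource) pair; a negative result means that much is borrowed.
type flavorResource struct{ flavor, resource string }

func main() {
	nominal := map[flavorResource]int64{{"default", "cpu"}: 5_000}
	usage := map[flavorResource]int64{{"default", "cpu"}: 7_000}

	remaining := make(map[flavorResource]int64)
	for fr, n := range nominal {
		remaining[fr] = n - usage[fr]
	}
	fmt.Println(remaining[flavorResource{"default", "cpu"}]) // -2000: borrowing 2000
}
```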
+func remainingQuota(node netQuotaNode) resources.FlavorResourceQuantitiesFlat { + remainingQuota := make(resources.FlavorResourceQuantitiesFlat) + for _, frq := range flavorResourceQuotas(node) { + remainingQuota[frq.fr] += frq.quota.Nominal - node.usageFor(frq.fr) + } + return remainingQuota +} diff --git a/pkg/cache/snapshot.go b/pkg/cache/snapshot.go index e60a659235..8384278439 100644 --- a/pkg/cache/snapshot.go +++ b/pkg/cache/snapshot.go @@ -32,7 +32,7 @@ import ( ) type Snapshot struct { - ClusterQueues map[string]*ClusterQueue + ClusterQueues map[string]*ClusterQueueSnapshot ResourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor InactiveClusterQueueSets sets.Set[string] } @@ -53,7 +53,7 @@ func (s *Snapshot) AddWorkload(wl *workload.Info) { cq.addOrRemoveWorkload(wl, 1) } -func (c *ClusterQueue) addOrRemoveWorkload(wl *workload.Info, m int64) { +func (c *ClusterQueueSnapshot) addOrRemoveWorkload(wl *workload.Info, m int64) { updateFlavorUsage(wl, c.Usage, m) if c.Cohort != nil { if features.Enabled(features.LendingLimit) { @@ -65,7 +65,7 @@ func (c *ClusterQueue) addOrRemoveWorkload(wl *workload.Info, m int64) { } func (s *Snapshot) Log(log logr.Logger) { - cohorts := make(map[string]*Cohort) + cohorts := make(map[string]*CohortSnapshot) for name, cq := range s.ClusterQueues { cohortName := "" if cq.Cohort != nil { @@ -95,7 +95,7 @@ func (c *Cache) Snapshot() Snapshot { defer c.RUnlock() snap := Snapshot{ - ClusterQueues: make(map[string]*ClusterQueue, len(c.clusterQueues)), + ClusterQueues: make(map[string]*ClusterQueueSnapshot, len(c.clusterQueues)), ResourceFlavors: make(map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, len(c.resourceFlavors)), InactiveClusterQueueSets: sets.New[string](), } @@ -111,29 +111,17 @@ func (c *Cache) Snapshot() Snapshot { snap.ResourceFlavors[name] = rf } for _, cohort := range c.cohorts { - cohortCopy := newCohort(cohort.Name, cohort.Members.Len()) - cohortCopy.AllocatableResourceGeneration = 0 - for cq := range cohort.Members { - if cq.Active() { - cqCopy := snap.ClusterQueues[cq.Name] - cqCopy.accumulateResources(cohortCopy) - cqCopy.Cohort = cohortCopy - cohortCopy.Members.Insert(cqCopy) - cohortCopy.AllocatableResourceGeneration += cqCopy.AllocatableResourceGeneration - cohortCopy.Lendable = cohortCopy.CalculateLendable() - } - } + cohort.snapshotInto(snap.ClusterQueues) } return snap } // snapshot creates a copy of ClusterQueue that includes references to immutable // objects and deep copies of changing ones. A reference to the cohort is not included. -func (c *ClusterQueue) snapshot() *ClusterQueue { - cc := &ClusterQueue{ +func (c *ClusterQueue) snapshot() *ClusterQueueSnapshot { + cc := &ClusterQueueSnapshot{ Name: c.Name, ResourceGroups: c.ResourceGroups, // Shallow copy is enough. - RGByResource: c.RGByResource, // Shallow copy is enough. 
FlavorFungibility: c.FlavorFungibility, FairWeight: c.FairWeight, AllocatableResourceGeneration: c.AllocatableResourceGeneration, @@ -156,7 +144,25 @@ func (c *ClusterQueue) snapshot() *ClusterQueue { return cc } -func (c *ClusterQueue) accumulateResources(cohort *Cohort) { +func (c *Cohort) snapshotInto(cqs map[string]*ClusterQueueSnapshot) { + cohortSnap := &CohortSnapshot{ + Name: c.Name, + Members: make(sets.Set[*ClusterQueueSnapshot], c.Members.Len()), + Lendable: c.CalculateLendable(), + } + cohortSnap.AllocatableResourceGeneration = 0 + for cq := range c.Members { + if cq.Active() { + cqSnap := cqs[cq.Name] + cqSnap.accumulateResources(cohortSnap) + cqSnap.Cohort = cohortSnap + cohortSnap.Members.Insert(cqSnap) + cohortSnap.AllocatableResourceGeneration += cqSnap.AllocatableResourceGeneration + } + } +} + +func (c *ClusterQueueSnapshot) accumulateResources(cohort *CohortSnapshot) { if cohort.RequestableResources == nil { cohort.RequestableResources = make(resources.FlavorResourceQuantities, len(c.ResourceGroups)) } diff --git a/pkg/cache/snapshot_test.go b/pkg/cache/snapshot_test.go index c317a3964e..2a8e7aedef 100644 --- a/pkg/cache/snapshot_test.go +++ b/pkg/cache/snapshot_test.go @@ -38,9 +38,7 @@ import ( var snapCmpOpts = []cmp.Option{ cmpopts.EquateEmpty(), - cmpopts.IgnoreUnexported(ClusterQueue{}), - cmpopts.IgnoreFields(ClusterQueue{}, "RGByResource"), - cmpopts.IgnoreFields(Cohort{}, "Members"), // avoid recursion. + cmpopts.IgnoreFields(CohortSnapshot{}, "Members"), // avoid recursion. cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"), } @@ -65,7 +63,7 @@ func TestSnapshot(t *testing.T) { ReserveQuota(&kueue.Admission{ClusterQueue: "b"}).Obj(), }, wantSnapshot: Snapshot{ - ClusterQueues: map[string]*ClusterQueue{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ "a": { Name: "a", NamespaceSelector: labels.Everything(), @@ -121,7 +119,7 @@ func TestSnapshot(t *testing.T) { utiltesting.MakeResourceFlavor("default").Obj(), }, wantSnapshot: Snapshot{ - ClusterQueues: map[string]*ClusterQueue{}, + ClusterQueues: map[string]*ClusterQueueSnapshot{}, ResourceFlavors: map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor{ "demand": utiltesting.MakeResourceFlavor("demand"). Label("a", "b"). 
@@ -202,7 +200,7 @@ func TestSnapshot(t *testing.T) { Obj(), }, wantSnapshot: func() Snapshot { - cohort := &Cohort{ + cohort := &CohortSnapshot{ Name: "borrowing", AllocatableResourceGeneration: 2, RequestableResources: resources.FlavorResourceQuantitiesFlat{ @@ -221,7 +219,7 @@ func TestSnapshot(t *testing.T) { }, } return Snapshot{ - ClusterQueues: map[string]*ClusterQueue{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ "a": { Name: "a", Cohort: cohort, @@ -377,7 +375,7 @@ func TestSnapshot(t *testing.T) { }).Obj(), }, wantSnapshot: Snapshot{ - ClusterQueues: map[string]*ClusterQueue{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ "with-preemption": { Name: "with-preemption", NamespaceSelector: labels.Everything(), @@ -399,7 +397,7 @@ func TestSnapshot(t *testing.T) { utiltesting.MakeClusterQueue("with-preemption").FairWeight(resource.MustParse("3")).Obj(), }, wantSnapshot: Snapshot{ - ClusterQueues: map[string]*ClusterQueue{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ "with-preemption": { Name: "with-preemption", NamespaceSelector: labels.Everything(), @@ -459,7 +457,7 @@ func TestSnapshot(t *testing.T) { Obj(), }, wantSnapshot: func() Snapshot { - cohort := &Cohort{ + cohort := &CohortSnapshot{ Name: "lending", AllocatableResourceGeneration: 2, RequestableResources: resources.FlavorResourceQuantitiesFlat{ @@ -475,7 +473,7 @@ func TestSnapshot(t *testing.T) { }, } return Snapshot{ - ClusterQueues: map[string]*ClusterQueue{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ "a": { Name: "a", Cohort: cohort, @@ -633,8 +631,8 @@ func TestSnapshot(t *testing.T) { for i := range cq.ResourceGroups { rg := &cq.ResourceGroups[i] for rName := range rg.CoveredResources { - if cq.RGByResource[rName] != rg { - t.Errorf("RGByResource[%s] does not point to its resource group", rName) + if cq.RGByResource(rName) != rg { + t.Errorf("RGByResource[%s] does not return its resource group", rName) } } } @@ -723,7 +721,7 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { "remove all": { remove: []string{"/c1-cpu", "/c1-memory-alpha", "/c1-memory-beta", "/c2-cpu-1", "/c2-cpu-2"}, want: func() Snapshot { - cohort := &Cohort{ + cohort := &CohortSnapshot{ Name: "cohort", AllocatableResourceGeneration: 2, RequestableResources: initialCohortResources, @@ -738,7 +736,7 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { }, } return Snapshot{ - ClusterQueues: map[string]*ClusterQueue{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ "c1": { Name: "c1", Cohort: cohort, @@ -779,7 +777,7 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { "remove c1-cpu": { remove: []string{"/c1-cpu"}, want: func() Snapshot { - cohort := &Cohort{ + cohort := &CohortSnapshot{ Name: "cohort", AllocatableResourceGeneration: 2, RequestableResources: initialCohortResources, @@ -794,7 +792,7 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { }, } return Snapshot{ - ClusterQueues: map[string]*ClusterQueue{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ "c1": { Name: "c1", Cohort: cohort, @@ -841,7 +839,7 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { "remove c1-memory-alpha": { remove: []string{"/c1-memory-alpha"}, want: func() Snapshot { - cohort := &Cohort{ + cohort := &CohortSnapshot{ Name: "cohort", AllocatableResourceGeneration: 2, RequestableResources: initialCohortResources, @@ -856,7 +854,7 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { }, } return Snapshot{ - ClusterQueues: map[string]*ClusterQueue{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ "c1": { Name: "c1", Cohort: cohort, @@
-902,8 +900,7 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { }, } cmpOpts := append(snapCmpOpts, - cmpopts.IgnoreFields(ClusterQueue{}, "NamespaceSelector", "Preemption", "Status"), - cmpopts.IgnoreFields(Cohort{}), + cmpopts.IgnoreFields(ClusterQueueSnapshot{}, "NamespaceSelector", "Preemption", "Status"), cmpopts.IgnoreFields(Snapshot{}, "ResourceFlavors"), cmpopts.IgnoreTypes(&workload.Info{})) for name, tc := range cases { @@ -1001,7 +998,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) { "remove all": { remove: []string{"/lend-a-1", "/lend-a-2", "/lend-a-3", "/lend-b-1"}, want: func() Snapshot { - cohort := &Cohort{ + cohort := &CohortSnapshot{ Name: "lend", AllocatableResourceGeneration: 2, RequestableResources: initialCohortResources, @@ -1013,7 +1010,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) { }, } return Snapshot{ - ClusterQueues: map[string]*ClusterQueue{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ "lend-a": { Name: "lend-a", Cohort: cohort, @@ -1057,7 +1054,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) { "remove workload, but still using quota over GuaranteedQuota": { remove: []string{"/lend-a-2"}, want: func() Snapshot { - cohort := &Cohort{ + cohort := &CohortSnapshot{ Name: "lend", AllocatableResourceGeneration: 2, RequestableResources: initialCohortResources, @@ -1069,7 +1066,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) { }, } return Snapshot{ - ClusterQueues: map[string]*ClusterQueue{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ "lend-a": { Name: "lend-a", Cohort: cohort, @@ -1113,7 +1110,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) { "remove wokload, using same quota as GuaranteedQuota": { remove: []string{"/lend-a-1", "/lend-a-2"}, want: func() Snapshot { - cohort := &Cohort{ + cohort := &CohortSnapshot{ Name: "lend", AllocatableResourceGeneration: 2, RequestableResources: initialCohortResources, @@ -1125,7 +1122,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) { }, } return Snapshot{ - ClusterQueues: map[string]*ClusterQueue{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ "lend-a": { Name: "lend-a", Cohort: cohort, @@ -1169,7 +1166,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) { "remove workload, using less quota than GuaranteedQuota": { remove: []string{"/lend-a-2", "/lend-a-3"}, want: func() Snapshot { - cohort := &Cohort{ + cohort := &CohortSnapshot{ Name: "lend", AllocatableResourceGeneration: 2, RequestableResources: initialCohortResources, @@ -1181,7 +1178,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) { }, } return Snapshot{ - ClusterQueues: map[string]*ClusterQueue{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ "lend-a": { Name: "lend-a", Cohort: cohort, @@ -1226,7 +1223,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) { remove: []string{"/lend-a-1", "/lend-a-2", "/lend-a-3", "/lend-b-1"}, add: []string{"/lend-a-1"}, want: func() Snapshot { - cohort := &Cohort{ + cohort := &CohortSnapshot{ Name: "lend", AllocatableResourceGeneration: 2, RequestableResources: initialCohortResources, @@ -1238,7 +1235,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) { }, } return Snapshot{ - ClusterQueues: map[string]*ClusterQueue{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ "lend-a": { Name: "lend-a", Cohort: cohort, @@ -1283,7 +1280,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) { remove: 
[]string{"/lend-a-1", "/lend-a-2", "/lend-a-3", "/lend-b-1"}, add: []string{"/lend-a-3"}, want: func() Snapshot { - cohort := &Cohort{ + cohort := &CohortSnapshot{ Name: "lend", AllocatableResourceGeneration: 2, RequestableResources: initialCohortResources, @@ -1295,7 +1292,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) { }, } return Snapshot{ - ClusterQueues: map[string]*ClusterQueue{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ "lend-a": { Name: "lend-a", Cohort: cohort, @@ -1340,7 +1337,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) { remove: []string{"/lend-a-1", "/lend-a-2", "/lend-a-3", "/lend-b-1"}, add: []string{"/lend-a-2"}, want: func() Snapshot { - cohort := &Cohort{ + cohort := &CohortSnapshot{ Name: "lend", AllocatableResourceGeneration: 2, RequestableResources: initialCohortResources, @@ -1352,7 +1349,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) { }, } return Snapshot{ - ClusterQueues: map[string]*ClusterQueue{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ "lend-a": { Name: "lend-a", Cohort: cohort, @@ -1395,7 +1392,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) { }, } cmpOpts := append(snapCmpOpts, - cmpopts.IgnoreFields(ClusterQueue{}, "NamespaceSelector", "Preemption", "Status"), + cmpopts.IgnoreFields(ClusterQueueSnapshot{}, "NamespaceSelector", "Preemption", "Status"), cmpopts.IgnoreFields(Snapshot{}, "ResourceFlavors"), cmpopts.IgnoreTypes(&workload.Info{})) for name, tc := range cases { diff --git a/pkg/resources/resource.go b/pkg/resources/resource.go index 0c9c079ccb..0d546dad81 100644 --- a/pkg/resources/resource.go +++ b/pkg/resources/resource.go @@ -41,3 +41,11 @@ func (f FlavorResourceQuantitiesFlat) Unflatten() FlavorResourceQuantities { } return out } + +// For attempts to access nested value, returning 0 if absent. 
+func (f FlavorResourceQuantities) For(fr FlavorResource) int64 { + if f == nil || f[fr.Flavor] == nil { + return 0 + } + return f[fr.Flavor][fr.Resource] +} diff --git a/pkg/scheduler/flavorassigner/flavorassigner.go b/pkg/scheduler/flavorassigner/flavorassigner.go index 3bb68e0e1f..036a0b32de 100644 --- a/pkg/scheduler/flavorassigner/flavorassigner.go +++ b/pkg/scheduler/flavorassigner/flavorassigner.go @@ -246,12 +246,12 @@ type FlavorAssignment struct { type FlavorAssigner struct { wl *workload.Info - cq *cache.ClusterQueue + cq *cache.ClusterQueueSnapshot resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor enableFairSharing bool } -func New(wl *workload.Info, cq *cache.ClusterQueue, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, enableFairSharing bool) *FlavorAssigner { +func New(wl *workload.Info, cq *cache.ClusterQueueSnapshot, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, enableFairSharing bool) *FlavorAssigner { return &FlavorAssigner{ wl: wl, cq: cq, @@ -260,7 +260,7 @@ func New(wl *workload.Info, cq *cache.ClusterQueue, resourceFlavors map[kueue.Re } } -func lastAssignmentOutdated(wl *workload.Info, cq *cache.ClusterQueue) bool { +func lastAssignmentOutdated(wl *workload.Info, cq *cache.ClusterQueueSnapshot) bool { return cq.AllocatableResourceGeneration > wl.LastAssignment.ClusterQueueGeneration || (cq.Cohort != nil && cq.Cohort.AllocatableResourceGeneration > wl.LastAssignment.CohortGeneration) } @@ -313,7 +313,7 @@ func (a *FlavorAssigner) assignFlavors(log logr.Logger, requests []workload.PodS } for i, podSet := range requests { - if _, found := a.cq.RGByResource[corev1.ResourcePods]; found { + if a.cq.RGByResource(corev1.ResourcePods) != nil { podSet.Requests[corev1.ResourcePods] = int64(podSet.Count) } @@ -386,8 +386,8 @@ func (a *FlavorAssigner) findFlavorForPodSetResource( resName corev1.ResourceName, assignmentUsage resources.FlavorResourceQuantities, ) (ResourceAssignment, *Status) { - resourceGroup, found := a.cq.RGByResource[resName] - if !found { + resourceGroup := a.cq.RGByResource(resName) + if resourceGroup == nil { return nil, &Status{ reasons: []string{fmt.Sprintf("resource %s unavailable in ClusterQueue", resName)}, } diff --git a/pkg/scheduler/flavorassigner/flavorassigner_test.go b/pkg/scheduler/flavorassigner/flavorassigner_test.go index 8aee312979..6f08ac32aa 100644 --- a/pkg/scheduler/flavorassigner/flavorassigner_test.go +++ b/pkg/scheduler/flavorassigner/flavorassigner_test.go @@ -2094,7 +2094,7 @@ func TestDeletedFlavors(t *testing.T) { func TestLastAssignmentOutdated(t *testing.T) { type args struct { wl *workload.Info - cq *cache.ClusterQueue + cq *cache.ClusterQueueSnapshot } tests := []struct { name string @@ -2109,7 +2109,7 @@ func TestLastAssignmentOutdated(t *testing.T) { ClusterQueueGeneration: 0, }, }, - cq: &cache.ClusterQueue{ + cq: &cache.ClusterQueueSnapshot{ Cohort: nil, AllocatableResourceGeneration: 1, }, @@ -2125,8 +2125,8 @@ func TestLastAssignmentOutdated(t *testing.T) { CohortGeneration: 0, }, }, - cq: &cache.ClusterQueue{ - Cohort: &cache.Cohort{ + cq: &cache.ClusterQueueSnapshot{ + Cohort: &cache.CohortSnapshot{ AllocatableResourceGeneration: 1, }, AllocatableResourceGeneration: 0, @@ -2143,8 +2143,8 @@ func TestLastAssignmentOutdated(t *testing.T) { CohortGeneration: 0, }, }, - cq: &cache.ClusterQueue{ - Cohort: &cache.Cohort{ + cq: &cache.ClusterQueueSnapshot{ + Cohort: &cache.CohortSnapshot{ AllocatableResourceGeneration: 0, }, 
AllocatableResourceGeneration: 0, diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go index 4daf0a3f73..9f63e11afd 100644 --- a/pkg/scheduler/preemption/preemption.go +++ b/pkg/scheduler/preemption/preemption.go @@ -152,7 +152,7 @@ func (p *Preemptor) GetTargets(log logr.Logger, wl workload.Info, assignment fla } // canBorrowWithinCohort returns whether the behavior is enabled for the ClusterQueue and the threshold priority to use. -func canBorrowWithinCohort(cq *cache.ClusterQueue, wl *kueue.Workload) (bool, *int32) { +func canBorrowWithinCohort(cq *cache.ClusterQueueSnapshot, wl *kueue.Workload) (bool, *int32) { borrowWithinCohort := cq.Preemption.BorrowWithinCohort if borrowWithinCohort == nil || borrowWithinCohort.Policy == kueue.BorrowWithinCohortPolicyNever { return false, nil @@ -165,7 +165,7 @@ func canBorrowWithinCohort(cq *cache.ClusterQueue, wl *kueue.Workload) (bool, *i } // IssuePreemptions marks the target workloads as evicted. -func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.Info, targets []*workload.Info, cq *cache.ClusterQueue) (int, error) { +func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.Info, targets []*workload.Info, cq *cache.ClusterQueueSnapshot) (int, error) { log := ctrl.LoggerFrom(ctx) errCh := routine.NewErrorChannel() ctx, cancel := context.WithCancel(ctx) @@ -214,7 +214,7 @@ func (p *Preemptor) applyPreemptionWithSSA(ctx context.Context, w *kueue.Workloa // Once the Workload fits, the heuristic tries to add Workloads back, in the // reverse order in which they were removed, while the incoming Workload still // fits. -func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueue, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info { +func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info { if logV := log.V(5); logV.Enabled() { logV.Info("Simulating preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", resPerFlv, "allowBorrowing", allowBorrowing, "allowBorrowingBelowPriority", allowBorrowingBelowPriority) } @@ -257,7 +257,7 @@ func minimalPreemptions(log logr.Logger, wlReq resources.FlavorResourceQuantitie return targets } -func fillBackWorkloads(targets []*workload.Info, wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueue, snapshot *cache.Snapshot, allowBorrowing bool) []*workload.Info { +func fillBackWorkloads(targets []*workload.Info, wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueueSnapshot, snapshot *cache.Snapshot, allowBorrowing bool) []*workload.Info { // In the reverse order, check if any of the workloads can be added back. 
for i := len(targets) - 2; i >= 0; i-- { snapshot.AddWorkload(targets[i]) @@ -393,7 +393,7 @@ func (p *Preemptor) fairPreemptions(log logr.Logger, wl *workload.Info, assignme } type candidateCQ struct { - cq *cache.ClusterQueue + cq *cache.ClusterQueueSnapshot workloads []*workload.Info share int } @@ -448,7 +448,7 @@ func resourcesRequiringPreemption(assignment flavorassigner.Assignment) resource // findCandidates obtains candidates for preemption within the ClusterQueue and // cohort that respect the preemption policy and are using a resource that the // preempting workload needs. -func findCandidates(wl *kueue.Workload, wo workload.Ordering, cq *cache.ClusterQueue, resPerFlv resourcesPerFlavor) []*workload.Info { +func findCandidates(wl *kueue.Workload, wo workload.Ordering, cq *cache.ClusterQueueSnapshot, resPerFlv resourcesPerFlavor) []*workload.Info { var candidates []*workload.Info wlPriority := priority.Priority(wl) @@ -497,7 +497,7 @@ func findCandidates(wl *kueue.Workload, wo workload.Ordering, cq *cache.ClusterQ return candidates } -func cqIsBorrowing(cq *cache.ClusterQueue, resPerFlv resourcesPerFlavor) bool { +func cqIsBorrowing(cq *cache.ClusterQueueSnapshot, resPerFlv resourcesPerFlavor) bool { if cq.Cohort == nil { return false } @@ -528,7 +528,7 @@ func workloadUsesResources(wl *workload.Info, resPerFlv resourcesPerFlavor) bool // workloadFits determines if the workload requests would fit given the // requestable resources and simulated usage of the ClusterQueue and its cohort, // if it belongs to one. -func workloadFits(wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueue, allowBorrowing bool) bool { +func workloadFits(wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQueueSnapshot, allowBorrowing bool) bool { for _, rg := range cq.ResourceGroups { for _, flvQuotas := range rg.Flavors { flvReq, found := wlReq[flvQuotas.Name] @@ -567,7 +567,7 @@ func workloadFits(wlReq resources.FlavorResourceQuantities, cq *cache.ClusterQue return true } -func queueUnderNominalInResourcesNeedingPreemption(resPerFlv resourcesPerFlavor, cq *cache.ClusterQueue) bool { +func queueUnderNominalInResourcesNeedingPreemption(resPerFlv resourcesPerFlavor, cq *cache.ClusterQueueSnapshot) bool { for _, rg := range cq.ResourceGroups { for _, flvQuotas := range rg.Flavors { flvReq, found := resPerFlv[flvQuotas.Name] diff --git a/pkg/scheduler/preemption/preemption_test.go b/pkg/scheduler/preemption/preemption_test.go index e47337ea3b..31eaa44722 100644 --- a/pkg/scheduler/preemption/preemption_test.go +++ b/pkg/scheduler/preemption/preemption_test.go @@ -47,10 +47,10 @@ import ( var snapCmpOpts = []cmp.Option{ cmpopts.EquateEmpty(), - cmpopts.IgnoreUnexported(cache.ClusterQueue{}), - cmpopts.IgnoreFields(cache.Cohort{}, "AllocatableResourceGeneration"), - cmpopts.IgnoreFields(cache.ClusterQueue{}, "AllocatableResourceGeneration"), - cmp.Transformer("Cohort.Members", func(s sets.Set[*cache.ClusterQueue]) sets.Set[string] { + cmpopts.IgnoreUnexported(cache.ClusterQueueSnapshot{}), + cmpopts.IgnoreFields(cache.CohortSnapshot{}, "AllocatableResourceGeneration"), + cmpopts.IgnoreFields(cache.ClusterQueueSnapshot{}, "AllocatableResourceGeneration"), + cmp.Transformer("Cohort.Members", func(s sets.Set[*cache.ClusterQueueSnapshot]) sets.Set[string] { result := make(sets.Set[string], len(s)) for cq := range s { result.Insert(cq.Name) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 8a5c32388c..f53c01f7ce 100644 --- a/pkg/scheduler/scheduler.go +++ 
b/pkg/scheduler/scheduler.go @@ -377,7 +377,7 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna } // resourcesToReserve calculates how much of the available resources in cq/cohort assignment should be reserved. -func resourcesToReserve(e *entry, cq *cache.ClusterQueue) resources.FlavorResourceQuantities { +func resourcesToReserve(e *entry, cq *cache.ClusterQueueSnapshot) resources.FlavorResourceQuantities { if e.assignment.RepresentativeMode() != flavorassigner.Preempt { return e.assignment.Usage } @@ -385,7 +385,7 @@ func resourcesToReserve(e *entry, cq *cache.ClusterQueue) resources.FlavorResour for flavor, resourceUsage := range e.assignment.Usage { reservedUsage[flavor] = make(map[corev1.ResourceName]int64) for resource, usage := range resourceUsage { - rg := cq.RGByResource[resource] + rg := cq.RGByResource(resource) cqQuota := cache.ResourceQuota{} for _, cqFlavor := range rg.Flavors { if cqFlavor.Name == flavor { @@ -513,7 +513,7 @@ func (s *Scheduler) validateLimitRange(ctx context.Context, wi *workload.Info) e // admit sets the admitting clusterQueue and flavors into the workload of // the entry, and asynchronously updates the object in the apiserver after // assuming it in the cache. -func (s *Scheduler) admit(ctx context.Context, e *entry, cq *cache.ClusterQueue) error { +func (s *Scheduler) admit(ctx context.Context, e *entry, cq *cache.ClusterQueueSnapshot) error { log := ctrl.LoggerFrom(ctx) newWorkload := e.Obj.DeepCopy() admission := &kueue.Admission{
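Taken together, the scheduler-facing API after this patch is: build a point-in-time Snapshot, then work only with ClusterQueueSnapshot values, whose cohort fields (Lendable, RequestableResources, AllocatableResourceGeneration) are always populated. A hypothetical consumer, not part of the patch:

```go
package main

import (
	"github.com/go-logr/logr"

	"sigs.k8s.io/kueue/pkg/cache"
)

// logShares sketches the intended consumer pattern: snapshot once, then
// read fair-sharing state from the snapshot types only. Hypothetical
// helper for illustration.
func logShares(c *cache.Cache, log logr.Logger) {
	if c == nil {
		return
	}
	snapshot := c.Snapshot()
	for name, cq := range snapshot.ClusterQueues {
		share, dominant := cq.DominantResourceShare()
		log.Info("fair share", "clusterQueue", name, "weightedShare", share, "dominantResource", dominant)
	}
}

func main() {
	logShares(nil, logr.Discard()) // wiring up a real Cache is omitted here
}
```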