Skip to content

Commit

Permalink
Make ClusterQueue's AdmissionCheckStrategy a pointer, avoid type cast…
Browse files Browse the repository at this point in the history
…ing (kubernetes-sigs#1993)

* Make AdmissionCheckStrategy a pointer, avoid type casting

* Fix flaky integration test
  • Loading branch information
PBundyra authored Apr 17, 2024
1 parent 9885a3e commit f1f0223
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 52 deletions.
4 changes: 2 additions & 2 deletions apis/kueue/v1beta1/clusterqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type ClusterQueueSpec struct {
// admissionCheckStrategy defines a list of strategies to determine which ResourceFlavors require AdmissionChecks.
// This property cannot be used in conjunction with the 'admissionChecks' property.
// +optional
AdmissionChecksStrategy AdmissionChecksStrategy `json:"admissionChecksStrategy,omitempty"`
AdmissionChecksStrategy *AdmissionChecksStrategy `json:"admissionChecksStrategy,omitempty"`

// stopPolicy - if set to a value different from None, the ClusterQueue is considered Inactive, no new reservation being
// made.
Expand Down Expand Up @@ -137,7 +137,7 @@ type AdmissionCheckStrategyRule struct {
// onFlavors is a list of ResourceFlavors' names that this AdmissionCheck should run for.
// If empty, the AdmissionCheck will run for all workloads submitted to the ClusterQueue.
// +optional
OnFlavors []ResourceFlavorReference `json:"onFlavors"`
OnFlavors []ResourceFlavorReference `json:"onFlavors,omitempty"`
}

type QueueingStrategy string
Expand Down
6 changes: 5 additions & 1 deletion apis/kueue/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 12 additions & 12 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,9 +964,9 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Preemption: defaultPreemption,
AllocatableResourceGeneration: 1,
FlavorFungibility: defaultFlavorFungibility,
AdmissionChecks: map[string]sets.Set[string]{
"check1": sets.New[string](),
"check2": sets.New[string](),
AdmissionChecks: map[string]sets.Set[kueue.ResourceFlavorReference]{
"check1": sets.New[kueue.ResourceFlavorReference](),
"check2": sets.New[kueue.ResourceFlavorReference](),
},
},
},
Expand Down Expand Up @@ -995,9 +995,9 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Preemption: defaultPreemption,
AllocatableResourceGeneration: 1,
FlavorFungibility: defaultFlavorFungibility,
AdmissionChecks: map[string]sets.Set[string]{
"check1": sets.New[string](),
"check2": sets.New[string](),
AdmissionChecks: map[string]sets.Set[kueue.ResourceFlavorReference]{
"check1": sets.New[kueue.ResourceFlavorReference](),
"check2": sets.New[kueue.ResourceFlavorReference](),
}},
},
wantCohorts: map[string]sets.Set[string]{},
Expand Down Expand Up @@ -1026,9 +1026,9 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Preemption: defaultPreemption,
AllocatableResourceGeneration: 1,
FlavorFungibility: defaultFlavorFungibility,
AdmissionChecks: map[string]sets.Set[string]{
"check1": sets.New[string](),
"check2": sets.New[string](),
AdmissionChecks: map[string]sets.Set[kueue.ResourceFlavorReference]{
"check1": sets.New[kueue.ResourceFlavorReference](),
"check2": sets.New[kueue.ResourceFlavorReference](),
}},
},
wantCohorts: map[string]sets.Set[string]{},
Expand Down Expand Up @@ -1057,9 +1057,9 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Preemption: defaultPreemption,
AllocatableResourceGeneration: 1,
FlavorFungibility: defaultFlavorFungibility,
AdmissionChecks: map[string]sets.Set[string]{
"check1": sets.New[string](),
"check2": sets.New[string](),
AdmissionChecks: map[string]sets.Set[kueue.ResourceFlavorReference]{
"check1": sets.New[kueue.ResourceFlavorReference](),
"check2": sets.New[kueue.ResourceFlavorReference](),
}},
},
wantCohorts: map[string]sets.Set[string]{},
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type ClusterQueue struct {
// Aggregates AdmissionChecks from both .spec.AdmissionChecks and .spec.AdmissionCheckStrategy
// Sets hold ResourceFlavors to which an AdmissionCheck should apply.
// In case its empty, it means an AdmissionCheck should apply to all ResourceFlavor
AdmissionChecks map[string]sets.Set[string]
AdmissionChecks map[string]sets.Set[kueue.ResourceFlavorReference]
Status metrics.ClusterQueueStatus
// GuaranteedQuota records how much resource quota the ClusterQueue reserved
// when feature LendingLimit is enabled and flavor's lendingLimit is not nil.
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (c *ClusterQueue) snapshot() *ClusterQueue {
Preemption: c.Preemption,
NamespaceSelector: c.NamespaceSelector,
Status: c.Status,
AdmissionChecks: utilmaps.DeepCopySets[string](c.AdmissionChecks),
AdmissionChecks: utilmaps.DeepCopySets[kueue.ResourceFlavorReference](c.AdmissionChecks),
}

for fName, rUsage := range c.Usage {
Expand Down
20 changes: 11 additions & 9 deletions pkg/util/admissioncheck/admissioncheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,17 @@ func FilterProvReqAnnotations(annotations map[string]string) map[string]string {
}

// NewAdmissionChecks aggregates AdmissionChecks from .spec.AdmissionChecks and .spec.AdmissionChecksStrategy
func NewAdmissionChecks(cq *kueue.ClusterQueue) map[string]sets.Set[string] {
checks := make(map[string]sets.Set[string], len(cq.Spec.AdmissionChecks)+len(cq.Spec.AdmissionChecksStrategy.AdmissionChecks))
for _, checkName := range cq.Spec.AdmissionChecks {
checks[checkName] = sets.New[string]()
}
for _, check := range cq.Spec.AdmissionChecksStrategy.AdmissionChecks {
checks[check.Name] = sets.New[string]()
for _, flavor := range check.OnFlavors {
checks[check.Name].Insert(string(flavor))
func NewAdmissionChecks(cq *kueue.ClusterQueue) map[string]sets.Set[kueue.ResourceFlavorReference] {
var checks map[string]sets.Set[kueue.ResourceFlavorReference]
if cq.Spec.AdmissionChecksStrategy != nil {
checks = make(map[string]sets.Set[kueue.ResourceFlavorReference], len(cq.Spec.AdmissionChecksStrategy.AdmissionChecks))
for _, check := range cq.Spec.AdmissionChecksStrategy.AdmissionChecks {
checks[check.Name] = sets.New[kueue.ResourceFlavorReference](check.OnFlavors...)
}
} else {
checks = make(map[string]sets.Set[kueue.ResourceFlavorReference], len(cq.Spec.AdmissionChecks))
for _, checkName := range cq.Spec.AdmissionChecks {
checks[checkName] = sets.New[kueue.ResourceFlavorReference]()
}
}
return checks
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,9 @@ func (c *ClusterQueueWrapper) Cohort(cohort string) *ClusterQueueWrapper {
}

func (c *ClusterQueueWrapper) AdmissionCheckStrategy(acs ...kueue.AdmissionCheckStrategyRule) *ClusterQueueWrapper {
if c.Spec.AdmissionChecksStrategy == nil {
c.Spec.AdmissionChecksStrategy = &kueue.AdmissionChecksStrategy{}
}
c.Spec.AdmissionChecksStrategy.AdmissionChecks = acs
return c
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/webhooks/clusterqueue_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func validatePreemption(preemption *kueue.ClusterQueuePreemption, path *field.Pa

func validateCQAdmissionChecks(spec *kueue.ClusterQueueSpec, path *field.Path) field.ErrorList {
var allErrs field.ErrorList
if len(spec.AdmissionChecksStrategy.AdmissionChecks) != 0 && len(spec.AdmissionChecks) != 0 {
if spec.AdmissionChecksStrategy != nil && len(spec.AdmissionChecks) != 0 {
allErrs = append(allErrs, field.Invalid(path, spec, "Either AdmissionChecks or AdmissionCheckStrategy can be set, but not both"))
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/webhooks/clusterqueue_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ func TestValidateClusterQueue(t *testing.T) {
name: "both admissionChecks and admissionCheckStrategy is defined",
clusterQueue: testingutil.MakeClusterQueue("cluster-queue").
AdmissionChecks("ac1").
AdmissionCheckStrategy(
*testingutil.MakeAdmissionCheckStrategyRule("ac1", "flavor1").Obj(),
).Obj(),
AdmissionCheckStrategy().Obj(),
wantErr: field.ErrorList{
field.Invalid(specPath, "spec", "Either AdmissionChecks or AdmissionCheckStrategy can be set, but not both"),
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func RemoveFinalizer(ctx context.Context, c client.Client, wl *kueue.Workload) e

// AdmissionChecksForWorkload returns AdmissionChecks that should be assigned to a specific Workload based on
// ClusterQueue configuration and ResourceFlavors
func AdmissionChecksForWorkload(log logr.Logger, wl *kueue.Workload, admissionChecks map[string]sets.Set[string]) sets.Set[string] {
func AdmissionChecksForWorkload(log logr.Logger, wl *kueue.Workload, admissionChecks map[string]sets.Set[kueue.ResourceFlavorReference]) sets.Set[string] {
// If all admissionChecks should be run for all flavors we don't need to wait for Workload's Admission to be set.
// This is also the case if admissionChecks are specified with ClusterQueue.Spec.AdmissionChecks instead of
// ClusterQueue.Spec.AdmissionCheckStrategy
Expand Down Expand Up @@ -586,7 +586,7 @@ func AdmissionChecksForWorkload(log logr.Logger, wl *kueue.Workload, admissionCh
continue
}
for _, fName := range assignedFlavors {
if flavors.Has(string(fName)) {
if flavors.Has(fName) {
acNames.Insert(acName)
}
}
Expand Down
38 changes: 18 additions & 20 deletions test/integration/scheduler/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,35 +151,33 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() {
ginkgo.By("adding an additional admission check to the clusterqueue", func() {
createdQueue := kueue.ClusterQueue{}
queueKey := client.ObjectKeyFromObject(clusterQueue)
gomega.Expect(k8sClient.Get(ctx, queueKey, &createdQueue)).To(gomega.Succeed())
createdQueue.Spec.AdmissionChecksStrategy.AdmissionChecks = []kueue.AdmissionCheckStrategyRule{
*testing.MakeAdmissionCheckStrategyRule("check1", kueue.ResourceFlavorReference(flavorOnDemand)).Obj(),
*testing.MakeAdmissionCheckStrategyRule("check2", kueue.ResourceFlavorReference(reservationFlavor)).Obj(),
*testing.MakeAdmissionCheckStrategyRule("check3").Obj()}
gomega.Expect(k8sClient.Update(ctx, &createdQueue)).To(gomega.Succeed())

gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, queueKey, &createdQueue)).To(gomega.Succeed())
createdQueue.Spec.AdmissionChecksStrategy.AdmissionChecks = []kueue.AdmissionCheckStrategyRule{
*testing.MakeAdmissionCheckStrategyRule("check1", kueue.ResourceFlavorReference(flavorOnDemand)).Obj(),
*testing.MakeAdmissionCheckStrategyRule("check2", kueue.ResourceFlavorReference(reservationFlavor)).Obj(),
*testing.MakeAdmissionCheckStrategyRule("check3").Obj()}
g.Expect(k8sClient.Update(ctx, &createdQueue)).To(gomega.Succeed())
g.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).To(gomega.Succeed())
checks := slices.Map(updatedWl.Status.AdmissionChecks, func(c *kueue.AdmissionCheckState) string { return c.Name })
g.Expect(checks).Should(gomega.ConsistOf("check1", "check3"))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("marking the checks as passed", func() {
gomega.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).To(gomega.Succeed())
workload.SetAdmissionCheckState(&updatedWl.Status.AdmissionChecks, kueue.AdmissionCheckState{
Name: "check1",
State: kueue.CheckStateReady,
Message: "check successfully passed",
})
workload.SetAdmissionCheckState(&updatedWl.Status.AdmissionChecks, kueue.AdmissionCheckState{
Name: "check3",
State: kueue.CheckStateReady,
Message: "check successfully passed",
})
gomega.Expect(k8sClient.Status().Update(ctx, &updatedWl)).Should(gomega.Succeed())

gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).To(gomega.Succeed())
workload.SetAdmissionCheckState(&updatedWl.Status.AdmissionChecks, kueue.AdmissionCheckState{
Name: "check1",
State: kueue.CheckStateReady,
Message: "check successfully passed",
})
workload.SetAdmissionCheckState(&updatedWl.Status.AdmissionChecks, kueue.AdmissionCheckState{
Name: "check3",
State: kueue.CheckStateReady,
Message: "check successfully passed",
})
g.Expect(k8sClient.Status().Update(ctx, &updatedWl)).Should(gomega.Succeed())
g.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).Should(gomega.Succeed())
g.Expect(workload.IsAdmitted(&updatedWl)).Should(gomega.BeTrue(), "should have been admitted")
}, util.Timeout, util.Interval).Should(gomega.Succeed())
Expand Down

0 comments on commit f1f0223

Please sign in to comment.