Skip to content

Commit

Permalink
[multikueue] Make the MultiKueue adapter part of the jobframework
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Jun 6, 2024
1 parent 0cfc37d commit 69ad097
Show file tree
Hide file tree
Showing 23 changed files with 766 additions and 553 deletions.
4 changes: 4 additions & 0 deletions apis/kueue/v1alpha1/multikueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ const (
// MultiKueueOriginLabel is a label used to track the creator
// of multikueue remote objects.
MultiKueueOriginLabel = "kueue.x-k8s.io/multikueue-origin"

// MultiKueueControllerName is the name used by the MultiKueue
// admission check controller.
MultiKueueControllerName = "kueue.x-k8s.io/multikueue"
)

type LocationType string
Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/admissionchecks/multikueue/admissioncheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
)

const (
ControllerName = "kueue.x-k8s.io/multikueue"
SingleInstanceReason = "MultiKueue"
SingleInstanceMessage = "only one multikueue managed admission check can be used in one ClusterQueue"
FlavorIndependentCheckReason = "MultiKueue"
Expand All @@ -65,7 +64,7 @@ var _ reconcile.Reconciler = (*ACReconciler)(nil)
func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
log := ctrl.LoggerFrom(ctx)
ac := &kueue.AdmissionCheck{}
if err := a.client.Get(ctx, req.NamespacedName, ac); err != nil || ac.Spec.ControllerName != ControllerName {
if err := a.client.Get(ctx, req.NamespacedName, ac); err != nil || ac.Spec.ControllerName != kueuealpha.MultiKueueControllerName {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

Expand Down
24 changes: 12 additions & 12 deletions pkg/controller/admissionchecks/multikueue/admissioncheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ func TestReconcile(t *testing.T) {
reconcileFor: "ac1",
checks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(ControllerName).
ControllerName(kueuealpha.MultiKueueControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
Generation(1).
Obj(),
},
wantChecks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(ControllerName).
ControllerName(kueuealpha.MultiKueueControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
SingleInstanceInClusterQueue(true, SingleInstanceReason, SingleInstanceMessage, 1).
ApplyToAllFlavors(true, FlavorIndependentCheckReason, FlavorIndependentCheckMessage, 1).
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestReconcile(t *testing.T) {
reconcileFor: "ac1",
checks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(ControllerName).
ControllerName(kueuealpha.MultiKueueControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
Generation(1).
Obj(),
Expand All @@ -95,7 +95,7 @@ func TestReconcile(t *testing.T) {
},
wantChecks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(ControllerName).
ControllerName(kueuealpha.MultiKueueControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
SingleInstanceInClusterQueue(true, SingleInstanceReason, SingleInstanceMessage, 1).
ApplyToAllFlavors(true, FlavorIndependentCheckReason, FlavorIndependentCheckMessage, 1).
Expand All @@ -113,7 +113,7 @@ func TestReconcile(t *testing.T) {
reconcileFor: "ac1",
checks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(ControllerName).
ControllerName(kueuealpha.MultiKueueControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
Generation(1).
Obj(),
Expand All @@ -128,7 +128,7 @@ func TestReconcile(t *testing.T) {
},
wantChecks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(ControllerName).
ControllerName(kueuealpha.MultiKueueControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
SingleInstanceInClusterQueue(true, SingleInstanceReason, SingleInstanceMessage, 1).
ApplyToAllFlavors(true, FlavorIndependentCheckReason, FlavorIndependentCheckMessage, 1).
Expand All @@ -146,7 +146,7 @@ func TestReconcile(t *testing.T) {
reconcileFor: "ac1",
checks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(ControllerName).
ControllerName(kueuealpha.MultiKueueControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
Generation(1).
Obj(),
Expand All @@ -164,7 +164,7 @@ func TestReconcile(t *testing.T) {
},
wantChecks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(ControllerName).
ControllerName(kueuealpha.MultiKueueControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
SingleInstanceInClusterQueue(true, SingleInstanceReason, SingleInstanceMessage, 1).
ApplyToAllFlavors(true, FlavorIndependentCheckReason, FlavorIndependentCheckMessage, 1).
Expand All @@ -182,7 +182,7 @@ func TestReconcile(t *testing.T) {
reconcileFor: "ac1",
checks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(ControllerName).
ControllerName(kueuealpha.MultiKueueControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
Generation(1).
Obj(),
Expand All @@ -200,7 +200,7 @@ func TestReconcile(t *testing.T) {
},
wantChecks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(ControllerName).
ControllerName(kueuealpha.MultiKueueControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
SingleInstanceInClusterQueue(true, SingleInstanceReason, SingleInstanceMessage, 1).
ApplyToAllFlavors(true, FlavorIndependentCheckReason, FlavorIndependentCheckMessage, 1).
Expand All @@ -218,7 +218,7 @@ func TestReconcile(t *testing.T) {
reconcileFor: "ac1",
checks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(ControllerName).
ControllerName(kueuealpha.MultiKueueControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
Generation(1).
Obj(),
Expand All @@ -233,7 +233,7 @@ func TestReconcile(t *testing.T) {
},
wantChecks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(ControllerName).
ControllerName(kueuealpha.MultiKueueControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
SingleInstanceInClusterQueue(true, SingleInstanceReason, SingleInstanceMessage, 1).
ApplyToAllFlavors(true, FlavorIndependentCheckReason, FlavorIndependentCheckMessage, 1).
Expand Down
11 changes: 9 additions & 2 deletions pkg/controller/admissionchecks/multikueue/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"time"

ctrl "sigs.k8s.io/controller-runtime"

"sigs.k8s.io/kueue/pkg/controller/jobframework"
)

const (
Expand Down Expand Up @@ -82,7 +84,12 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e
return err
}

cRec := newClustersReconciler(mgr.GetClient(), namespace, options.gcInterval, options.origin, fsWatcher)
adapters, err := jobframework.GetMultiKueueAdapters()
if err != nil {
return err
}

cRec := newClustersReconciler(mgr.GetClient(), namespace, options.gcInterval, options.origin, fsWatcher, adapters)
err = cRec.setupWithManager(mgr)
if err != nil {
return err
Expand All @@ -94,6 +101,6 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e
return err
}

wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, options.workerLostTimeout)
wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, options.workerLostTimeout, adapters)
return wlRec.setupWithManager(mgr)
}
2 changes: 1 addition & 1 deletion pkg/controller/admissionchecks/multikueue/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func SetupIndexer(ctx context.Context, indexer client.FieldIndexer, configNamesp
if err := indexer.IndexField(ctx, &kueuealpha.MultiKueueConfig{}, UsingMultiKueueClusters, indexUsingMultiKueueClusters); err != nil {
return fmt.Errorf("setting index on configs using clusters: %w", err)
}
if err := indexer.IndexField(ctx, &kueue.AdmissionCheck{}, AdmissionCheckUsingConfigKey, admissioncheck.IndexerByConfigFunction(ControllerName, configGVK)); err != nil {
if err := indexer.IndexField(ctx, &kueue.AdmissionCheck{}, AdmissionCheckUsingConfigKey, admissioncheck.IndexerByConfigFunction(kueuealpha.MultiKueueControllerName, configGVK)); err != nil {
return fmt.Errorf("setting index on admission checks config: %w", err)
}
return nil
Expand Down
Loading

0 comments on commit 69ad097

Please sign in to comment.