From 6866b9bf9d61974b5f0f6aa7bf88744f0f94b59d Mon Sep 17 00:00:00 2001
From: Michal Szadkowski
Date: Wed, 12 Jun 2024 15:50:05 +0200
Subject: [PATCH 1/5] Add ControllerName to the MultiKueueClusterSpec

---
 apis/kueue/v1alpha1/multikueue_types.go               |  8 ++++++++
 .../crd/kueue.x-k8s.io_multikueueclusters.yaml        | 11 +++++++++++
 .../kueue/v1alpha1/multikueueclusterspec.go           | 11 ++++++++++-
 .../crd/bases/kueue.x-k8s.io_multikueueclusters.yaml  | 11 +++++++++++
 .../content/en/docs/reference/kueue-alpha.v1alpha1.md |  9 +++++++++
 5 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/apis/kueue/v1alpha1/multikueue_types.go b/apis/kueue/v1alpha1/multikueue_types.go
index a8b8a6cad8..25f4f52924 100644
--- a/apis/kueue/v1alpha1/multikueue_types.go
+++ b/apis/kueue/v1alpha1/multikueue_types.go
@@ -59,6 +59,14 @@ type KubeConfig struct {
 }
 
 type MultiKueueClusterSpec struct {
+	// controllerName is the name of the controller which will actually perform
+	// the checks. This is the name with which the controller identifies itself,
+	// not necessarily a K8S Pod or Deployment name. Cannot be empty.
+	// +kubebuilder:validation:Required
+	// +kubebuilder:default="kueue.x-k8s.io/multikueue"
+	// +kubebuilder:validation:XValidation:rule="self == oldSelf", message="field is immutable"
+	ControllerName string `json:"controllerName"`
+
 	// Information how to connect to the cluster.
 	KubeConfig KubeConfig `json:"kubeConfig"`
 }
diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml
index 72466573ad..5ca26e3034 100644
--- a/charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml
+++ b/charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml
@@ -53,6 +53,16 @@ spec:
             type: object
           spec:
             properties:
+              controllerName:
+                default: kueue.x-k8s.io/multikueue
+                description: |-
+                  controllerName is the name of the controller which will actually perform
+                  the checks. This is the name with which the controller identifies itself,
+                  not necessarily a K8S Pod or Deployment name. Cannot be empty.
+                type: string
+                x-kubernetes-validations:
+                - message: field is immutable
+                  rule: self == oldSelf
               kubeConfig:
                 description: Information how to connect to the cluster.
                 properties:
@@ -76,6 +86,7 @@ spec:
                - locationType
               type: object
             required:
+            - controllerName
             - kubeConfig
             type: object
           status:
diff --git a/client-go/applyconfiguration/kueue/v1alpha1/multikueueclusterspec.go b/client-go/applyconfiguration/kueue/v1alpha1/multikueueclusterspec.go
index e7e304f7ea..c20d0dad9a 100644
--- a/client-go/applyconfiguration/kueue/v1alpha1/multikueueclusterspec.go
+++ b/client-go/applyconfiguration/kueue/v1alpha1/multikueueclusterspec.go
@@ -20,7 +20,8 @@ package v1alpha1
 
 // MultiKueueClusterSpecApplyConfiguration represents an declarative configuration of the MultiKueueClusterSpec type for use
 // with apply.
 type MultiKueueClusterSpecApplyConfiguration struct {
-	KubeConfig *KubeConfigApplyConfiguration `json:"kubeConfig,omitempty"`
+	ControllerName *string                       `json:"controllerName,omitempty"`
+	KubeConfig     *KubeConfigApplyConfiguration `json:"kubeConfig,omitempty"`
 }
 
 // MultiKueueClusterSpecApplyConfiguration constructs an declarative configuration of the MultiKueueClusterSpec type for use with
@@ -29,6 +30,14 @@ func MultiKueueClusterSpec() *MultiKueueClusterSpecApplyConfiguration {
 	return &MultiKueueClusterSpecApplyConfiguration{}
 }
 
+// WithControllerName sets the ControllerName field in the declarative configuration to the given value
+// and returns the receiver, so that objects can be built by chaining "With" function invocations.
+// If called multiple times, the ControllerName field is set to the value of the last call.
+func (b *MultiKueueClusterSpecApplyConfiguration) WithControllerName(value string) *MultiKueueClusterSpecApplyConfiguration {
+	b.ControllerName = &value
+	return b
+}
+
 // WithKubeConfig sets the KubeConfig field in the declarative configuration to the given value
 // and returns the receiver, so that objects can be built by chaining "With" function invocations.
 // If called multiple times, the KubeConfig field is set to the value of the last call.
diff --git a/config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml b/config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml
index 3107263106..3a8e437bbe 100644
--- a/config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml
+++ b/config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml
@@ -38,6 +38,16 @@ spec:
             type: object
           spec:
             properties:
+              controllerName:
+                default: kueue.x-k8s.io/multikueue
+                description: |-
+                  controllerName is the name of the controller which will actually perform
+                  the checks. This is the name with which the controller identifies itself,
+                  not necessarily a K8S Pod or Deployment name. Cannot be empty.
+                type: string
+                x-kubernetes-validations:
+                - message: field is immutable
+                  rule: self == oldSelf
               kubeConfig:
                 description: Information how to connect to the cluster.
                 properties:
@@ -61,6 +71,7 @@ spec:
               - locationType
               type: object
             required:
+            - controllerName
             - kubeConfig
             type: object
          status:
diff --git a/site/content/en/docs/reference/kueue-alpha.v1alpha1.md b/site/content/en/docs/reference/kueue-alpha.v1alpha1.md
index 7620453cd3..152294594d 100644
--- a/site/content/en/docs/reference/kueue-alpha.v1alpha1.md
+++ b/site/content/en/docs/reference/kueue-alpha.v1alpha1.md
@@ -133,6 +133,15 @@ which the kueue controller manager is running. The config should be stored in th
+controllerName [Required]
+string + + +

controllerName is the name of the controller which will actually perform
+the checks. This is the name with which the controller identifies itself,
+not necessarily a K8S Pod or Deployment name. Cannot be empty.

+ + kubeConfig [Required]
KubeConfig From dbad92194d3e06897f1cd9cc7861ceff7d9b3ddb Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Wed, 12 Jun 2024 15:52:15 +0200 Subject: [PATCH 2/5] Add ControllerName as Multikueue setup option Pass the given ControllerName to the reconcilers Update tests --- .../multikueue/admissioncheck.go | 19 ++++--- .../multikueue/admissioncheck_test.go | 49 ++++++++++++++++++- .../admissionchecks/multikueue/controllers.go | 33 +++++++++---- .../multikueue/multikueuecluster.go | 14 ++++-- .../multikueue/multikueuecluster_test.go | 24 ++++++++- .../admissionchecks/multikueue/workload.go | 20 ++++---- .../multikueue/workload_test.go | 8 ++- pkg/util/testing/wrappers.go | 5 ++ test/e2e/multikueue/e2e_test.go | 8 ++- .../integration/multikueue/multikueue_test.go | 16 ++++-- 10 files changed, 157 insertions(+), 39 deletions(-) diff --git a/pkg/controller/admissionchecks/multikueue/admissioncheck.go b/pkg/controller/admissionchecks/multikueue/admissioncheck.go index 7af98a71a4..14aa271a45 100644 --- a/pkg/controller/admissionchecks/multikueue/admissioncheck.go +++ b/pkg/controller/admissionchecks/multikueue/admissioncheck.go @@ -52,11 +52,12 @@ func newMultiKueueStoreHelper(c client.Client) (*multiKueueStoreHelper, error) { } // ACReconciler implements the reconciler for all the admission checks controlled by multikueue. -// Its main task being to maintain the active state of the admission checks based on the heath +// Its main task being to maintain the active state of the admission checks based on the health // of its referenced MultiKueueClusters. type ACReconciler struct { - client client.Client - helper *multiKueueStoreHelper + controllerName string + client client.Client + helper *multiKueueStoreHelper } var _ reconcile.Reconciler = (*ACReconciler)(nil) @@ -64,7 +65,7 @@ var _ reconcile.Reconciler = (*ACReconciler)(nil) func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { log := ctrl.LoggerFrom(ctx) ac := &kueue.AdmissionCheck{} - if err := a.client.Get(ctx, req.NamespacedName, ac); err != nil || ac.Spec.ControllerName != kueuealpha.MultiKueueControllerName { + if err := a.client.Get(ctx, req.NamespacedName, ac); err != nil || ac.Spec.ControllerName != a.controllerName { return reconcile.Result{}, client.IgnoreNotFound(err) } @@ -96,7 +97,8 @@ func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re if err != nil { missingClusters = append(missingClusters, clusterName) - } else if !apimeta.IsStatusConditionTrue(cluster.Status.Conditions, kueuealpha.MultiKueueClusterActive) { + } else if !apimeta.IsStatusConditionTrue(cluster.Status.Conditions, kueuealpha.MultiKueueClusterActive) || + ac.Spec.ControllerName != cluster.Spec.ControllerName { inactiveClusters = append(inactiveClusters, clusterName) } } @@ -166,10 +168,11 @@ func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=admissionchecks,verbs=get;list;watch // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=multikueueconfigs,verbs=get;list;watch -func newACReconciler(c client.Client, helper *multiKueueStoreHelper) *ACReconciler { +func newACReconciler(c client.Client, helper *multiKueueStoreHelper, so SetupOptions) *ACReconciler { return &ACReconciler{ - client: c, - helper: helper, + controllerName: so.controllerName, + client: c, + helper: helper, } } diff --git a/pkg/controller/admissionchecks/multikueue/admissioncheck_test.go 
b/pkg/controller/admissionchecks/multikueue/admissioncheck_test.go
index b0f8ae7fb3..615e831c1f 100644
--- a/pkg/controller/admissionchecks/multikueue/admissioncheck_test.go
+++ b/pkg/controller/admissionchecks/multikueue/admissioncheck_test.go
@@ -123,6 +124,7 @@ func TestReconcile(t *testing.T) {
 			},
 			clusters: []kueuealpha.MultiKueueCluster{
 				*utiltesting.MakeMultiKueueCluster("worker1").
+					ControllerName(kueuealpha.MultiKueueControllerName).
 					Active(metav1.ConditionFalse, "ByTest", "by test", 1).
 					Obj(),
 			},
@@ -156,9 +158,11 @@ func TestReconcile(t *testing.T) {
 			},
 			clusters: []kueuealpha.MultiKueueCluster{
 				*utiltesting.MakeMultiKueueCluster("worker1").
+					ControllerName(kueuealpha.MultiKueueControllerName).
 					Active(metav1.ConditionFalse, "ByTest", "by test", 1).
 					Obj(),
 				*utiltesting.MakeMultiKueueCluster("worker2").
+					ControllerName(kueuealpha.MultiKueueControllerName).
 					Active(metav1.ConditionFalse, "ByTest", "by test", 1).
 					Obj(),
 			},
@@ -192,9 +196,11 @@ func TestReconcile(t *testing.T) {
 			},
 			clusters: []kueuealpha.MultiKueueCluster{
 				*utiltesting.MakeMultiKueueCluster("worker1").
+					ControllerName(kueuealpha.MultiKueueControllerName).
 					Active(metav1.ConditionFalse, "ByTest", "by test", 1).
 					Obj(),
 				*utiltesting.MakeMultiKueueCluster("worker2").
+					ControllerName(kueuealpha.MultiKueueControllerName).
 					Active(metav1.ConditionTrue, "ByTest", "by test", 1).
 					Obj(),
 			},
@@ -228,6 +234,7 @@ func TestReconcile(t *testing.T) {
 			},
 			clusters: []kueuealpha.MultiKueueCluster{
 				*utiltesting.MakeMultiKueueCluster("worker1").
+					ControllerName(kueuealpha.MultiKueueControllerName).
 					Active(metav1.ConditionTrue, "ByTest", "by test", 1).
 					Obj(),
 			},
@@ -247,10 +254,49 @@ func TestReconcile(t *testing.T) {
 				Obj(),
 			},
 		},
+		"non default controller name": {
+			reconcileFor: "ac1",
+			checks: []kueue.AdmissionCheck{
+				*utiltesting.MakeAdmissionCheck("ac1").
+					ControllerName(kueuealpha.MultiKueueControllerName).
+					Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
+					Generation(1).
+					Obj(),
+			},
+			configs: []kueuealpha.MultiKueueConfig{
+				*utiltesting.MakeMultiKueueConfig("config1").Clusters("worker1", "worker2").Obj(),
+			},
+			clusters: []kueuealpha.MultiKueueCluster{
+				*utiltesting.MakeMultiKueueCluster("worker1").
+					ControllerName("mkc1").
+					Active(metav1.ConditionTrue, "ByTest", "by test", 1).
+					Obj(),
+				*utiltesting.MakeMultiKueueCluster("worker2").
+					ControllerName("mkc1").
+					Active(metav1.ConditionTrue, "ByTest", "by test", 1).
+					Obj(),
+			},
+			wantChecks: []kueue.AdmissionCheck{
+				*utiltesting.MakeAdmissionCheck("ac1").
+					ControllerName(kueuealpha.MultiKueueControllerName).
+					Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
+					SingleInstanceInClusterQueue(true, SingleInstanceReason, SingleInstanceMessage, 1).
+					ApplyToAllFlavors(true, FlavorIndependentCheckReason, FlavorIndependentCheckMessage, 1).
+					Condition(metav1.Condition{
+						Type:               kueue.AdmissionCheckActive,
+						Status:             metav1.ConditionFalse,
+						Reason:             "NoUsableClusters",
+						Message:            "Inactive clusters: [worker1 worker2]",
+						ObservedGeneration: 1,
+					}).
+					Obj(),
+			},
+		},
 	}
 
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
 			builder, ctx := getClientBuilder()
 
 			builder = builder.WithLists(
@@ -266,7 +312,8 @@ func TestReconcile(t *testing.T) {
 			c := builder.Build()
 
 			helper, _ := newMultiKueueStoreHelper(c)
-			reconciler := newACReconciler(c, helper)
+			setupOptions := NewSetupOptions()
+			reconciler := newACReconciler(c, helper, *setupOptions)
 
 			_, gotErr := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: tc.reconcileFor}})
 			if diff := cmp.Diff(tc.wantError, gotErr); diff != "" {
diff --git a/pkg/controller/admissionchecks/multikueue/controllers.go b/pkg/controller/admissionchecks/multikueue/controllers.go
index 265ddd4e95..2e370ee14e 100644
--- a/pkg/controller/admissionchecks/multikueue/controllers.go
+++ b/pkg/controller/admissionchecks/multikueue/controllers.go
@@ -21,17 +21,19 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime"
 
+	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
 	"sigs.k8s.io/kueue/pkg/constants"
 	"sigs.k8s.io/kueue/pkg/controller/jobframework"
 )
 
 const (
-	defaultGCInterval        = time.Minute
-	defaultOrigin            = "multikueue"
-	defaultWorkerLostTimeout = 5 * time.Minute
+	defaultGCInterval             = time.Minute
+	defaultOrigin                 = "multikueue"
+	defaultWorkerLostTimeout      = 5 * time.Minute
+	defaultMultikueControllerName = kueuealpha.MultiKueueControllerName
 )
 
 type SetupOptions struct {
+	controllerName    string
 	gcInterval        time.Duration
 	origin            string
 	workerLostTimeout time.Duration
@@ -72,13 +74,26 @@ func WithEventsBatchPeriod(d time.Duration) SetupOption {
 	}
 }
 
-func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error {
-	options := &SetupOptions{
+// WithControllerName - sets the controller name that the MultiKueue
+// admission checks and clusters must match.
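+//
+// A minimal usage sketch (the custom controller name below is hypothetical;
+// it only has to match the spec.controllerName of the AdmissionChecks and
+// MultiKueueClusters that this instance should handle):
+//
+//	err := multikueue.SetupControllers(mgr, namespace,
+//		multikueue.WithControllerName("example.com/custom-multikueue"),
+//	)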
+func WithControllerName(controllerName string) SetupOption { + return func(o *SetupOptions) { + o.controllerName = controllerName + } +} + +func NewSetupOptions() *SetupOptions { + return &SetupOptions{ gcInterval: defaultGCInterval, origin: defaultOrigin, workerLostTimeout: defaultWorkerLostTimeout, eventsBatchPeriod: constants.UpdatesBatchPeriod, + controllerName: defaultMultikueControllerName, } +} + +func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error { + options := NewSetupOptions() for _, o := range opts { o(options) @@ -100,18 +115,18 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e return err } - cRec := newClustersReconciler(mgr.GetClient(), namespace, options.gcInterval, options.origin, fsWatcher, adapters) + cRec := newClustersReconciler(mgr.GetClient(), namespace, *options, fsWatcher, adapters) err = cRec.setupWithManager(mgr) if err != nil { return err } - acRec := newACReconciler(mgr.GetClient(), helper) + acRec := newACReconciler(mgr.GetClient(), helper, *options) err = acRec.setupWithManager(mgr) if err != nil { return err } - wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, options.workerLostTimeout, options.eventsBatchPeriod, adapters) + wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, *options, adapters) return wlRec.setupWithManager(mgr) } diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go index 15fa09a508..9d6b895d76 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go @@ -312,6 +312,9 @@ type clustersReconciler struct { remoteClients map[string]*remoteClient wlUpdateCh chan event.GenericEvent + // name of the admission check controller that manages the cluster + controllerName string + // gcInterval - time waiting between two GC runs. 
gcInterval time.Duration @@ -395,6 +398,10 @@ func (c *clustersReconciler) Reconcile(ctx context.Context, req reconcile.Reques return reconcile.Result{}, err } + if err == nil && cluster.Spec.ControllerName != c.controllerName { + return reconcile.Result{}, nil + } + log.V(2).Info("Reconcile MultiKueueCluster") if err != nil || !cluster.DeletionTimestamp.IsZero() { @@ -503,14 +510,15 @@ func (c *clustersReconciler) runGC(ctx context.Context) { // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=multikueueclusters,verbs=get;list;watch // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=multikueueclusters/status,verbs=get;update;patch -func newClustersReconciler(c client.Client, namespace string, gcInterval time.Duration, origin string, fsWatcher *KubeConfigFSWatcher, adapters map[string]jobframework.MultiKueueAdapter) *clustersReconciler { +func newClustersReconciler(c client.Client, namespace string, so SetupOptions, fsWatcher *KubeConfigFSWatcher, adapters map[string]jobframework.MultiKueueAdapter) *clustersReconciler { return &clustersReconciler{ localClient: c, configNamespace: namespace, + controllerName: so.controllerName, remoteClients: make(map[string]*remoteClient), wlUpdateCh: make(chan event.GenericEvent, eventChBufferSize), - gcInterval: gcInterval, - origin: origin, + gcInterval: so.gcInterval, + origin: so.origin, watchEndedCh: make(chan event.GenericEvent, eventChBufferSize), fsWatcher: fsWatcher, adapters: adapters, diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go index e0b85adb1d..f2690a8de9 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go @@ -113,6 +113,7 @@ func TestUpdateConfig(t *testing.T) { reconcileFor: "worker1", clusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.SecretLocationType, "worker1"). Generation(1). Obj(), @@ -122,6 +123,7 @@ func TestUpdateConfig(t *testing.T) { }, wantClusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.SecretLocationType, "worker1"). Active(metav1.ConditionTrue, "Active", "Connected", 1). Generation(1). @@ -137,6 +139,7 @@ func TestUpdateConfig(t *testing.T) { reconcileFor: "worker1", clusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.SecretLocationType, "worker1"). Generation(1). Obj(), @@ -149,6 +152,7 @@ func TestUpdateConfig(t *testing.T) { }, wantClusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.SecretLocationType, "worker1"). Active(metav1.ConditionTrue, "Active", "Connected", 1). Generation(1). @@ -165,6 +169,7 @@ func TestUpdateConfig(t *testing.T) { reconcileFor: "worker1", clusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.PathLocationType, "testdata/worker1KubeConfig"). Generation(1). Obj(), @@ -174,6 +179,7 @@ func TestUpdateConfig(t *testing.T) { }, wantClusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). 
+ ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.PathLocationType, "testdata/worker1KubeConfig"). Active(metav1.ConditionTrue, "Active", "Connected", 1). Generation(1). @@ -190,6 +196,7 @@ func TestUpdateConfig(t *testing.T) { reconcileFor: "worker1", clusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.SecretLocationType, "worker1"). Generation(1). Obj(), @@ -205,6 +212,7 @@ func TestUpdateConfig(t *testing.T) { }, wantClusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.SecretLocationType, "worker1"). Active(metav1.ConditionFalse, "ClientConnectionFailed", "invalid kubeconfig", 1). Generation(1). @@ -216,6 +224,7 @@ func TestUpdateConfig(t *testing.T) { reconcileFor: "worker1", clusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.PathLocationType, ""). Generation(1). Obj(), @@ -225,6 +234,7 @@ func TestUpdateConfig(t *testing.T) { }, wantClusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.PathLocationType, ""). Active(metav1.ConditionFalse, "BadConfig", "open : no such file or directory", 1). Generation(1). @@ -236,6 +246,7 @@ func TestUpdateConfig(t *testing.T) { reconcileFor: "worker2", clusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.SecretLocationType, "worker1"). Generation(1). Obj(), @@ -246,6 +257,7 @@ func TestUpdateConfig(t *testing.T) { }, wantClusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.SecretLocationType, "worker1"). Generation(1). Obj(), @@ -261,6 +273,7 @@ func TestUpdateConfig(t *testing.T) { reconcileFor: "worker1", clusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.SecretLocationType, "worker1"). Generation(1). Obj(), @@ -276,6 +289,7 @@ func TestUpdateConfig(t *testing.T) { }, wantClusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.SecretLocationType, "worker1"). Active(metav1.ConditionFalse, "ClientConnectionFailed", "client cannot watch", 1). Generation(1). @@ -288,6 +302,7 @@ func TestUpdateConfig(t *testing.T) { reconcileFor: "worker1", clusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.SecretLocationType, "worker1"). Active(metav1.ConditionFalse, "ClientConnectionFailed", "client cannot watch", 1). Generation(1). @@ -304,6 +319,7 @@ func TestUpdateConfig(t *testing.T) { }, wantClusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.SecretLocationType, "worker1"). Active(metav1.ConditionFalse, "ClientConnectionFailed", "client cannot watch", 1). Generation(1). 
@@ -316,6 +332,7 @@ func TestUpdateConfig(t *testing.T) { reconcileFor: "worker1", clusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.SecretLocationType, "worker1"). Active(metav1.ConditionFalse, "ClientConnectionFailed", "client cannot watch", 1). Generation(1). @@ -332,6 +349,7 @@ func TestUpdateConfig(t *testing.T) { }, wantClusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.SecretLocationType, "worker1"). Active(metav1.ConditionTrue, "Active", "Connected", 1). Generation(1). @@ -343,6 +361,7 @@ func TestUpdateConfig(t *testing.T) { reconcileFor: "worker1", clusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.SecretLocationType, "worker1"). Active(metav1.ConditionFalse, "ClientConnectionFailed", "client cannot watch", 1). Generation(1). @@ -359,6 +378,7 @@ func TestUpdateConfig(t *testing.T) { }, wantClusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). KubeConfig(kueuealpha.SecretLocationType, "worker1"). Active(metav1.ConditionFalse, "ClientConnectionFailed", "invalid kubeconfig", 1). Generation(1). @@ -377,7 +397,9 @@ func TestUpdateConfig(t *testing.T) { c := builder.Build() adapters, _ := jobframework.GetMultiKueueAdapters() - reconciler := newClustersReconciler(c, TestNamespace, 0, defaultOrigin, nil, adapters) + setupOptions := NewSetupOptions() + setupOptions.gcInterval = 0 + reconciler := newClustersReconciler(c, TestNamespace, *setupOptions, nil, adapters) reconciler.rootContext = ctx if len(tc.remoteClients) > 0 { diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go index 2b9916628b..ea16362445 100644 --- a/pkg/controller/admissionchecks/multikueue/workload.go +++ b/pkg/controller/admissionchecks/multikueue/workload.go @@ -50,6 +50,7 @@ var ( ) type wlReconciler struct { + controllerName string client client.Client helper *multiKueueStoreHelper clusters *clustersReconciler @@ -215,7 +216,7 @@ func (w *wlReconciler) updateACS(ctx context.Context, wl *kueue.Workload, acs *k acs.LastTransitionTime = metav1.NewTime(time.Now()) wlPatch := workload.BaseSSAWorkload(wl) workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs) - return w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(kueuealpha.MultiKueueControllerName), client.ForceOwnership) + return w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(w.controllerName), client.ForceOwnership) } func (w *wlReconciler) remoteClientsForAC(ctx context.Context, acName string) (map[string]*remoteClient, error) { @@ -239,7 +240,7 @@ func (w *wlReconciler) remoteClientsForAC(ctx context.Context, acName string) (m } func (w *wlReconciler) multikueueAC(ctx context.Context, local *kueue.Workload) (*kueue.AdmissionCheckState, error) { - relevantChecks, err := admissioncheck.FilterForController(ctx, w.client, local.Status.AdmissionChecks, kueuealpha.MultiKueueControllerName) + relevantChecks, err := admissioncheck.FilterForController(ctx, w.client, local.Status.AdmissionChecks, w.controllerName) if err != nil { return nil, err } @@ -332,7 +333,7 @@ func (w *wlReconciler) reconcileGroup(ctx 
context.Context, group *wlGroup) (reco Reason: remoteFinishedCond.Reason, Message: remoteFinishedCond.Message, }) - return reconcile.Result{}, w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(kueuealpha.MultiKueueControllerName+"-finish"), client.ForceOwnership) + return reconcile.Result{}, w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(w.controllerName+"-finish"), client.ForceOwnership) } // 2. delete all workloads that are out of sync or are not in the chosen worker @@ -380,7 +381,7 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco wlPatch := workload.BaseSSAWorkload(group.local) workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs) - err := w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(kueuealpha.MultiKueueControllerName), client.ForceOwnership) + err := w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(w.controllerName), client.ForceOwnership) if err != nil { return reconcile.Result{}, err } @@ -399,7 +400,7 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco acs.LastTransitionTime = metav1.NewTime(time.Now()) wlPatch := workload.BaseSSAWorkload(group.local) workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs) - return reconcile.Result{}, w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(kueuealpha.MultiKueueControllerName), client.ForceOwnership) + return reconcile.Result{}, w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(w.controllerName), client.ForceOwnership) } } @@ -438,15 +439,16 @@ func (w *wlReconciler) Generic(_ event.GenericEvent) bool { return true } -func newWlReconciler(c client.Client, helper *multiKueueStoreHelper, cRec *clustersReconciler, origin string, workerLostTimeout, eventsBatchPeriod time.Duration, adapters map[string]jobframework.MultiKueueAdapter) *wlReconciler { +func newWlReconciler(c client.Client, helper *multiKueueStoreHelper, cRec *clustersReconciler, so SetupOptions, adapters map[string]jobframework.MultiKueueAdapter) *wlReconciler { return &wlReconciler{ + controllerName: so.controllerName, client: c, helper: helper, clusters: cRec, - origin: origin, - workerLostTimeout: workerLostTimeout, + origin: so.origin, + workerLostTimeout: so.workerLostTimeout, deletedWlCache: utilmaps.NewSyncMap[string, *kueue.Workload](0), - eventsBatchPeriod: eventsBatchPeriod, + eventsBatchPeriod: so.eventsBatchPeriod, adapters: adapters, } } diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go index 12c0c5817e..d211105c4f 100644 --- a/pkg/controller/admissionchecks/multikueue/workload_test.go +++ b/pkg/controller/admissionchecks/multikueue/workload_test.go @@ -1009,7 +1009,9 @@ func TestWlReconcile(t *testing.T) { managerClient := managerBuilder.Build() adapters, _ := jobframework.GetMultiKueueAdapters() - cRec := newClustersReconciler(managerClient, TestNamespace, 0, defaultOrigin, nil, adapters) + setupOptions := NewSetupOptions() + setupOptions.gcInterval = 0 + cRec := newClustersReconciler(managerClient, TestNamespace, *setupOptions, nil, adapters) worker1Builder, _ := getClientBuilder() worker1Builder = worker1Builder.WithLists(&kueue.WorkloadList{Items: tc.worker1Workloads}, &batchv1.JobList{Items: tc.worker1Jobs}) @@ -1055,7 +1057,9 @@ func TestWlReconcile(t *testing.T) { } helper, _ := newMultiKueueStoreHelper(managerClient) - reconciler := 
newWlReconciler(managerClient, helper, cRec, defaultOrigin, defaultWorkerLostTimeout, time.Second, adapters) + setupOptions.workerLostTimeout = defaultWorkerLostTimeout + setupOptions.eventsBatchPeriod = time.Second + reconciler := newWlReconciler(managerClient, helper, cRec, *setupOptions, adapters) for _, val := range tc.managersDeletedWorkloads { reconciler.Delete(event.DeleteEvent{ diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index 64fe90aaa6..9bdc2c836b 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -1069,6 +1069,11 @@ func (mkc *MultiKueueClusterWrapper) Obj() *kueuealpha.MultiKueueCluster { return &mkc.MultiKueueCluster } +func (mkc *MultiKueueClusterWrapper) ControllerName(controllerName string) *MultiKueueClusterWrapper { + mkc.Spec.ControllerName = controllerName + return mkc +} + func (mkc *MultiKueueClusterWrapper) KubeConfig(locationType kueuealpha.LocationType, location string) *MultiKueueClusterWrapper { mkc.Spec.KubeConfig = kueuealpha.KubeConfig{ Location: location, diff --git a/test/e2e/multikueue/e2e_test.go b/test/e2e/multikueue/e2e_test.go index 334e9d8fba..d8bbdda327 100644 --- a/test/e2e/multikueue/e2e_test.go +++ b/test/e2e/multikueue/e2e_test.go @@ -91,10 +91,14 @@ var _ = ginkgo.Describe("MultiKueue", func() { } gomega.Expect(k8sWorker2Client.Create(ctx, worker2Ns)).To(gomega.Succeed()) - workerCluster1 = utiltesting.MakeMultiKueueCluster("worker1").KubeConfig(kueuealpha.SecretLocationType, "multikueue1").Obj() + workerCluster1 = utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). + KubeConfig(kueuealpha.SecretLocationType, "multikueue1").Obj() gomega.Expect(k8sManagerClient.Create(ctx, workerCluster1)).To(gomega.Succeed()) - workerCluster2 = utiltesting.MakeMultiKueueCluster("worker2").KubeConfig(kueuealpha.SecretLocationType, "multikueue2").Obj() + workerCluster2 = utiltesting.MakeMultiKueueCluster("worker2"). + ControllerName(kueuealpha.MultiKueueControllerName). + KubeConfig(kueuealpha.SecretLocationType, "multikueue2").Obj() gomega.Expect(k8sManagerClient.Create(ctx, workerCluster2)).To(gomega.Succeed()) multiKueueConfig = utiltesting.MakeMultiKueueConfig("multikueueconfig").Clusters("worker1", "worker2").Obj() diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go index a84a8e88ac..469f77f93b 100644 --- a/test/integration/multikueue/multikueue_test.go +++ b/test/integration/multikueue/multikueue_test.go @@ -128,10 +128,14 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, } gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultikueueSecret2)).To(gomega.Succeed()) - workerCluster1 = utiltesting.MakeMultiKueueCluster("worker1").KubeConfig(kueuealpha.SecretLocationType, managerMultikueueSecret1.Name).Obj() + workerCluster1 = utiltesting.MakeMultiKueueCluster("worker1"). + ControllerName(kueuealpha.MultiKueueControllerName). + KubeConfig(kueuealpha.SecretLocationType, managerMultikueueSecret1.Name).Obj() gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, workerCluster1)).To(gomega.Succeed()) - workerCluster2 = utiltesting.MakeMultiKueueCluster("worker2").KubeConfig(kueuealpha.SecretLocationType, managerMultikueueSecret2.Name).Obj() + workerCluster2 = utiltesting.MakeMultiKueueCluster("worker2"). + ControllerName(kueuealpha.MultiKueueControllerName). 
+ KubeConfig(kueuealpha.SecretLocationType, managerMultikueueSecret2.Name).Obj() gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, workerCluster2)).To(gomega.Succeed()) managerMultiKueueConfig = utiltesting.MakeMultiKueueConfig("multikueueconfig").Clusters(workerCluster1.Name, workerCluster2.Name).Obj() @@ -251,7 +255,9 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, }) }) - cluster := utiltesting.MakeMultiKueueCluster("testing-cluster").KubeConfig(kueuealpha.SecretLocationType, "testing-secret").Obj() + cluster := utiltesting.MakeMultiKueueCluster("testing-cluster"). + ControllerName(kueuealpha.MultiKueueControllerName). + KubeConfig(kueuealpha.SecretLocationType, "testing-secret").Obj() ginkgo.By("creating the cluster, its Active state is updated, the admission check's state is updated", func() { gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, cluster)).Should(gomega.Succeed()) ginkgo.DeferCleanup(func() error { return managerTestCluster.client.Delete(managerTestCluster.ctx, cluster) }) @@ -392,7 +398,9 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, }) }) - cluster := utiltesting.MakeMultiKueueCluster("testing-cluster").KubeConfig(kueuealpha.PathLocationType, fsKubeConfig).Obj() + cluster := utiltesting.MakeMultiKueueCluster("testing-cluster"). + ControllerName(kueuealpha.MultiKueueControllerName). + KubeConfig(kueuealpha.PathLocationType, fsKubeConfig).Obj() ginkgo.By("creating the cluster, its Active state is updated, the admission check's state is updated", func() { gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, cluster)).Should(gomega.Succeed()) ginkgo.DeferCleanup(func() error { return managerTestCluster.client.Delete(managerTestCluster.ctx, cluster) }) From 5da30dafc38b2169bf560e7273064c8cda5caafa Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Wed, 12 Jun 2024 15:53:22 +0200 Subject: [PATCH 3/5] Update kueue cmd to be able to pass custom multikueue controller and adapters --- cmd/kueue/main.go | 8 ++++++ .../admissionchecks/multikueue/controllers.go | 28 +++++++++++-------- .../multikueue/multikueuecluster.go | 4 +-- .../multikueue/multikueuecluster_test.go | 3 +- .../admissionchecks/multikueue/workload.go | 4 +-- .../multikueue/workload_test.go | 6 ++-- 6 files changed, 34 insertions(+), 19 deletions(-) diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go index 4884bbf443..eff6ddd4f6 100644 --- a/cmd/kueue/main.go +++ b/cmd/kueue/main.go @@ -256,10 +256,18 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag } if features.Enabled(features.MultiKueue) { + adapters, err := jobframework.GetMultiKueueAdapters() + if err != nil { + setupLog.Error(err, "Could not get the multikueue adapters") + os.Exit(1) + } + if err := multikueue.SetupControllers(mgr, *cfg.Namespace, multikueue.WithGCInterval(cfg.MultiKueue.GCInterval.Duration), multikueue.WithOrigin(ptr.Deref(cfg.MultiKueue.Origin, configapi.DefaultMultiKueueOrigin)), multikueue.WithWorkerLostTimeout(cfg.MultiKueue.WorkerLostTimeout.Duration), + multikueue.WithControllerName(kueuealpha.MultiKueueControllerName), + multikueue.WithAdapters(adapters), ); err != nil { setupLog.Error(err, "Could not setup MultiKueue controller") os.Exit(1) diff --git a/pkg/controller/admissionchecks/multikueue/controllers.go b/pkg/controller/admissionchecks/multikueue/controllers.go index 2e370ee14e..a4a26cbb36 100644 --- a/pkg/controller/admissionchecks/multikueue/controllers.go 
+++ b/pkg/controller/admissionchecks/multikueue/controllers.go @@ -26,10 +26,9 @@ import ( ) const ( - defaultGCInterval = time.Minute - defaultOrigin = "multikueue" - defaultWorkerLostTimeout = 5 * time.Minute - defaultMultikueControllerName = kueuealpha.MultiKueueControllerName + defaultGCInterval = time.Minute + defaultOrigin = "multikueue" + defaultWorkerLostTimeout = 5 * time.Minute ) type SetupOptions struct { @@ -38,6 +37,7 @@ type SetupOptions struct { origin string workerLostTimeout time.Duration eventsBatchPeriod time.Duration + adapters map[string]jobframework.MultiKueueAdapter } type SetupOption func(o *SetupOptions) @@ -82,13 +82,22 @@ func WithControllerName(controllerName string) SetupOption { } } +// WithAdapters - sets the controller name for which the multikueue +// admission check match. +func WithAdapters(adapters map[string]jobframework.MultiKueueAdapter) SetupOption { + return func(o *SetupOptions) { + o.adapters = adapters + } +} + func NewSetupOptions() *SetupOptions { return &SetupOptions{ gcInterval: defaultGCInterval, origin: defaultOrigin, workerLostTimeout: defaultWorkerLostTimeout, eventsBatchPeriod: constants.UpdatesBatchPeriod, - controllerName: defaultMultikueControllerName, + controllerName: kueuealpha.MultiKueueControllerName, + adapters: make(map[string]jobframework.MultiKueueAdapter), } } @@ -110,12 +119,7 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e return err } - adapters, err := jobframework.GetMultiKueueAdapters() - if err != nil { - return err - } - - cRec := newClustersReconciler(mgr.GetClient(), namespace, *options, fsWatcher, adapters) + cRec := newClustersReconciler(mgr.GetClient(), namespace, *options, fsWatcher) err = cRec.setupWithManager(mgr) if err != nil { return err @@ -127,6 +131,6 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e return err } - wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, *options, adapters) + wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, *options) return wlRec.setupWithManager(mgr) } diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go index 9d6b895d76..7c3a6e0bb4 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go @@ -510,7 +510,7 @@ func (c *clustersReconciler) runGC(ctx context.Context) { // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=multikueueclusters,verbs=get;list;watch // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=multikueueclusters/status,verbs=get;update;patch -func newClustersReconciler(c client.Client, namespace string, so SetupOptions, fsWatcher *KubeConfigFSWatcher, adapters map[string]jobframework.MultiKueueAdapter) *clustersReconciler { +func newClustersReconciler(c client.Client, namespace string, so SetupOptions, fsWatcher *KubeConfigFSWatcher) *clustersReconciler { return &clustersReconciler{ localClient: c, configNamespace: namespace, @@ -521,7 +521,7 @@ func newClustersReconciler(c client.Client, namespace string, so SetupOptions, f origin: so.origin, watchEndedCh: make(chan event.GenericEvent, eventChBufferSize), fsWatcher: fsWatcher, - adapters: adapters, + adapters: so.adapters, } } diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go index f2690a8de9..b3b38c4ded 100644 --- 
a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go @@ -399,7 +399,8 @@ func TestUpdateConfig(t *testing.T) { adapters, _ := jobframework.GetMultiKueueAdapters() setupOptions := NewSetupOptions() setupOptions.gcInterval = 0 - reconciler := newClustersReconciler(c, TestNamespace, *setupOptions, nil, adapters) + setupOptions.adapters = adapters + reconciler := newClustersReconciler(c, TestNamespace, *setupOptions, nil) reconciler.rootContext = ctx if len(tc.remoteClients) > 0 { diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go index ea16362445..b36240c8b5 100644 --- a/pkg/controller/admissionchecks/multikueue/workload.go +++ b/pkg/controller/admissionchecks/multikueue/workload.go @@ -439,7 +439,7 @@ func (w *wlReconciler) Generic(_ event.GenericEvent) bool { return true } -func newWlReconciler(c client.Client, helper *multiKueueStoreHelper, cRec *clustersReconciler, so SetupOptions, adapters map[string]jobframework.MultiKueueAdapter) *wlReconciler { +func newWlReconciler(c client.Client, helper *multiKueueStoreHelper, cRec *clustersReconciler, so SetupOptions) *wlReconciler { return &wlReconciler{ controllerName: so.controllerName, client: c, @@ -449,7 +449,7 @@ func newWlReconciler(c client.Client, helper *multiKueueStoreHelper, cRec *clust workerLostTimeout: so.workerLostTimeout, deletedWlCache: utilmaps.NewSyncMap[string, *kueue.Workload](0), eventsBatchPeriod: so.eventsBatchPeriod, - adapters: adapters, + adapters: so.adapters, } } diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go index d211105c4f..b926422365 100644 --- a/pkg/controller/admissionchecks/multikueue/workload_test.go +++ b/pkg/controller/admissionchecks/multikueue/workload_test.go @@ -1011,7 +1011,8 @@ func TestWlReconcile(t *testing.T) { adapters, _ := jobframework.GetMultiKueueAdapters() setupOptions := NewSetupOptions() setupOptions.gcInterval = 0 - cRec := newClustersReconciler(managerClient, TestNamespace, *setupOptions, nil, adapters) + setupOptions.adapters = adapters + cRec := newClustersReconciler(managerClient, TestNamespace, *setupOptions, nil) worker1Builder, _ := getClientBuilder() worker1Builder = worker1Builder.WithLists(&kueue.WorkloadList{Items: tc.worker1Workloads}, &batchv1.JobList{Items: tc.worker1Jobs}) @@ -1059,7 +1060,8 @@ func TestWlReconcile(t *testing.T) { helper, _ := newMultiKueueStoreHelper(managerClient) setupOptions.workerLostTimeout = defaultWorkerLostTimeout setupOptions.eventsBatchPeriod = time.Second - reconciler := newWlReconciler(managerClient, helper, cRec, *setupOptions, adapters) + setupOptions.adapters = adapters + reconciler := newWlReconciler(managerClient, helper, cRec, *setupOptions) for _, val := range tc.managersDeletedWorkloads { reconciler.Delete(event.DeleteEvent{ From ee5a7986000252a63f134cf279012ac19fbef53a Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Tue, 18 Jun 2024 12:08:50 +0200 Subject: [PATCH 4/5] Add tests for distinct multikueues operating with different adapters --- .../admissionchecks/multikueue/controllers.go | 11 +- .../integration/multikueue/multikueue_test.go | 401 +++++++++++++++++- test/integration/multikueue/suite_test.go | 57 ++- 3 files changed, 458 insertions(+), 11 deletions(-) diff --git a/pkg/controller/admissionchecks/multikueue/controllers.go 
b/pkg/controller/admissionchecks/multikueue/controllers.go
index a4a26cbb36..b8bdbfd77c 100644
--- a/pkg/controller/admissionchecks/multikueue/controllers.go
+++ b/pkg/controller/admissionchecks/multikueue/controllers.go
@@ -82,14 +83,20 @@ func WithControllerName(controllerName string) SetupOption {
 	}
 }
 
-// WithAdapters - sets the controller name for which the multikueue
-// admission check match.
+// WithAdapters - sets all the MultiKueue adapters.
 func WithAdapters(adapters map[string]jobframework.MultiKueueAdapter) SetupOption {
 	return func(o *SetupOptions) {
 		o.adapters = adapters
 	}
 }
 
+// WithAdapter - sets or updates a single adapter in the MultiKueue adapters map.
+func WithAdapter(adapter jobframework.MultiKueueAdapter) SetupOption {
+	return func(o *SetupOptions) {
+		o.adapters[adapter.GVK().String()] = adapter
+	}
+}
+
 func NewSetupOptions() *SetupOptions {
 	return &SetupOptions{
 		gcInterval:        defaultGCInterval,
diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go
index 469f77f93b..c2cda3006b 100644
--- a/test/integration/multikueue/multikueue_test.go
+++ b/test/integration/multikueue/multikueue_test.go
@@ -38,6 +38,7 @@ import (
 	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	"sigs.k8s.io/kueue/pkg/controller/admissionchecks/multikueue"
+	"sigs.k8s.io/kueue/pkg/controller/jobframework"
 	workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
 	workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
 	"sigs.k8s.io/kueue/pkg/features"
@@ -71,7 +72,7 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure,
 	)
 
 	ginkgo.BeforeAll(func() {
-		multiclusterSetup(2 * time.Second)
+		multiclusterSetup(MultiKueueWithGCInterval(2 * time.Second))
 	})
 
 	ginkgo.AfterAll(func() {
@@ -1106,7 +1107,7 @@ var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFai
 	)
 
 	ginkgo.BeforeAll(func() {
-		multiclusterSetup(0)
+		multiclusterSetup(MultiKueueWithGCInterval(0))
 	})
 
 	ginkgo.AfterAll(func() {
@@ -1163,10 +1164,12 @@ var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFai
 		}
 		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultikueueSecret2)).To(gomega.Succeed())
 
-		workerCluster1 = utiltesting.MakeMultiKueueCluster("worker1").KubeConfig(kueuealpha.SecretLocationType, managerMultikueueSecret1.Name).Obj()
+		workerCluster1 = utiltesting.MakeMultiKueueCluster("worker1").
+			ControllerName(kueuealpha.MultiKueueControllerName).KubeConfig(kueuealpha.SecretLocationType, managerMultikueueSecret1.Name).Obj()
 		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, workerCluster1)).To(gomega.Succeed())
 
-		workerCluster2 = utiltesting.MakeMultiKueueCluster("worker2").KubeConfig(kueuealpha.SecretLocationType, managerMultikueueSecret2.Name).Obj()
+		workerCluster2 = utiltesting.MakeMultiKueueCluster("worker2").
+ ControllerName(kueuealpha.MultiKueueControllerName).KubeConfig(kueuealpha.SecretLocationType, managerMultikueueSecret2.Name).Obj() gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, workerCluster2)).To(gomega.Succeed()) managerMultiKueueConfig = utiltesting.MakeMultiKueueConfig("multikueueconfig").Clusters(workerCluster1.Name, workerCluster2.Name).Obj() @@ -1280,3 +1283,393 @@ var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFai }) }) }) + +var _ = ginkgo.Describe("2 Multikueues with distinct adapters", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { + var ( + managerNs *corev1.Namespace + worker1Ns *corev1.Namespace + worker2Ns *corev1.Namespace + + managerMultikueueSecret1 *corev1.Secret + managerMultikueueSecret2 *corev1.Secret + + multikueue1workerCluster1 *kueuealpha.MultiKueueCluster + multikueue1workerCluster2 *kueuealpha.MultiKueueCluster + multikueue2workerCluster1 *kueuealpha.MultiKueueCluster + multikueue2workerCluster2 *kueuealpha.MultiKueueCluster + + managerMultiKueueConfig1 *kueuealpha.MultiKueueConfig + managerMultiKueueConfig2 *kueuealpha.MultiKueueConfig + multikueueAC1 *kueue.AdmissionCheck + multikueueAC2 *kueue.AdmissionCheck + managerCq1 *kueue.ClusterQueue + managerCq2 *kueue.ClusterQueue + managerLq1 *kueue.LocalQueue + managerLq2 *kueue.LocalQueue + + worker1Cq *kueue.ClusterQueue + worker1Lq *kueue.LocalQueue + + worker2Cq *kueue.ClusterQueue + worker2Lq *kueue.LocalQueue + ) + + const ( + mkCn1 = "kueue.x-k8s.io/mkc1" + mkCn2 = "kueue.x-k8s.io/mkc2" + originMkCn1 = "mkc1" + originMkCn2 = "mkc2" + ) + + ginkgo.BeforeAll(func() { + defaultAdapters, _ := jobframework.GetMultiKueueAdapters() + + // create a mapping between controller name and adapter for 2 distinct multikueue admission checks + mkAdapters := make(controllerNameToAdapter, 0) + mkAdapters[mkCn1] = defaultAdapters[batchv1.SchemeGroupVersion.WithKind("Job").String()] + mkAdapters[mkCn2] = defaultAdapters[jobset.SchemeGroupVersion.WithKind("JobSet").String()] + + multiclusterSetup(MultiKueueWithCustomAdapters(mkAdapters)) + }) + + ginkgo.AfterAll(func() { + multiclusterTeardown() + }) + + ginkgo.BeforeEach(func() { + managerNs = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "multikueue-", + }, + } + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerNs)).To(gomega.Succeed()) + + worker1Ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: managerNs.Name, + }, + } + gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Ns)).To(gomega.Succeed()) + + worker2Ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: managerNs.Name, + }, + } + gomega.Expect(worker2TestCluster.client.Create(worker2TestCluster.ctx, worker2Ns)).To(gomega.Succeed()) + + w1Kubeconfig, err := worker1TestCluster.kubeConfigBytes() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + w2Kubeconfig, err := worker2TestCluster.kubeConfigBytes() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + managerMultikueueSecret1 = &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "multikueue1", + Namespace: managersConfigNamespace.Name, + }, + Data: map[string][]byte{ + kueuealpha.MultiKueueConfigSecretKey: w1Kubeconfig, + }, + } + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultikueueSecret1)).To(gomega.Succeed()) + + managerMultikueueSecret2 = &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "multikueue2", + Namespace: managersConfigNamespace.Name, + }, + 
Data: map[string][]byte{ + kueuealpha.MultiKueueConfigSecretKey: w2Kubeconfig, + }, + } + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultikueueSecret2)).To(gomega.Succeed()) + + multikueue1workerCluster1 = utiltesting.MakeMultiKueueCluster("worker11"). + ControllerName(mkCn1).KubeConfig(kueuealpha.SecretLocationType, managerMultikueueSecret1.Name).Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, multikueue1workerCluster1)).To(gomega.Succeed()) + multikueue1workerCluster2 = utiltesting.MakeMultiKueueCluster("worker12"). + ControllerName(mkCn1).KubeConfig(kueuealpha.SecretLocationType, managerMultikueueSecret2.Name).Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, multikueue1workerCluster2)).To(gomega.Succeed()) + + multikueue2workerCluster1 = utiltesting.MakeMultiKueueCluster("worker21"). + ControllerName(mkCn2).KubeConfig(kueuealpha.SecretLocationType, managerMultikueueSecret1.Name).Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, multikueue2workerCluster1)).To(gomega.Succeed()) + multikueue2workerCluster2 = utiltesting.MakeMultiKueueCluster("worker22"). + ControllerName(mkCn2).KubeConfig(kueuealpha.SecretLocationType, managerMultikueueSecret2.Name).Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, multikueue2workerCluster2)).To(gomega.Succeed()) + + ginkgo.By("create 2 multikueues working in parallel on the common multicluster env", func() { + managerMultiKueueConfig1 = utiltesting.MakeMultiKueueConfig("multikueueconfig1"). + Clusters(multikueue1workerCluster1.Name, multikueue1workerCluster2.Name).Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultiKueueConfig1)).Should(gomega.Succeed()) + + managerMultiKueueConfig2 = utiltesting.MakeMultiKueueConfig("multikueueconfig2"). + Clusters(multikueue2workerCluster1.Name, multikueue2workerCluster2.Name).Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultiKueueConfig2)).Should(gomega.Succeed()) + + multikueueAC1 = utiltesting.MakeAdmissionCheck("ac1"). + ControllerName(mkCn1). + Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", managerMultiKueueConfig1.Name). + Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, multikueueAC1)).Should(gomega.Succeed()) + + multikueueAC2 = utiltesting.MakeAdmissionCheck("ac2"). + ControllerName(mkCn2). + Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", managerMultiKueueConfig2.Name). 
+ Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, multikueueAC2)).Should(gomega.Succeed()) + }) + + ginkgo.By("wait for check active for the multikueues", func() { + updatedAc1 := kueue.AdmissionCheck{} + acKey1 := client.ObjectKeyFromObject(multikueueAC1) + + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, acKey1, &updatedAc1)).To(gomega.Succeed()) + cond := apimeta.FindStatusCondition(updatedAc1.Status.Conditions, kueue.AdmissionCheckActive) + g.Expect(cond).NotTo(gomega.BeNil()) + g.Expect(cond.Status).To(gomega.Equal(metav1.ConditionTrue), "Reason: %s, Message: %q", cond.Reason, cond.Message) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + + updatedAc2 := kueue.AdmissionCheck{} + acKey2 := client.ObjectKeyFromObject(multikueueAC2) + + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, acKey2, &updatedAc2)).To(gomega.Succeed()) + cond := apimeta.FindStatusCondition(updatedAc2.Status.Conditions, kueue.AdmissionCheckActive) + g.Expect(cond).NotTo(gomega.BeNil()) + g.Expect(cond.Status).To(gomega.Equal(metav1.ConditionTrue), "Reason: %s, Message: %q", cond.Reason, cond.Message) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + + managerCq1 = utiltesting.MakeClusterQueue("q1"). + AdmissionChecks(multikueueAC1.Name). + Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerCq1)).Should(gomega.Succeed()) + + managerLq1 = utiltesting.MakeLocalQueue(managerCq1.Name, managerNs.Name).ClusterQueue(managerCq1.Name).Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerLq1)).Should(gomega.Succeed()) + + managerCq2 = utiltesting.MakeClusterQueue("q2"). + AdmissionChecks(multikueueAC2.Name). 
+			Obj()
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerCq2)).Should(gomega.Succeed())
+
+		managerLq2 = utiltesting.MakeLocalQueue(managerCq2.Name, managerNs.Name).ClusterQueue(managerCq2.Name).Obj()
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerLq2)).Should(gomega.Succeed())
+
+		worker1Cq = utiltesting.MakeClusterQueue("w1q1").Obj()
+		gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Cq)).Should(gomega.Succeed())
+		worker1Lq = utiltesting.MakeLocalQueue(worker1Cq.Name, worker1Ns.Name).ClusterQueue(worker1Cq.Name).Obj()
+		gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Lq)).Should(gomega.Succeed())
+
+		worker2Cq = utiltesting.MakeClusterQueue("w2q1").Obj()
+		gomega.Expect(worker2TestCluster.client.Create(worker2TestCluster.ctx, worker2Cq)).Should(gomega.Succeed())
+		worker2Lq = utiltesting.MakeLocalQueue(worker2Cq.Name, worker2Ns.Name).ClusterQueue(worker2Cq.Name).Obj()
+		gomega.Expect(worker2TestCluster.client.Create(worker2TestCluster.ctx, worker2Lq)).Should(gomega.Succeed())
+	})
+
+	ginkgo.AfterEach(func() {
+		gomega.Expect(util.DeleteNamespace(managerTestCluster.ctx, managerTestCluster.client, managerNs)).To(gomega.Succeed())
+		gomega.Expect(util.DeleteNamespace(worker1TestCluster.ctx, worker1TestCluster.client, worker1Ns)).To(gomega.Succeed())
+		gomega.Expect(util.DeleteNamespace(worker2TestCluster.ctx, worker2TestCluster.client, worker2Ns)).To(gomega.Succeed())
+		util.ExpectClusterQueueToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, managerCq1, true)
+		util.ExpectClusterQueueToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, managerCq2, true)
+		util.ExpectClusterQueueToBeDeleted(worker1TestCluster.ctx, worker1TestCluster.client, worker1Cq, true)
+		util.ExpectClusterQueueToBeDeleted(worker2TestCluster.ctx, worker2TestCluster.client, worker2Cq, true)
+		util.ExpectAdmissionCheckToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, multikueueAC1, true)
+		util.ExpectAdmissionCheckToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, multikueueAC2, true)
+		gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, managerMultiKueueConfig1)).To(gomega.Succeed())
+		gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, managerMultiKueueConfig2)).To(gomega.Succeed())
+		gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, multikueue1workerCluster1)).To(gomega.Succeed())
+		gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, multikueue1workerCluster2)).To(gomega.Succeed())
+		gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, multikueue2workerCluster1)).To(gomega.Succeed())
+		gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, multikueue2workerCluster2)).To(gomega.Succeed())
+		gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, managerMultikueueSecret1)).To(gomega.Succeed())
+		gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, managerMultikueueSecret2)).To(gomega.Succeed())
+	})
+
+	ginkgo.It("Should run a workload specific to the MultiKueue that has dedicated adapters", func() {
+		if managerK8sVersion.LessThan(versionutil.MustParseSemantic("1.30.0")) {
+			ginkgo.Skip("the manager's kubernetes version is less than 1.30")
+		}
+		ginkgo.DeferCleanup(features.SetFeatureGateDuringTest(featuregatesT, features.MultiKueueBatchJobWithManagedBy, true))
+
+		jobSet := testingjobset.MakeJobSet("job-set",
managerNs.Name). + Queue(managerLq2.Name). + ManagedBy(mkCn2). + ReplicatedJobs( + testingjobset.ReplicatedJobRequirements{ + Name: "replicated-job-1", + Replicas: 1, + Completions: 1, + Parallelism: 1, + }, + ). + Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, jobSet)).Should(gomega.Succeed()) + + wlLookupKeyJobSet := types.NamespacedName{Name: workloadjobset.GetWorkloadNameForJobSet(jobSet.Name, jobSet.UID), Namespace: managerNs.Name} + admission := utiltesting.MakeAdmission(managerCq2.Name).PodSets( + kueue.PodSetAssignment{ + Name: "replicated-job-1", + }, + ).Obj() + + ginkgo.By("setting workload reservation in the management cluster for jobset", func() { + createdWorkloadJobSet := &kueue.Workload{} + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKeyJobSet, createdWorkloadJobSet)).To(gomega.Succeed()) + g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkloadJobSet, admission)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("checking the workload creation in the worker clusters", func() { + managerWl := &kueue.Workload{} + createdWorkloadJobSet := &kueue.Workload{} + gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKeyJobSet, managerWl)).To(gomega.Succeed()) + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKeyJobSet, createdWorkloadJobSet)).To(gomega.Succeed()) + g.Expect(createdWorkloadJobSet.Spec).To(gomega.BeComparableTo(managerWl.Spec)) + g.Expect(createdWorkloadJobSet.Labels[kueuealpha.MultiKueueOriginLabel]).To(gomega.Equal(originMkCn2)) + + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKeyJobSet, createdWorkloadJobSet)).To(gomega.Succeed()) + g.Expect(createdWorkloadJobSet.Spec).To(gomega.BeComparableTo(managerWl.Spec)) + g.Expect(createdWorkloadJobSet.Labels[kueuealpha.MultiKueueOriginLabel]).To(gomega.Equal(originMkCn2)) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + job := testingjob.MakeJob("job", managerNs.Name). + Queue(managerLq1.Name). + ManagedBy(mkCn1). 
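+ // the batch Job, in contrast, is managed by the first MultiKueue controller (mkCn1)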
+ Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, job)).Should(gomega.Succeed()) + + createdWorkload := &kueue.Workload{} + wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: managerNs.Name} + + ginkgo.By("setting workload reservation in the management cluster for job", func() { + admission := utiltesting.MakeAdmission(managerCq1.Name).Obj() + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("checking the workload creation in the worker clusters", func() { + managerWl := &kueue.Workload{} + gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed()) + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec)) + g.Expect(createdWorkload.Labels[kueuealpha.MultiKueueOriginLabel]).To(gomega.Equal(originMkCn1)) + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec)) + g.Expect(createdWorkload.Labels[kueuealpha.MultiKueueOriginLabel]).To(gomega.Equal(originMkCn1)) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("setting workload reservation in MultiKueue1, the job is created in one of the workers", func() { + admission := utiltesting.MakeAdmission(managerCq1.Name).Obj() + + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(util.SetQuotaReservation(worker1TestCluster.ctx, worker1TestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + createdJob := batchv1.Job{} + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed()) + g.Expect(createdJob.Labels[kueuealpha.MultiKueueOriginLabel]).To(gomega.Equal(originMkCn1)) + + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(job), &createdJob)).ToNot(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("setting workload reservation in MultiKueue2, the jobSet is created in one of the workers", func() { + admission := utiltesting.MakeAdmission(managerCq2.Name).PodSets( + kueue.PodSetAssignment{ + Name: "replicated-job-1", + }, + ).Obj() + + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKeyJobSet, createdWorkload)).To(gomega.Succeed()) + g.Expect(util.SetQuotaReservation(worker2TestCluster.ctx, worker2TestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + createdJobSet := jobset.JobSet{} + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(jobSet), 
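+ // the quota was reserved through MultiKueue2, so the JobSet is expected to appear only on worker2, not worker1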
&createdJobSet)).To(gomega.Succeed()) + g.Expect(createdJobSet.Labels[kueuealpha.MultiKueueOriginLabel]).To(gomega.Equal(originMkCn2)) + + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, client.ObjectKeyFromObject(jobSet), &createdJobSet)).ToNot(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("finishing the worker jobSet, the manager's wl is marked as finished and the worker2 wl removed", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdJobSet := jobset.JobSet{} + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(jobSet), &createdJobSet)).To(gomega.Succeed()) + apimeta.SetStatusCondition(&createdJobSet.Status.Conditions, metav1.Condition{ + Type: string(jobset.JobSetCompleted), + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadFinishedReasonSucceeded, + Message: "JobSet finished successfully", + }) + g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdJobSet)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + createdWorkload := &kueue.Workload{} + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKeyJobSet, createdWorkload)).To(gomega.Succeed()) + g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadFinishedReasonSucceeded, + Message: "JobSet finished successfully", + }, util.IgnoreConditionTimestampsAndObservedGeneration)) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + createdWorkload := &kueue.Workload{} + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKeyJobSet, createdWorkload)).To(utiltesting.BeNotFoundError()) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("finishing the worker job, the manager's wl is marked as finished and the worker1 wl removed", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdJob := batchv1.Job{} + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed()) + createdJob.Status.Conditions = append(createdJob.Status.Conditions, batchv1.JobCondition{ + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + LastProbeTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Message: "Job finished successfully", + }) + createdJob.Status.StartTime = ptr.To(metav1.Now()) + createdJob.Status.CompletionTime = ptr.To(metav1.Now()) + g.Expect(worker1TestCluster.client.Status().Update(worker1TestCluster.ctx, &createdJob)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + + g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadFinishedReasonSucceeded, + Message: "Job finished successfully", + }, util.IgnoreConditionTimestampsAndObservedGeneration)) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + createdWorkload := 
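+ // the remote workload copy on worker1 should be removed once the Job has finished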
&kueue.Workload{} + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError()) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + }) +}) diff --git a/test/integration/multikueue/suite_test.go b/test/integration/multikueue/suite_test.go index de7a6ec614..426247bbdd 100644 --- a/test/integration/multikueue/suite_test.go +++ b/test/integration/multikueue/suite_test.go @@ -20,6 +20,7 @@ import ( "context" "os" "path/filepath" + "strings" "testing" "time" @@ -39,6 +40,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/admissionchecks/multikueue" "sigs.k8s.io/kueue/pkg/controller/core" "sigs.k8s.io/kueue/pkg/controller/core/indexer" + "sigs.k8s.io/kueue/pkg/controller/jobframework" workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job" workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset" "sigs.k8s.io/kueue/pkg/queue" @@ -147,27 +149,72 @@ func managerAndMultiKueueSetup(mgr manager.Manager, ctx context.Context, gcInter } gomega.Expect(mgr.GetClient().Create(ctx, managersConfigNamespace)).To(gomega.Succeed()) - err := multikueue.SetupIndexer(ctx, mgr.GetFieldIndexer(), managersConfigNamespace.Name) + var err error + err = multikueue.SetupIndexer(ctx, mgr.GetFieldIndexer(), managersConfigNamespace.Name) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + adapters, err := jobframework.GetMultiKueueAdapters() gomega.Expect(err).NotTo(gomega.HaveOccurred()) err = multikueue.SetupControllers(mgr, managersConfigNamespace.Name, multikueue.WithGCInterval(gcInterval), multikueue.WithWorkerLostTimeout(testingWorkerLostTimeout), multikueue.WithEventsBatchPeriod(100*time.Millisecond), + multikueue.WithAdapters(adapters), ) gomega.Expect(err).NotTo(gomega.HaveOccurred()) } -func multiclusterSetup(gcInterval time.Duration) { +type controllerNameToAdapter map[string]jobframework.MultiKueueAdapter + +func managerAndMultiKueueSetupWithAdapters(mgr manager.Manager, ctx context.Context, adapters controllerNameToAdapter) { + managerSetup(mgr, ctx) + + managersConfigNamespace = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kueue-system", + }, + } + gomega.Expect(mgr.GetClient().Create(ctx, managersConfigNamespace)).To(gomega.Succeed()) + + err := multikueue.SetupIndexer(ctx, mgr.GetFieldIndexer(), managersConfigNamespace.Name) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + for cn, a := range adapters { + // create origin as controllerName without the domain + origin := strings.Split(cn, "/")[1] + err = multikueue.SetupControllers(mgr, managersConfigNamespace.Name, + multikueue.WithGCInterval(2*time.Second), + multikueue.WithWorkerLostTimeout(testingWorkerLostTimeout), + multikueue.WithEventsBatchPeriod(100*time.Millisecond), + multikueue.WithControllerName(cn), + multikueue.WithOrigin(origin), + multikueue.WithAdapter(a), + ) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } +} + +func MultiKueueWithGCInterval(gcInterval time.Duration) func(mgr manager.Manager, ctx context.Context) { + return func(mgr manager.Manager, ctx context.Context) { + managerAndMultiKueueSetup(mgr, ctx, gcInterval) + } +} + +func MultiKueueWithCustomAdapters(adapters controllerNameToAdapter) func(mgr manager.Manager, ctx context.Context) { + return func(mgr manager.Manager, ctx context.Context) { + managerAndMultiKueueSetupWithAdapters(mgr, ctx, adapters) + } +} + +func multiclusterSetup(multiKueueSetup func(mgr manager.Manager, ctx context.Context)) { var managerFeatureGates []string version, err := 
versionutil.ParseGeneric(os.Getenv("ENVTEST_K8S_VERSION"))
if err != nil || !version.LessThan(versionutil.MustParseSemantic("1.30.0")) {
managerFeatureGates = []string{"JobManagedBy=true"}
}

- managerTestCluster = createCluster(func(mgr manager.Manager, ctx context.Context) {
- managerAndMultiKueueSetup(mgr, ctx, gcInterval)
- }, managerFeatureGates...)
+ managerTestCluster = createCluster(multiKueueSetup, managerFeatureGates...)
worker1TestCluster = createCluster(managerSetup)
worker2TestCluster = createCluster(managerSetup)

From c1e4244cc6d8afe8f728f75e86c68d2da41c7f25 Mon Sep 17 00:00:00 2001
From: Michal Szadkowski
Date: Thu, 20 Jun 2024 16:12:39 +0200
Subject: [PATCH 5/5] pass controllerName to IsJobManagedByKueue; fix typos

---
pkg/controller/admissionchecks/multikueue/controllers.go | 4 ++--
pkg/controller/admissionchecks/multikueue/workload.go | 2 +-
pkg/controller/jobframework/interface.go | 2 +-
pkg/controller/jobs/job/job_multikueue_adapter.go | 6 +++---
pkg/controller/jobs/job/job_multikueue_adapter_test.go | 8 ++++----
pkg/controller/jobs/jobset/jobset_multikueue_adapter.go | 6 +++---
.../jobs/jobset/jobset_multikueue_adapter_test.go | 6 +++---
7 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/pkg/controller/admissionchecks/multikueue/controllers.go b/pkg/controller/admissionchecks/multikueue/controllers.go
index b8bdbfd77c..27222830e9 100644
--- a/pkg/controller/admissionchecks/multikueue/controllers.go
+++ b/pkg/controller/admissionchecks/multikueue/controllers.go
@@ -83,14 +83,14 @@ func WithControllerName(controllerName string) SetupOption {
}
}

-// WithAdapters - sets all the MultiKueue adaptors.
+// WithAdapters - sets all the MultiKueue adapters.
func WithAdapters(adapters map[string]jobframework.MultiKueueAdapter) SetupOption {
return func(o *SetupOptions) {
o.adapters = adapters
}
}

-// WithAdapters - sets or updates the adadpter of the MultiKueue adaptors.
+// WithAdapter - sets or updates the adapter of the MultiKueue adapters.
func WithAdapter(adapter jobframework.MultiKueueAdapter) SetupOption {
return func(o *SetupOptions) {
o.adapters[adapter.GVK().String()] = adapter
diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go
index b36240c8b5..4f71093141 100644
--- a/pkg/controller/admissionchecks/multikueue/workload.go
+++ b/pkg/controller/admissionchecks/multikueue/workload.go
@@ -181,7 +181,7 @@ func (w *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
// If the workload is deleted there is a chance that it's owner is also deleted. In that case
// we skip calling `IsJobManagedByKueue` as its output would not be reliable.
if !isDeleted {
- managed, unmanagedReason, err := adapter.IsJobManagedByKueue(ctx, w.client, types.NamespacedName{Name: owner.Name, Namespace: wl.Namespace})
+ managed, unmanagedReason, err := adapter.IsJobManagedByKueue(ctx, w.client, types.NamespacedName{Name: owner.Name, Namespace: wl.Namespace}, w.controllerName)
if err != nil {
return reconcile.Result{}, err
}
diff --git a/pkg/controller/jobframework/interface.go b/pkg/controller/jobframework/interface.go
index 76938ed5e0..5c00b793a4 100644
--- a/pkg/controller/jobframework/interface.go
+++ b/pkg/controller/jobframework/interface.go
@@ -160,7 +160,7 @@ type MultiKueueAdapter interface {
// - a bool indicating if the job object identified by key is managed by kueue and can be delegated.
// - a reason indicating why the job is not managed by Kueue
// - any API error encountered during the check
- IsJobManagedByKueue(ctx context.Context, localClient client.Client, key types.NamespacedName) (bool, string, error)
+ IsJobManagedByKueue(ctx context.Context, localClient client.Client, key types.NamespacedName, controllerName string) (bool, string, error)
// KeepAdmissionCheckPending returns true if the state of the multikueue admission check should be
// kept Pending while the job runs in a worker. This might be needed to keep the managers job
// suspended and not start the execution locally.
diff --git a/pkg/controller/jobs/job/job_multikueue_adapter.go b/pkg/controller/jobs/job/job_multikueue_adapter.go
index b07b7a4f0c..7dffa72b7c 100644
--- a/pkg/controller/jobs/job/job_multikueue_adapter.go
+++ b/pkg/controller/jobs/job/job_multikueue_adapter.go
@@ -115,7 +115,7 @@ func (b *multikueueAdapter) KeepAdmissionCheckPending() bool {
return !features.Enabled(features.MultiKueueBatchJobWithManagedBy)
}

-func (b *multikueueAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) {
+func (b *multikueueAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName, controllerName string) (bool, string, error) {
if !features.Enabled(features.MultiKueueBatchJobWithManagedBy) {
return true, "", nil
}
@@ -126,7 +126,7 @@ func (b *multikueueAdapter) IsJobManagedByKueue(ctx context.Cl
return false, "", err
}
jobControllerName := ptr.Deref(job.Spec.ManagedBy, "")
- if jobControllerName != kueuealpha.MultiKueueControllerName {
- return false, fmt.Sprintf("Expecting spec.managedBy to be %q not %q", kueuealpha.MultiKueueControllerName, jobControllerName), nil
+ if jobControllerName != controllerName {
+ return false, fmt.Sprintf("Expecting spec.managedBy to be %q not %q", controllerName, jobControllerName), nil
}
return true, "", nil
diff --git a/pkg/controller/jobs/job/job_multikueue_adapter_test.go b/pkg/controller/jobs/job/job_multikueue_adapter_test.go
index d8477df1e1..ca92a61bb6 100644
--- a/pkg/controller/jobs/job/job_multikueue_adapter_test.go
+++ b/pkg/controller/jobs/job/job_multikueue_adapter_test.go
@@ -205,7 +205,7 @@ func TestMultikueueAdapter(t *testing.T) {
},
"missing job is not considered managed": {
operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error {
- if isManged, _, _ := adapter.IsJobManagedByKueue(ctx, managerClient, types.NamespacedName{Name: "job1", Namespace: TestNamespace}); isManged {
+ if isManged, _, _ := adapter.IsJobManagedByKueue(ctx, managerClient, types.NamespacedName{Name: "job1", Namespace: TestNamespace}, kueuealpha.MultiKueueControllerName); isManged {
return errors.New("expecting false")
}
return nil
@@ -216,7 +216,7 @@
*baseJobBuilder.Clone().Obj(),
},
operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error {
- if isManged, _, _ := adapter.IsJobManagedByKueue(ctx, managerClient, types.NamespacedName{Name: "job1", Namespace: TestNamespace}); isManged {
+ if isManged, _, _ := adapter.IsJobManagedByKueue(ctx, managerClient, types.NamespacedName{Name: "job1", Namespace: TestNamespace}, kueuealpha.MultiKueueControllerName); isManged {
return errors.New("expecting false")
}
return nil
@@ -231,7 +231,7 @@
*baseJobManagedByKueueBuilder.Clone().Obj(),
},
operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error {
- if isManged, _, _ := adapter.IsJobManagedByKueue(ctx, managerClient, types.NamespacedName{Name: "job1", Namespace: TestNamespace}); !isManged {
+ if isManged, _, _ := adapter.IsJobManagedByKueue(ctx, managerClient, types.NamespacedName{Name: "job1", Namespace: TestNamespace}, kueuealpha.MultiKueueControllerName); !isManged {
return errors.New("expecting true")
}
return nil
@@ -246,7 +246,7 @@
},
withoutJobManagedBy: true,
operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error {
- if isManged, _, _ := adapter.IsJobManagedByKueue(ctx, managerClient, types.NamespacedName{Name: "job1", Namespace: TestNamespace}); !isManged {
+ if isManged, _, _ := adapter.IsJobManagedByKueue(ctx, managerClient, types.NamespacedName{Name: "job1", Namespace: TestNamespace}, kueuealpha.MultiKueueControllerName); !isManged {
return errors.New("expecting true")
}
return nil
diff --git a/pkg/controller/jobs/jobset/jobset_multikueue_adapter.go b/pkg/controller/jobs/jobset/jobset_multikueue_adapter.go
index 4dbd31b040..53403a69d2 100644
--- a/pkg/controller/jobs/jobset/jobset_multikueue_adapter.go
+++ b/pkg/controller/jobs/jobset/jobset_multikueue_adapter.go
@@ -88,14 +88,14 @@ func (b *multikueueAdapter) KeepAdmissionCheckPending() bool {
return false
}

-func (b *multikueueAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) {
+func (b *multikueueAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName, controllerName string) (bool, string, error) {
js := jobset.JobSet{}
err := c.Get(ctx, key, &js)
if err != nil {
return false, "", err
}
jobsetControllerName := ptr.Deref(js.Spec.ManagedBy, "")
- if jobsetControllerName != kueuealpha.MultiKueueControllerName {
- return false, fmt.Sprintf("Expecting spec.managedBy to be %q not %q", kueuealpha.MultiKueueControllerName, jobsetControllerName), nil
+ if jobsetControllerName != controllerName {
+ return false, fmt.Sprintf("Expecting spec.managedBy to be %q not %q", controllerName, jobsetControllerName), nil
}
return true, "", nil
diff --git a/pkg/controller/jobs/jobset/jobset_multikueue_adapter_test.go b/pkg/controller/jobs/jobset/jobset_multikueue_adapter_test.go
index 78fc264af5..c0cfc91397 100644
--- a/pkg/controller/jobs/jobset/jobset_multikueue_adapter_test.go
+++ b/pkg/controller/jobs/jobset/jobset_multikueue_adapter_test.go
@@ -164,7 +164,7 @@
},
"missing jobset is not considered managed": {
operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error {
- if isManged, _, _ := adapter.IsJobManagedByKueue(ctx, managerClient, types.NamespacedName{Name: "jobset1", Namespace: TestNamespace}); isManged {
+ if isManged, _, _ := adapter.IsJobManagedByKueue(ctx, managerClient, types.NamespacedName{Name: "jobset1", Namespace: TestNamespace}, kueuealpha.MultiKueueControllerName); isManged {
return errors.New("expecting false")
}
return nil
@@ -175,7 +175,7 @@
*baseJobSetBuilder.DeepCopy().Obj(),
},
operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error {
- if isManged, _, _ := adapter.IsJobManagedByKueue(ctx, managerClient, types.NamespacedName{Name: "jobset1", Namespace: TestNamespace}); isManged {
+ if isManged, _, _ := adapter.IsJobManagedByKueue(ctx, managerClient, types.NamespacedName{Name: "jobset1", Namespace: TestNamespace}, kueuealpha.MultiKueueControllerName); isManged {
return errors.New("expecting false")
}
return nil
@@ -190,7 +190,7 @@
*baseJobSetManagedByKueueBuilder.DeepCopy().Obj(),
},
operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error {
- if isManged, _, _ := adapter.IsJobManagedByKueue(ctx, managerClient, types.NamespacedName{Name: "jobset1", Namespace: TestNamespace}); !isManged {
+ if isManged, _, _ := adapter.IsJobManagedByKueue(ctx, managerClient, types.NamespacedName{Name: "jobset1", Namespace: TestNamespace}, kueuealpha.MultiKueueControllerName); !isManged {
return errors.New("expecting true")
}
return nil