[multikueue] add support for custom multikueue controller #2405

Draft
wants to merge 5 commits into
base: main
Changes from 4 commits
8 changes: 8 additions & 0 deletions apis/kueue/v1alpha1/multikueue_types.go
@@ -59,6 +59,14 @@ type KubeConfig struct {
}

type MultiKueueClusterSpec struct {
// controllerName is name of the controller which will actually perform
// the checks. This is the name with which controller identifies with,
// not necessarily a K8S Pod or Deployment name. Cannot be empty.
// +kubebuilder:validation:Required
// +kubebuilder:default="kueue.x-k8s.io/multikueue"
// +kubebuilder:validation:XValidation:rule="self == oldSelf", message="field is immutable"
ControllerName string `json:"controllerName"`
Contributor

We deliberately didn't make it configurable in the first iteration to limit the potential for misconfiguring the system. Could you first open an issue to discuss the need for it?

Contributor

@tenzen-y is this change required in your understanding of #2349?

Contributor

The status of a multikueue cluster follows the state of a controller-runtime client in the kueue-controller-manager; if we want to be able to have multiple controller managers working with the clusters we need a way to mark who is managing each cluster.

Contributor

I see, but I'm wondering if we could alternatively still use MultiKueue controller, but have a concept of external plugin?

That would make it easier to track whether an object is managed by MultiKueue or not. I feel this would help on-call, but maybe I need to think about this more.

Since this requires API changes, should we not first go through a KEP to discuss alternatives? WDYT @alculquicondor, @tenzen-y?

Contributor

> if we want to be able to have multiple controller managers working with the clusters we need a way to mark who is managing each cluster.

There should only be one controller manager modifying these objects (Kueue).

Other controller managers should only be in charge of mirroring the Job CRDs, but otherwise they can use all the same objects.

I would like to see at least a description of what the plan is, how components should interact, and what each of them is responsible for. Maybe a KEP is worthwhile, but please start with some description before writing more code.

Contributor @trasc, Jun 13, 2024

> I see, but I'm wondering if we could alternatively still use MultiKueue controller, but have a concept of external plugin?

That will be a lot more complicated and will most likely require some additional API object type(s) to communicate between that custom controller and the multikueue admission check controller.

The target of this PR is to be able to instantiate multikueue admission check controllers having different sets of adapters that can work in the same cluster at the same time.

> There should only be one controller manager modifying these objects (Kueue).

MultiKueueClusters should not be that different from AdmissionChecks, which by design are managed by an AdmissionCheckController running in or outside of Kueue.
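
For illustration, the ownership model described in this comment could be sketched as follows. This is a minimal standalone sketch, not Kueue code: the `Cluster` and `Controller` types, the adapter keys, and the name `example.com/custom-multikueue` are hypothetical. It only assumes that each controller instance acts on the clusters whose `controllerName` equals its own.

```go
package main

import (
	"fmt"
	"sort"
)

// Cluster is a hypothetical, trimmed-down stand-in for a MultiKueueCluster.
type Cluster struct {
	Name           string
	ControllerName string
}

// Controller is a hypothetical controller instance with its own adapter set.
type Controller struct {
	Name     string
	Adapters map[string]bool // keyed by job kind; illustrative only
}

// Manages reports whether this controller instance owns the given cluster.
func (c Controller) Manages(cl Cluster) bool {
	return cl.ControllerName == c.Name
}

// Owned returns the sorted names of the clusters this controller should reconcile.
func (c Controller) Owned(clusters []Cluster) []string {
	var names []string
	for _, cl := range clusters {
		if c.Manages(cl) {
			names = append(names, cl.Name)
		}
	}
	sort.Strings(names)
	return names
}

func main() {
	clusters := []Cluster{
		{Name: "worker1", ControllerName: "kueue.x-k8s.io/multikueue"},
		{Name: "worker2", ControllerName: "example.com/custom-multikueue"},
	}
	builtin := Controller{Name: "kueue.x-k8s.io/multikueue", Adapters: map[string]bool{"Job": true}}
	custom := Controller{Name: "example.com/custom-multikueue", Adapters: map[string]bool{"CustomJob": true}}

	fmt.Println(builtin.Owned(clusters)) // [worker1]
	fmt.Println(custom.Owned(clusters))  // [worker2]
}
```

Both instances can watch the same set of objects; the name comparison is what keeps them from acting on each other's clusters.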

Member

The current approach will lose the consistency between the kueue-manager and external controllers, since it seems that the current approach allows us to re-implement the multikueue admission controller.

Can we consider an alternative approach that avoids losing specification consistency?

As I described in #2349, it would be nice to start by synchronizing the design and the actual implementations.


// Information how to connect to the cluster.
KubeConfig KubeConfig `json:"kubeConfig"`
}
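
The markers on `controllerName` combine a default with a transition rule (`self == oldSelf`), which is only evaluated on updates. The sketch below emulates that behaviour in plain Go to make the semantics concrete. The function names are hypothetical, the real enforcement happens in the API server through the CRD schema, and treating the empty string as "unset" is a simplification of how defaulting works.

```go
package main

import (
	"errors"
	"fmt"
)

const defaultControllerName = "kueue.x-k8s.io/multikueue"

var errImmutable = errors.New("field is immutable")

// applyDefault mimics the +kubebuilder:default marker.
// Simplification: an empty string stands in for an absent field.
func applyDefault(name string) string {
	if name == "" {
		return defaultControllerName
	}
	return name
}

// validateUpdate mimics the transition rule `self == oldSelf`:
// any change to the stored value is rejected.
func validateUpdate(oldName, newName string) error {
	if newName != oldName {
		return errImmutable
	}
	return nil
}

func main() {
	created := applyDefault("")
	fmt.Println(created)                                      // kueue.x-k8s.io/multikueue
	fmt.Println(validateUpdate(created, "example.com/other")) // field is immutable
	fmt.Println(validateUpdate(created, created))             // <nil>
}
```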
11 changes: 11 additions & 0 deletions charts/kueue/templates/crd/kueue.x-k8s.io_multikueueclusters.yaml
@@ -53,6 +53,16 @@ spec:
type: object
spec:
properties:
controllerName:
default: kueue.x-k8s.io/multikueue
description: |-
controllerName is name of the controller which will actually perform
the checks. This is the name with which controller identifies with,
not necessarily a K8S Pod or Deployment name. Cannot be empty.
type: string
x-kubernetes-validations:
- message: field is immutable
rule: self == oldSelf
kubeConfig:
description: Information how to connect to the cluster.
properties:
@@ -76,6 +86,7 @@ spec:
- locationType
type: object
required:
- controllerName
- kubeConfig
type: object
status:
8 changes: 8 additions & 0 deletions cmd/kueue/main.go
@@ -256,10 +256,18 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
}

if features.Enabled(features.MultiKueue) {
adapters, err := jobframework.GetMultiKueueAdapters()
if err != nil {
setupLog.Error(err, "Could not get the multikueue adapters")
os.Exit(1)
}

if err := multikueue.SetupControllers(mgr, *cfg.Namespace,
multikueue.WithGCInterval(cfg.MultiKueue.GCInterval.Duration),
multikueue.WithOrigin(ptr.Deref(cfg.MultiKueue.Origin, configapi.DefaultMultiKueueOrigin)),
multikueue.WithWorkerLostTimeout(cfg.MultiKueue.WorkerLostTimeout.Duration),
multikueue.WithControllerName(kueuealpha.MultiKueueControllerName),
multikueue.WithAdapters(adapters),
); err != nil {
setupLog.Error(err, "Could not setup MultiKueue controller")
os.Exit(1)
11 changes: 11 additions & 0 deletions config/components/crd/bases/kueue.x-k8s.io_multikueueclusters.yaml
@@ -38,6 +38,16 @@ spec:
type: object
spec:
properties:
controllerName:
default: kueue.x-k8s.io/multikueue
description: |-
controllerName is name of the controller which will actually perform
the checks. This is the name with which controller identifies with,
not necessarily a K8S Pod or Deployment name. Cannot be empty.
type: string
x-kubernetes-validations:
- message: field is immutable
rule: self == oldSelf
kubeConfig:
description: Information how to connect to the cluster.
properties:
@@ -61,6 +71,7 @@ spec:
- locationType
type: object
required:
- controllerName
- kubeConfig
type: object
status:
19 changes: 11 additions & 8 deletions pkg/controller/admissionchecks/multikueue/admissioncheck.go
@@ -52,19 +52,20 @@ func newMultiKueueStoreHelper(c client.Client) (*multiKueueStoreHelper, error) {
}

// ACReconciler implements the reconciler for all the admission checks controlled by multikueue.
- // Its main task being to maintain the active state of the admission checks based on the heath
+ // Its main task being to maintain the active state of the admission checks based on the health
// of its referenced MultiKueueClusters.
type ACReconciler struct {
- client client.Client
- helper *multiKueueStoreHelper
+ controllerName string
+ client client.Client
+ helper *multiKueueStoreHelper
}

var _ reconcile.Reconciler = (*ACReconciler)(nil)

func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
log := ctrl.LoggerFrom(ctx)
ac := &kueue.AdmissionCheck{}
- if err := a.client.Get(ctx, req.NamespacedName, ac); err != nil || ac.Spec.ControllerName != kueuealpha.MultiKueueControllerName {
+ if err := a.client.Get(ctx, req.NamespacedName, ac); err != nil || ac.Spec.ControllerName != a.controllerName {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

@@ -96,7 +97,8 @@ func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re

if err != nil {
missingClusters = append(missingClusters, clusterName)
- } else if !apimeta.IsStatusConditionTrue(cluster.Status.Conditions, kueuealpha.MultiKueueClusterActive) {
+ } else if !apimeta.IsStatusConditionTrue(cluster.Status.Conditions, kueuealpha.MultiKueueClusterActive) ||
+ ac.Spec.ControllerName != cluster.Spec.ControllerName {
inactiveClusters = append(inactiveClusters, clusterName)
}
}
@@ -166,10 +168,11 @@ func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=admissionchecks,verbs=get;list;watch
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=multikueueconfigs,verbs=get;list;watch

- func newACReconciler(c client.Client, helper *multiKueueStoreHelper) *ACReconciler {
+ func newACReconciler(c client.Client, helper *multiKueueStoreHelper, so SetupOptions) *ACReconciler {
return &ACReconciler{
- client: c,
- helper: helper,
+ controllerName: so.controllerName,
+ client: c,
+ helper: helper,
}
}
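
The changed condition in this file can be read as a three-way classification of the clusters referenced by an admission check: missing, inactive (not active, or owned by a different controller name), or usable. A minimal standalone sketch under simplified assumptions follows; `cluster` and `classify` are hypothetical names, not part of Kueue, and the data mirrors the "non default controller name" test case in this PR plus one unknown cluster.

```go
package main

import "fmt"

// cluster is a hypothetical, simplified view of a MultiKueueCluster.
type cluster struct {
	controllerName string
	active         bool
}

// classify splits the referenced cluster names into missing and inactive ones.
// A cluster counts as inactive when it is not active or when its controller
// name differs from the admission check's controller name.
func classify(acController string, referenced []string, known map[string]cluster) (missing, inactive []string) {
	for _, name := range referenced {
		cl, found := known[name]
		switch {
		case !found:
			missing = append(missing, name)
		case !cl.active || cl.controllerName != acController:
			inactive = append(inactive, name)
		}
	}
	return missing, inactive
}

func main() {
	known := map[string]cluster{
		"worker1": {controllerName: "mkc1", active: true},
		"worker2": {controllerName: "mkc1", active: true},
	}
	missing, inactive := classify(
		"kueue.x-k8s.io/multikueue",
		[]string{"worker1", "worker2", "worker3"},
		known,
	)
	fmt.Println("missing:", missing)   // missing: [worker3]
	fmt.Println("inactive:", inactive) // inactive: [worker1 worker2]
}
```

Note that with this rule a healthy cluster is reported as inactive when only the controller names differ, which is what the new test case expects ("Inactive clusters: [worker1 worker2]").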

49 changes: 48 additions & 1 deletion pkg/controller/admissionchecks/multikueue/admissioncheck_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package multikueue

import (
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
@@ -123,6 +124,7 @@ func TestReconcile(t *testing.T) {
},
clusters: []kueuealpha.MultiKueueCluster{
*utiltesting.MakeMultiKueueCluster("worker1").
ControllerName(kueuealpha.MultiKueueControllerName).
Active(metav1.ConditionFalse, "ByTest", "by test", 1).
Obj(),
},
@@ -156,9 +158,11 @@ func TestReconcile(t *testing.T) {
},
clusters: []kueuealpha.MultiKueueCluster{
*utiltesting.MakeMultiKueueCluster("worker1").
ControllerName(kueuealpha.MultiKueueControllerName).
Active(metav1.ConditionFalse, "ByTest", "by test", 1).
Obj(),
*utiltesting.MakeMultiKueueCluster("worker2").
ControllerName(kueuealpha.MultiKueueControllerName).
Active(metav1.ConditionFalse, "ByTest", "by test", 1).
Obj(),
},
@@ -192,9 +196,11 @@ func TestReconcile(t *testing.T) {
},
clusters: []kueuealpha.MultiKueueCluster{
*utiltesting.MakeMultiKueueCluster("worker1").
ControllerName(kueuealpha.MultiKueueControllerName).
Active(metav1.ConditionFalse, "ByTest", "by test", 1).
Obj(),
*utiltesting.MakeMultiKueueCluster("worker2").
ControllerName(kueuealpha.MultiKueueControllerName).
Active(metav1.ConditionTrue, "ByTest", "by test", 1).
Obj(),
},
@@ -228,6 +234,7 @@ func TestReconcile(t *testing.T) {
},
clusters: []kueuealpha.MultiKueueCluster{
*utiltesting.MakeMultiKueueCluster("worker1").
ControllerName(kueuealpha.MultiKueueControllerName).
Active(metav1.ConditionTrue, "ByTest", "by test", 1).
Obj(),
},
@@ -247,10 +254,49 @@ func TestReconcile(t *testing.T) {
Obj(),
},
},
"non default controller name": {
reconcileFor: "ac1",
checks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(kueuealpha.MultiKueueControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
Generation(1).
Obj(),
},
configs: []kueuealpha.MultiKueueConfig{
*utiltesting.MakeMultiKueueConfig("config1").Clusters("worker1", "worker2").Obj(),
},
clusters: []kueuealpha.MultiKueueCluster{
*utiltesting.MakeMultiKueueCluster("worker1").
ControllerName("mkc1").
Active(metav1.ConditionTrue, "ByTest", "by test", 1).
Obj(),
*utiltesting.MakeMultiKueueCluster("worker2").
ControllerName("mkc1").
Active(metav1.ConditionTrue, "ByTest", "by test", 1).
Obj(),
},
wantChecks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(kueuealpha.MultiKueueControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
SingleInstanceInClusterQueue(true, SingleInstanceReason, SingleInstanceMessage, 1).
ApplyToAllFlavors(true, FlavorIndependentCheckReason, FlavorIndependentCheckMessage, 1).
Condition(metav1.Condition{
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionFalse,
Reason: "NoUsableClusters",
Message: "Inactive clusters: [worker1 worker2]",
ObservedGeneration: 1,
}).
Obj(),
},
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
fmt.Println(name)
builder, ctx := getClientBuilder()

builder = builder.WithLists(
@@ -266,7 +312,8 @@ func TestReconcile(t *testing.T) {
c := builder.Build()

helper, _ := newMultiKueueStoreHelper(c)
- reconciler := newACReconciler(c, helper)
+ setupOptions := NewSetupOptions()
+ reconciler := newACReconciler(c, helper, *setupOptions)

_, gotErr := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: tc.reconcileFor}})
if diff := cmp.Diff(tc.wantError, gotErr); diff != "" {
46 changes: 36 additions & 10 deletions pkg/controller/admissionchecks/multikueue/controllers.go
@@ -21,6 +21,7 @@ import (

ctrl "sigs.k8s.io/controller-runtime"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
)
@@ -32,10 +33,12 @@ const (
)

type SetupOptions struct {
controllerName string
gcInterval time.Duration
origin string
workerLostTimeout time.Duration
eventsBatchPeriod time.Duration
adapters map[string]jobframework.MultiKueueAdapter
}

type SetupOption func(o *SetupOptions)
@@ -72,13 +75,41 @@ func WithEventsBatchPeriod(d time.Duration) SetupOption {
}
}

- func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error {
- options := &SetupOptions{
// WithControllerName - sets the controller name for which the multikueue
// admission check match.
func WithControllerName(controllerName string) SetupOption {
return func(o *SetupOptions) {
o.controllerName = controllerName
}
}

// WithAdapters - sets all the MultiKueue adaptors.
Contributor Author

typo

func WithAdapters(adapters map[string]jobframework.MultiKueueAdapter) SetupOption {
return func(o *SetupOptions) {
o.adapters = adapters
}
}

// WithAdapters - sets or updates the adadpter of the MultiKueue adaptors.
Contributor Author

fix comment

func WithAdapter(adapter jobframework.MultiKueueAdapter) SetupOption {
return func(o *SetupOptions) {
o.adapters[adapter.GVK().String()] = adapter
}
}

func NewSetupOptions() *SetupOptions {
return &SetupOptions{
gcInterval: defaultGCInterval,
origin: defaultOrigin,
workerLostTimeout: defaultWorkerLostTimeout,
eventsBatchPeriod: constants.UpdatesBatchPeriod,
controllerName: kueuealpha.MultiKueueControllerName,
adapters: make(map[string]jobframework.MultiKueueAdapter),
}
}

func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error {
options := NewSetupOptions()

for _, o := range opts {
o(options)
@@ -95,23 +126,18 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e
return err
}

- adapters, err := jobframework.GetMultiKueueAdapters()
- if err != nil {
- return err
- }
-
- cRec := newClustersReconciler(mgr.GetClient(), namespace, options.gcInterval, options.origin, fsWatcher, adapters)
+ cRec := newClustersReconciler(mgr.GetClient(), namespace, *options, fsWatcher)
err = cRec.setupWithManager(mgr)
if err != nil {
return err
}

- acRec := newACReconciler(mgr.GetClient(), helper)
+ acRec := newACReconciler(mgr.GetClient(), helper, *options)
err = acRec.setupWithManager(mgr)
if err != nil {
return err
}

- wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, options.workerLostTimeout, options.eventsBatchPeriod, adapters)
+ wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, *options)
return wlRec.setupWithManager(mgr)
}
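
The setup code in this file follows the functional options pattern: defaults are filled in first, then each option mutates the options struct in order. A minimal standalone sketch of that pattern is shown below. The `Adapter` interface, `jobAdapter`, `Options`, `NewOptions`, and the one-minute default are hypothetical simplifications, not the PR's types. Unlike the PR, which assigns the passed map directly, this sketch copies the map in `WithAdapters` so that a nil argument followed by `WithAdapter` cannot write to a nil map.

```go
package main

import (
	"fmt"
	"time"
)

// Adapter is a hypothetical stand-in for jobframework.MultiKueueAdapter.
type Adapter interface {
	Key() string // hypothetical; the PR keys adapters by adapter.GVK().String()
}

type jobAdapter struct{ kind string }

func (a jobAdapter) Key() string { return a.kind }

// Options mirrors the shape of SetupOptions with fewer fields.
type Options struct {
	controllerName string
	gcInterval     time.Duration
	adapters       map[string]Adapter
}

type Option func(*Options)

// WithControllerName sets the controller name the instance identifies with.
func WithControllerName(name string) Option {
	return func(o *Options) { o.controllerName = name }
}

// WithAdapters replaces the whole adapter set, copying the input map.
func WithAdapters(adapters map[string]Adapter) Option {
	return func(o *Options) {
		o.adapters = make(map[string]Adapter, len(adapters))
		for k, a := range adapters {
			o.adapters[k] = a
		}
	}
}

// WithAdapter adds or replaces a single adapter.
func WithAdapter(a Adapter) Option {
	return func(o *Options) { o.adapters[a.Key()] = a }
}

// NewOptions applies the defaults first, then each option in order.
func NewOptions(opts ...Option) *Options {
	o := &Options{
		controllerName: "kueue.x-k8s.io/multikueue",
		gcInterval:     time.Minute, // illustrative default, not Kueue's value
		adapters:       make(map[string]Adapter),
	}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	o := NewOptions(
		WithControllerName("example.com/custom-multikueue"), // hypothetical name
		WithAdapters(nil),
		WithAdapter(jobAdapter{kind: "batch/v1, Kind=Job"}),
	)
	fmt.Println(o.controllerName, len(o.adapters))
}
```

Because options run in order, a `WithAdapters` call placed after `WithAdapter` discards the adapters added before it.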