From 4d8e79e201c2c33ef1473a5e6dc5116e1f1ae22b Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Tue, 23 Apr 2024 14:25:15 +0200 Subject: [PATCH] Switch to builder focused cluster support Signed-off-by: Dr. Stefan Schimanski --- pkg/builder/controller.go | 272 ++++++++++++++++++-------- pkg/config/controller.go | 15 +- pkg/controller/controller.go | 24 +-- pkg/controller/multicluster.go | 106 ++++++++++ pkg/handler/cluster.go | 82 ++++++++ pkg/handler/enqueue.go | 82 ++------ pkg/handler/enqueue_mapped.go | 25 +-- pkg/handler/enqueue_owner.go | 42 +--- pkg/handler/eventhandler.go | 8 - pkg/internal/controller/controller.go | 112 +---------- pkg/internal/source/kind.go | 35 ---- pkg/manager/manager.go | 5 +- pkg/source/source.go | 17 +- 13 files changed, 452 insertions(+), 373 deletions(-) create mode 100644 pkg/controller/multicluster.go create mode 100644 pkg/handler/cluster.go diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 3b21f3ddbc..ebf0c6424a 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -17,6 +17,7 @@ limitations under the License. package builder import ( + "context" "errors" "fmt" "strings" @@ -24,11 +25,12 @@ import ( "github.com/go-logr/logr" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" - "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source" @@ -53,23 +55,29 @@ const ( projectAsMetadata ) -// Builder builds a Controller. -type Builder struct { +var _ controller.ClusterWatcher = &clusterWatcher{} + +// clusterWatcher sets up watches between a cluster and a controller. +type clusterWatcher struct { + ctrl controller.Controller forInput ForInput ownsInput []OwnsInput - watchesInput []WatchesInput - mgr manager.Manager - cluster cluster.Cluster - clusterName string + watchSpecs []watchSpec globalPredicates []predicate.Predicate - ctrl controller.Controller - ctrlOptions controller.Options - name string +} + +// Builder builds a Controller. +type Builder struct { + clusterWatcher + rawWatchesInput []WatchesInput + mgr manager.Manager + ctrlOptions controller.Options + name string } // ControllerManagedBy returns a new controller builder that will be started by the provided Manager. func ControllerManagedBy(m manager.Manager) *Builder { - return &Builder{cluster: m, mgr: m} + return &Builder{mgr: m} } // ForInput represents the information set by the For method. @@ -124,6 +132,12 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder { return blder } +type watchSpec struct { + object client.Object + eventHandler handler.EventHandler + opts []WatchesOption +} + // WatchesInput represents the information set by Watches method. type WatchesInput struct { src source.Source @@ -138,8 +152,12 @@ type WatchesInput struct { // This is the equivalent of calling // WatchesRawSource(source.Kind(cache, object), eventHandler, opts...). func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder { - src := source.Kind(blder.cluster.GetCache(), object) - return blder.WatchesRawSource(src, eventHandler, opts...) 
+ blder.watchSpecs = append(blder.watchSpecs, watchSpec{ + object: object, + eventHandler: eventHandler, + opts: opts, + }) + return blder } // WatchesMetadata is the same as Watches, but forces the internal cache to only watch PartialObjectMetadata. @@ -170,8 +188,12 @@ func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHa // concrete type on top of the metadata cache; this increases memory // consumption and leads to race conditions as caches are not in sync. func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder { - opts = append(opts, OnlyMetadata) - return blder.Watches(object, eventHandler, opts...) + blder.watchSpecs = append(blder.watchSpecs, watchSpec{ + object: object, + eventHandler: eventHandler, + opts: append(opts, OnlyMetadata), + }) + return blder } // WatchesRawSource exposes the lower-level ControllerManagedBy Watches functions through the builder. @@ -185,7 +207,7 @@ func (blder *Builder) WatchesRawSource(src source.Source, eventHandler handler.E opt.ApplyToWatches(&input) } - blder.watchesInput = append(blder.watchesInput, input) + blder.rawWatchesInput = append(blder.rawWatchesInput, input) return blder } @@ -231,38 +253,42 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro if r == nil { return nil, fmt.Errorf("must provide a non-nil Reconciler") } - if blder.mgr == nil || blder.cluster == nil { + if blder.mgr == nil { return nil, fmt.Errorf("must provide a non-nil Manager") } if blder.forInput.err != nil { return nil, blder.forInput.err } - if err := blder.do(r); err != nil { + + // Set the ControllerManagedBy + if err := blder.doController(r); err != nil { return nil, err } - if err := blder.mgr.Add(blder.ctrl); err != nil { + + // Set the Watch + if err := blder.doWatch(); err != nil { return nil, err } - return blder.ctrl, nil -} -func (blder *Builder) do(r reconcile.Reconciler) error { - // Set the ControllerManagedBy - if err := blder.doController(r); err != nil { - return err + ctrl := blder.ctrl + if *blder.ctrlOptions.EngageWithProviderClusters { + // wrap as cluster.AwareRunnable to be engaged with provider clusters on demand + ctrl = controller.NewMultiClusterController(ctrl, &blder.clusterWatcher) + } + if err := blder.mgr.Add(ctrl); err != nil { + return nil, err } - // Set the Watch - return blder.doWatch() + return blder.ctrl, nil } -func (blder *Builder) project(obj client.Object, proj objectProjection) (client.Object, error) { +func project(cl cluster.Cluster, obj client.Object, proj objectProjection) (client.Object, error) { switch proj { case projectAsNormal: return obj, nil case projectAsMetadata: metaObj := &metav1.PartialObjectMetadata{} - gvk, err := getGvk(obj, blder.cluster.GetScheme()) + gvk, err := getGvk(obj, cl.GetScheme()) if err != nil { return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err) } @@ -273,74 +299,115 @@ func (blder *Builder) project(obj client.Object, proj objectProjection) (client. 
} } -func (blder *Builder) doWatch() error { +func (cc *clusterWatcher) Watch(ctx context.Context, cl cluster.Cluster) error { // Reconcile type - if blder.forInput.object != nil { - obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) + if cc.forInput.object != nil { + obj, err := project(cl, cc.forInput.object, cc.forInput.objectProjection) if err != nil { return err } - src := source.Kind(blder.cluster.GetCache(), obj) - hdler := &handler.EnqueueRequestForObject{} - allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) - allPredicates = append(allPredicates, blder.forInput.predicates...) - if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { + src := &ctxBoundedSyncingSource{ctx: ctx, src: source.Kind(cl.GetCache(), obj)} + hdler := handler.ForCluster(cl.Name(), &handler.EnqueueRequestForObject{}) + + allPredicates := append([]predicate.Predicate(nil), cc.globalPredicates...) + allPredicates = append(allPredicates, cc.forInput.predicates...) + if err := cc.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } } // Watches the managed types - if len(blder.ownsInput) > 0 && blder.forInput.object == nil { + if len(cc.ownsInput) > 0 && cc.forInput.object == nil { return errors.New("Owns() can only be used together with For()") } - for _, own := range blder.ownsInput { - obj, err := blder.project(own.object, own.objectProjection) + for _, own := range cc.ownsInput { + obj, err := project(cl, own.object, own.objectProjection) if err != nil { return err } - src := source.Kind(blder.cluster.GetCache(), obj) + src := &ctxBoundedSyncingSource{ctx: ctx, src: source.Kind(cl.GetCache(), obj)} opts := []handler.OwnerOption{} if !own.matchEveryOwner { opts = append(opts, handler.OnlyControllerOwner()) } - hdler := handler.EnqueueRequestForOwner( - blder.cluster.GetScheme(), blder.cluster.GetRESTMapper(), - blder.forInput.object, + hdler := handler.ForCluster(cl.Name(), handler.EnqueueRequestForOwner( + cl.GetScheme(), cl.GetRESTMapper(), + cc.forInput.object, opts..., - ) - allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) + )) + + allPredicates := append([]predicate.Predicate(nil), cc.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) - if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { + if err := cc.ctrl.Watch(src, hdler, allPredicates...); err != nil { + return err + } + } + + // Watches extra types + for _, ws := range cc.watchSpecs { + srcKind := source.Kind(cl.GetCache(), ws.object).(*internalsource.Kind) + src := &ctxBoundedSyncingSource{ctx: ctx, src: srcKind} + input := &WatchesInput{ + src: src, + eventHandler: ws.eventHandler, + predicates: nil, + objectProjection: projectAsNormal, + } + for _, opt := range ws.opts { + opt.ApplyToWatches(input) + } + typeForSrc, err := project(cl, srcKind.Type, input.objectProjection) + if err != nil { + return err + } + srcKind.Type = typeForSrc + hdler := handler.ForCluster(cl.Name(), input.eventHandler) + + allPredicates := append([]predicate.Predicate(nil), cc.globalPredicates...) + allPredicates = append(allPredicates, input.predicates...) 
+ if err := cc.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } } + return nil +} + +func (blder *Builder) doWatch() error { // Do the watch requests - if len(blder.watchesInput) == 0 && blder.forInput.object == nil { + if len(blder.rawWatchesInput) == 0 && blder.forInput.object == nil { return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up") } - for _, w := range blder.watchesInput { - // If the source of this watch is of type Kind, project it. - if srcKind, ok := w.src.(*internalsource.Kind); ok { - typeForSrc, err := blder.project(srcKind.Type, w.objectProjection) - if err != nil { - return err + if *blder.ctrlOptions.EngageWithProviderClusters && len(blder.rawWatchesInput) > 0 { + return errors.New("when using a cluster adapter, custom raw watches are not allowed") + } + if *blder.ctrlOptions.EngageWithDefaultCluster { + // connect to the manager's cluster directly. If there is a cluster + // provider, the manager will engage the controller later for each + // provided cluster. + + // + // Note: nil for the context is okay here, as it only acts as upper bound for the watch life-cycle. + if err := blder.clusterWatcher.Watch(nil, blder.mgr); err != nil { + return err + } + + // raw watches are only allowed when not using a cluster provider + for _, w := range blder.rawWatchesInput { + // If the source of this watch is of type Kind, project it. + if srcKind, ok := w.src.(*internalsource.Kind); ok { + typeForSrc, err := project(blder.mgr, srcKind.Type, w.objectProjection) + if err != nil { + return err + } + srcKind.Type = typeForSrc } - srcKind.Type = typeForSrc - } else if !ok { - // If we're building a cluster-aware controller, raw watches are not allowed - // given that the cache cannot be validated to be coming from the same cluster. - // In the future, we could consider allowing this by satisfying a new interface - // that sets and uses the cluster. - if blder.clusterName != "" { - return fmt.Errorf("when using a cluster adapter, custom raw watches %T are not allowed", w.src) + allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) + allPredicates = append(allPredicates, w.predicates...) + if err := blder.ctrl.Watch(w.src, w.eventHandler, allPredicates...); err != nil { + return err } } - allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) - allPredicates = append(allPredicates, w.predicates...)
- if err := blder.ctrl.Watch(w.src, w.eventHandler, allPredicates...); err != nil { - return err - } } return nil } @@ -358,12 +425,11 @@ func (blder *Builder) getControllerName(gvk schema.GroupVersionKind, hasGVK bool func (blder *Builder) doController(r reconcile.Reconciler) error { globalOpts := blder.mgr.GetControllerOptions() - ctrlOptions := blder.ctrlOptions - if ctrlOptions.Reconciler != nil && r != nil { + if blder.ctrlOptions.Reconciler != nil && r != nil { return errors.New("reconciler was set via WithOptions() and via Build() or Complete()") } - if ctrlOptions.Reconciler == nil { - ctrlOptions.Reconciler = r + if blder.ctrlOptions.Reconciler == nil { + blder.ctrlOptions.Reconciler = r } // Retrieve the GVK from the object we're reconciling @@ -372,24 +438,24 @@ func (blder *Builder) doController(r reconcile.Reconciler) error { hasGVK := blder.forInput.object != nil if hasGVK { var err error - gvk, err = getGvk(blder.forInput.object, blder.cluster.GetScheme()) + gvk, err = getGvk(blder.forInput.object, blder.mgr.GetScheme()) if err != nil { return err } } // Setup concurrency. - if ctrlOptions.MaxConcurrentReconciles == 0 && hasGVK { + if blder.ctrlOptions.MaxConcurrentReconciles == 0 && hasGVK { groupKind := gvk.GroupKind().String() if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 { - ctrlOptions.MaxConcurrentReconciles = concurrency + blder.ctrlOptions.MaxConcurrentReconciles = concurrency } } // Setup cache sync timeout. - if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout > 0 { - ctrlOptions.CacheSyncTimeout = globalOpts.CacheSyncTimeout + if blder.ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout > 0 { + blder.ctrlOptions.CacheSyncTimeout = globalOpts.CacheSyncTimeout } controllerName, err := blder.getControllerName(gvk, hasGVK) @@ -398,7 +464,7 @@ func (blder *Builder) doController(r reconcile.Reconciler) error { } // Setup the logger. - if ctrlOptions.LogConstructor == nil { + if blder.ctrlOptions.LogConstructor == nil { log := blder.mgr.GetLogger().WithValues( "controller", controllerName, ) @@ -409,7 +475,7 @@ func (blder *Builder) doController(r reconcile.Reconciler) error { ) } - ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger { + blder.ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger { log := log if req != nil { if hasGVK { @@ -423,7 +489,55 @@ func (blder *Builder) doController(r reconcile.Reconciler) error { } } + // Default which clusters to engage with. + if blder.ctrlOptions.EngageWithDefaultCluster == nil { + blder.ctrlOptions.EngageWithDefaultCluster = globalOpts.EngageWithDefaultCluster + } + if blder.ctrlOptions.EngageWithProviderClusters == nil { + blder.ctrlOptions.EngageWithProviderClusters = globalOpts.EngageWithProviderClusters + } + if blder.ctrlOptions.EngageWithDefaultCluster == nil { + return errors.New("EngageWithDefaultCluster must not be nil") // should not happen due to defaulting + } + if blder.ctrlOptions.EngageWithProviderClusters == nil { + return errors.New("EngageWithProviderClusters must not be nil") // should not happen due to defaulting + } + if !*blder.ctrlOptions.EngageWithDefaultCluster && !*blder.ctrlOptions.EngageWithProviderClusters { + return errors.New("EngageWithDefaultCluster and EngageWithProviderClusters are both false, controller will never get triggered") + } + // Build the controller and return. 
- blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions) + blder.ctrl, err = newController(controllerName, blder.mgr, blder.ctrlOptions) return err } + +// ctxBoundedSyncingSource implements source.SyncingSource and wraps the ctx +// passed to the methods into the life-cycle of another context, i.e. stop +// whenever one of the contexts is done. +type ctxBoundedSyncingSource struct { + ctx context.Context + src source.SyncingSource +} + +var _ source.SyncingSource = &ctxBoundedSyncingSource{} + +func (s *ctxBoundedSyncingSource) Start(ctx context.Context, hdler handler.EventHandler, q workqueue.RateLimitingInterface, preds ...predicate.Predicate) error { + return s.src.Start(joinContexts(ctx, s.ctx), hdler, q, preds...) +} + +func (s *ctxBoundedSyncingSource) WaitForSync(ctx context.Context) error { + return s.src.WaitForSync(joinContexts(ctx, s.ctx)) +} + +func joinContexts(ctx, bound context.Context) context.Context { + if bound == nil { + return ctx + } + + ctx, cancel := context.WithCancel(ctx) + go func() { + defer cancel() + <-bound.Done() + }() + return ctx +} diff --git a/pkg/config/controller.go b/pkg/config/controller.go index 34af282a40..cb5536a563 100644 --- a/pkg/config/controller.go +++ b/pkg/config/controller.go @@ -47,8 +47,15 @@ type Controller struct { // Defaults to true, which means the controller will use leader election. NeedLeaderElection *bool - // WatchProviderClusters indicates whether the controller should - // only watch clusters that are engaged by the cluster provider. Defaults to false - // if no provider is set, and to true if a provider is set. - WatchProviderClusters *bool + // EngageWithDefaultCluster indicates whether the controller should engage + // with the default cluster. This defaults to false if a cluster provider + // is configured, and to true otherwise. + // + // This is an experimental feature and is subject to change. + EngageWithDefaultCluster *bool + + // EngageWithProviderClusters indicates whether the controller should engage + // with the provided clusters of the manager. This defaults to true if a + // cluster provider is set, and to false otherwise. + EngageWithProviderClusters *bool } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 0f2938c3a0..fc3fcd5465 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -24,8 +24,6 @@ import ( "github.com/go-logr/logr" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "k8s.io/utils/ptr" - "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/internal/controller" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -64,10 +62,16 @@ type Options struct { // to each reconciliation via the context field. LogConstructor func(request *reconcile.Request) logr.Logger - // WatchProviderClusters indicates whether the controller should - // only watch clusters that are engaged by the cluster provider. Defaults to false - // if no provider is set, and to true if a provider is set. - WatchProviderClusters *bool + // EngageWithDefaultCluster indicates whether the controller should engage + // with the default cluster of a manager. This defaults to false through the + // global controller options of the manager if a cluster provider is set, + // and to true otherwise. Here it can be overridden. + EngageWithDefaultCluster *bool + // EngageWithProviderClusters indicates whether the controller should engage + // with the provided clusters of a manager.
This defaults to true through the + // global controller options of the manager if a cluster provider is set, + // and to false otherwise. Here it can be overridden. + EngageWithProviderClusters *bool } // Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests @@ -161,13 +165,6 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller options.NeedLeaderElection = mgr.GetControllerOptions().NeedLeaderElection } - if options.WatchProviderClusters == nil { - options.WatchProviderClusters = mgr.GetControllerOptions().WatchProviderClusters - if options.WatchProviderClusters == nil { // should never happen - options.WatchProviderClusters = ptr.To(false) - } - } - // Create controller with dependencies set return &controller.Controller{ Do: options.Reconciler, @@ -182,7 +179,6 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller LogConstructor: options.LogConstructor, RecoverPanic: options.RecoverPanic, LeaderElected: options.NeedLeaderElection, - WatchProviderClusters: *options.WatchProviderClusters, }, nil } diff --git a/pkg/controller/multicluster.go b/pkg/controller/multicluster.go new file mode 100644 index 0000000000..fcabeae104 --- /dev/null +++ b/pkg/controller/multicluster.go @@ -0,0 +1,106 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "sync" + + "sigs.k8s.io/controller-runtime/pkg/cluster" +) + +// MultiClusterController is a Controller that is aware of the Cluster it is +// running in. It engages and disengages clusters dynamically, starting the +// watches and stopping them. +type MultiClusterController interface { + cluster.AwareRunnable + Controller } + +// ClusterWatcher starts watches for a given Cluster. The ctx should be +// used to cancel the watch when the Cluster is disengaged. +type ClusterWatcher interface { + Watch(ctx context.Context, cl cluster.Cluster) error +} + +// NewMultiClusterController creates a new MultiClusterController for the given +// controller with the given ClusterWatcher. +func NewMultiClusterController(c Controller, watcher ClusterWatcher) MultiClusterController { + return &multiClusterController{ + Controller: c, + watcher: watcher, + clusters: map[string]struct{}{}, + } +} + +type multiClusterController struct { + Controller + watcher ClusterWatcher + + lock sync.Mutex + clusters map[string]struct{} +} + +// Engage gets called when the runnable should start operations for the given Cluster.
+func (c *multiClusterController) Engage(clusterCtx context.Context, cl cluster.Cluster) error { + c.lock.Lock() + defer c.lock.Unlock() + + if _, ok := c.clusters[cl.Name()]; ok { + return nil + } + + // pass through in case the controller itself is cluster aware + if ctrl, ok := c.Controller.(cluster.AwareRunnable); ok { + if err := ctrl.Engage(clusterCtx, cl); err != nil { + return err + } + } + + // start watches on the cluster + if err := c.watcher.Watch(clusterCtx, cl); err != nil { + if ctrl, ok := c.Controller.(cluster.AwareRunnable); ok { + if err := ctrl.Disengage(clusterCtx, cl); err != nil { + return err + } + } + return err + } + c.clusters[cl.Name()] = struct{}{} + + return nil +} + +// Disengage gets called when the runnable should stop operations for the given Cluster. +func (c *multiClusterController) Disengage(ctx context.Context, cl cluster.Cluster) error { + c.lock.Lock() + defer c.lock.Unlock() + + if _, ok := c.clusters[cl.Name()]; !ok { + return nil + } + delete(c.clusters, cl.Name()) + + // pass through in case the controller itself is cluster aware + if ctrl, ok := c.Controller.(cluster.AwareRunnable); ok { + if err := ctrl.Disengage(ctx, cl); err != nil { + return err + } + } + + return nil +} diff --git a/pkg/handler/cluster.go b/pkg/handler/cluster.go new file mode 100644 index 0000000000..6319eafbb1 --- /dev/null +++ b/pkg/handler/cluster.go @@ -0,0 +1,82 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "context" + "time" + + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// ForCluster wraps an EventHandler and adds the cluster name to the reconcile.Requests. 
+func ForCluster(clusterName string, h EventHandler) EventHandler { + return &clusterAwareHandler{ + clusterName: clusterName, + handler: h, + } +} + +type clusterAwareHandler struct { + handler EventHandler + clusterName string +} + +var _ EventHandler = &clusterAwareHandler{} + +func (c *clusterAwareHandler) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { + c.handler.Create(ctx, evt, &clusterWorkqueue{RateLimitingInterface: q, clusterName: c.clusterName}) +} + +func (c *clusterAwareHandler) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { + c.handler.Update(ctx, evt, &clusterWorkqueue{RateLimitingInterface: q, clusterName: c.clusterName}) +} + +func (c *clusterAwareHandler) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { + c.handler.Delete(ctx, evt, &clusterWorkqueue{RateLimitingInterface: q, clusterName: c.clusterName}) +} + +func (c *clusterAwareHandler) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { + c.handler.Generic(ctx, evt, &clusterWorkqueue{RateLimitingInterface: q, clusterName: c.clusterName}) +} + +// clusterWorkqueue is a wrapper around a RateLimitingInterface that adds the +// cluster name to the reconcile.Requests +type clusterWorkqueue struct { + workqueue.RateLimitingInterface + clusterName string +} + +func (q *clusterWorkqueue) AddAfter(item interface{}, duration time.Duration) { + req := item.(reconcile.Request) + req.ClusterName = q.clusterName + q.RateLimitingInterface.AddAfter(req, duration) +} + +func (q *clusterWorkqueue) Add(item interface{}) { + req := item.(reconcile.Request) + req.ClusterName = q.clusterName + q.RateLimitingInterface.Add(req) +} + +func (q *clusterWorkqueue) AddRateLimited(item interface{}) { + req := item.(reconcile.Request) + req.ClusterName = q.clusterName + q.RateLimitingInterface.AddRateLimited(req) +} diff --git a/pkg/handler/enqueue.go b/pkg/handler/enqueue.go index e2b37cfe6d..c72b2e1ebb 100644 --- a/pkg/handler/enqueue.go +++ b/pkg/handler/enqueue.go @@ -21,7 +21,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/event" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -36,9 +35,7 @@ var _ EventHandler = &EnqueueRequestForObject{} // EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event. // (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all // Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource. -type EnqueueRequestForObject struct { - cluster cluster.Cluster -} +type EnqueueRequestForObject struct{} // Create implements EventHandler. 
func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { @@ -46,43 +43,25 @@ func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEv enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) return } - var clusterName string - if e.cluster != nil { - clusterName = e.cluster.Name() - } - q.Add(reconcile.Request{ - ClusterName: clusterName, - NamespacedName: types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }, - }) + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Name: evt.Object.GetName(), + Namespace: evt.Object.GetNamespace(), + }}) } // Update implements EventHandler. func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { - var clusterName string - if e.cluster != nil { - clusterName = e.cluster.Name() - } - switch { case evt.ObjectNew != nil: - q.Add(reconcile.Request{ - ClusterName: clusterName, - NamespacedName: types.NamespacedName{ - Name: evt.ObjectNew.GetName(), - Namespace: evt.ObjectNew.GetNamespace(), - }, - }) + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Name: evt.ObjectNew.GetName(), + Namespace: evt.ObjectNew.GetNamespace(), + }}) case evt.ObjectOld != nil: - q.Add(reconcile.Request{ - ClusterName: clusterName, - NamespacedName: types.NamespacedName{ - Name: evt.ObjectOld.GetName(), - Namespace: evt.ObjectOld.GetNamespace(), - }, - }) + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Name: evt.ObjectOld.GetName(), + Namespace: evt.ObjectOld.GetNamespace(), + }}) default: enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt) } @@ -94,17 +73,10 @@ func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEv enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt) return } - var clusterName string - if e.cluster != nil { - clusterName = e.cluster.Name() - } - q.Add(reconcile.Request{ - ClusterName: clusterName, - NamespacedName: types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }, - }) + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Name: evt.Object.GetName(), + Namespace: evt.Object.GetNamespace(), + }}) } // Generic implements EventHandler. @@ -113,20 +85,8 @@ func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.Generic enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt) return } - var clusterName string - if e.cluster != nil { - clusterName = e.cluster.Name() - } - q.Add(reconcile.Request{ - ClusterName: clusterName, - NamespacedName: types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }, - }) -} - -// DeepCopyFor implements cluster.AwareDeepCopy[EventHandler]. 
-func (e *EnqueueRequestForObject) DeepCopyFor(c cluster.Cluster) DeepCopyableEventHandler { - return &EnqueueRequestForObject{cluster: c} + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Name: evt.Object.GetName(), + Namespace: evt.Object.GetNamespace(), + }}) } diff --git a/pkg/handler/enqueue_mapped.go b/pkg/handler/enqueue_mapped.go index e1eaf41b7a..b55fdde6ba 100644 --- a/pkg/handler/enqueue_mapped.go +++ b/pkg/handler/enqueue_mapped.go @@ -21,7 +21,6 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -49,9 +48,6 @@ func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler { var _ EventHandler = &enqueueRequestsFromMapFunc{} type enqueueRequestsFromMapFunc struct { - // cluster is the source of the requeue request. - cluster cluster.Cluster - // Mapper transforms the argument into a slice of keys to be reconciled toRequests MapFunc } @@ -83,23 +79,10 @@ func (e *enqueueRequestsFromMapFunc) Generic(ctx context.Context, evt event.Gene func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(ctx context.Context, q workqueue.RateLimitingInterface, object client.Object, reqs map[reconcile.Request]empty) { for _, req := range e.toRequests(ctx, object) { - if _, ok := reqs[req]; ok { - // reqs is a map of requests to avoid enqueueing the same request multiple times. - continue - } - // If the request doesn't specify a cluster, use the cluster from the handler. - if req.ClusterName == "" && e.cluster != nil { - req.ClusterName = e.cluster.Name() + _, ok := reqs[req] + if !ok { + q.Add(req) + reqs[req] = empty{} } - // Enqueue the request and track it. - q.Add(req) - reqs[req] = empty{} - } -} - -func (e *enqueueRequestsFromMapFunc) DeepCopyFor(c cluster.Cluster) DeepCopyableEventHandler { - return &enqueueRequestsFromMapFunc{ - cluster: c, - toRequests: e.toRequests, } } diff --git a/pkg/handler/enqueue_owner.go b/pkg/handler/enqueue_owner.go index 0a3087e6e4..02e7d756f8 100644 --- a/pkg/handler/enqueue_owner.go +++ b/pkg/handler/enqueue_owner.go @@ -27,7 +27,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/event" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -70,11 +69,8 @@ func OnlyControllerOwner() OwnerOption { } type enqueueRequestForOwner struct { - // cluster is the source of the requeue request. - cluster cluster.Cluster - // ownerType is the type of the Owner object to look for in OwnerReferences. Only Group and Kind are compared. - ownerType client.Object + ownerType runtime.Object // isController if set will only look at the first OwnerReference with Controller: true. isController bool @@ -89,7 +85,7 @@ type enqueueRequestForOwner struct { // Create implements EventHandler. func (e *enqueueRequestForOwner) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} - e.getOwnerReconcileRequest(ctx, evt.Object, reqs) + e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { q.Add(req) } @@ -98,8 +94,8 @@ func (e *enqueueRequestForOwner) Create(ctx context.Context, evt event.CreateEve // Update implements EventHandler. 
func (e *enqueueRequestForOwner) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} - e.getOwnerReconcileRequest(ctx, evt.ObjectOld, reqs) - e.getOwnerReconcileRequest(ctx, evt.ObjectNew, reqs) + e.getOwnerReconcileRequest(evt.ObjectOld, reqs) + e.getOwnerReconcileRequest(evt.ObjectNew, reqs) for req := range reqs { q.Add(req) } @@ -108,7 +104,7 @@ func (e *enqueueRequestForOwner) Update(ctx context.Context, evt event.UpdateEve // Delete implements EventHandler. func (e *enqueueRequestForOwner) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} - e.getOwnerReconcileRequest(ctx, evt.Object, reqs) + e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { q.Add(req) } @@ -117,7 +113,7 @@ func (e *enqueueRequestForOwner) Delete(ctx context.Context, evt event.DeleteEve // Generic implements EventHandler. func (e *enqueueRequestForOwner) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} - e.getOwnerReconcileRequest(ctx, evt.Object, reqs) + e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { q.Add(req) } @@ -145,7 +141,7 @@ func (e *enqueueRequestForOwner) parseOwnerTypeGroupKind(scheme *runtime.Scheme) // getOwnerReconcileRequest looks at object and builds a map of reconcile.Request to reconcile // owners of object that match e.OwnerType. -func (e *enqueueRequestForOwner) getOwnerReconcileRequest(_ context.Context, object metav1.Object, result map[reconcile.Request]empty) { +func (e *enqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object, result map[reconcile.Request]empty) { // Iterate through the OwnerReferences looking for a match on Group and Kind against what was requested // by the user for _, ref := range e.getOwnersReferences(object) { @@ -163,14 +159,9 @@ func (e *enqueueRequestForOwner) getOwnerReconcileRequest(_ context.Context, obj // object in the event. 
if ref.Kind == e.groupKind.Kind && refGV.Group == e.groupKind.Group { // Match found - add a Request for the object referred to in the OwnerReference - request := reconcile.Request{ - NamespacedName: types.NamespacedName{ - Name: ref.Name, - }, - } - if e.cluster != nil { - request.ClusterName = e.cluster.Name() - } + request := reconcile.Request{NamespacedName: types.NamespacedName{ + Name: ref.Name, + }} // if owner is not namespaced then we should not set the namespace mapping, err := e.mapper.RESTMapping(e.groupKind, refGV.Version) @@ -206,16 +197,3 @@ func (e *enqueueRequestForOwner) getOwnersReferences(object metav1.Object) []met // No Controller OwnerReference found return nil } - -func (e *enqueueRequestForOwner) DeepCopyFor(c cluster.Cluster) DeepCopyableEventHandler { - cpy := &enqueueRequestForOwner{ - cluster: c, - ownerType: e.ownerType, - isController: e.isController, - mapper: c.GetRESTMapper(), - } - if err := cpy.parseOwnerTypeGroupKind(c.GetScheme()); err != nil { - panic(err) - } - return cpy -} diff --git a/pkg/handler/eventhandler.go b/pkg/handler/eventhandler.go index 559afc6c90..ff2f3e80b2 100644 --- a/pkg/handler/eventhandler.go +++ b/pkg/handler/eventhandler.go @@ -20,7 +20,6 @@ import ( "context" "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/event" ) @@ -57,13 +56,6 @@ type EventHandler interface { Generic(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) } -// DeepCopyableEventHandler is an EventHandler that can be deep copied for use -// in a different Cluster. This is used if a cluster provider is set in a manager. -type DeepCopyableEventHandler interface { - EventHandler - DeepCopyFor(c cluster.Cluster) DeepCopyableEventHandler -} - var _ EventHandler = Funcs{} // Funcs implements EventHandler. diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 22bcee41ff..08e79ccfb2 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -25,11 +25,9 @@ import ( "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/types" - kerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/handler" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -78,17 +76,7 @@ type Controller struct { CacheSyncTimeout time.Duration // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. - startWatches []*watchDescription - - // clusterAwareWatches maintains a list of cluster aware sources, handlers, and predicates to start when the controller is started. - clusterAwareWatches []*deepcopyableWatchDescription - - // clustersByName is used to manage the fleet of clusters. - clustersByName map[string]*clusterDescription - // WatchProviderClusters indicates whether the controller should - // only watch clusters that are engaged by the cluster provider. Defaults to false - // if no provider is set, and to true if a provider is set. - WatchProviderClusters bool + startWatches []watchDescription // LogConstructor is used to construct a logger to then log messages to users during reconciliation, // or for example when a watch is started. 
@@ -103,12 +91,6 @@ type Controller struct { LeaderElected *bool } -type clusterDescription struct { - cluster.Cluster - ctx context.Context - sources []source.Source -} - // watchDescription contains all the information necessary to start a watch. type watchDescription struct { src source.Source @@ -116,15 +98,6 @@ type watchDescription struct { predicates []predicate.Predicate } -// deepcopyableWatchDescription contains all the information necessary to start -// a watch. In addition to watchDescription it also contains the DeepCopyFor -// method to adapt it to a different cluster. -type deepcopyableWatchDescription struct { - src source.DeepCopyableSyncingSource - handler handler.DeepCopyableEventHandler - predicates []predicate.Predicate -} - // Reconcile implements reconcile.Reconciler. func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) { defer func() { @@ -150,37 +123,11 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc c.mu.Lock() defer c.mu.Unlock() - // If a cluster provider is in-place, run src for every provided cluster - if c.WatchProviderClusters { - src, ok := src.(source.DeepCopyableSyncingSource) - if !ok { - return fmt.Errorf("source %T is not cluster aware, but WatchProviderClusters is true", src) - } - evthdler, ok := evthdler.(handler.DeepCopyableEventHandler) - if !ok { - return fmt.Errorf("handler %T is not cluster aware, but WatchProviderClusters is true", evthdler) - } - - watchDesc := &deepcopyableWatchDescription{src: src, handler: evthdler, predicates: prct} - c.clusterAwareWatches = append(c.clusterAwareWatches, watchDesc) - - // If the watch is cluster aware, start it for all the clusters - // This covers the case where a Watch was added later to the controller. - var errs []error - for _, cldesc := range c.clustersByName { - if err := c.startClusterAwareWatchLocked(cldesc, watchDesc); err != nil { - errs = append(errs, err) - } - } - - return kerrors.NewAggregate(errs) - } - // Controller hasn't started yet, store the watches locally and return. // // These watches are going to be held on the controller struct until the manager or user calls Start(...). if !c.Started { - c.startWatches = append(c.startWatches, &watchDescription{src: src, handler: evthdler, predicates: prct}) + c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct}) return nil } @@ -188,60 +135,6 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc return src.Start(c.ctx, evthdler, c.Queue, prct...) } -// Engage implements cluster.AwareRunnable. -func (c *Controller) Engage(ctx context.Context, cluster cluster.Cluster) error { - c.mu.Lock() - defer c.mu.Unlock() - if !c.Started { - return errors.New("controller must be started before calling Engage") - } - - var errs []error - cldesc := c.clustersByName[cluster.Name()] - if cldesc == nil { - // Initialize the cluster description. - c.clustersByName[cluster.Name()] = &clusterDescription{ - ctx: ctx, - Cluster: cluster, - } - cldesc = c.clustersByName[cluster.Name()] - } - for i, watchDesc := range c.clusterAwareWatches { - if i < len(cldesc.sources) { - // The source has already been started for this cluster. - continue - } - if err := c.startClusterAwareWatchLocked(cldesc, watchDesc); err != nil { - errs = append(errs, err) - } - } - return kerrors.NewAggregate(errs) -} - -// Disengage implements cluster.AwareRunnable. 
-func (c *Controller) Disengage(ctx context.Context, cluster cluster.Cluster) error { - c.mu.Lock() - defer c.mu.Unlock() - if !c.Started { - return errors.New("controller must be started before calling Disengage") - } - - c.LogConstructor(nil).Info("Disengaging Cluster", "cluster", cluster.Name()) - // TODO(vincepri): Stop the sources for this cluster before removing it, once we have a way to do that. - delete(c.clustersByName, cluster.Name()) - return nil -} - -func (c *Controller) startClusterAwareWatchLocked(cldesc *clusterDescription, watchDesc *deepcopyableWatchDescription) error { - watch := &deepcopyableWatchDescription{src: watchDesc.src.DeepCopyFor(cldesc.Cluster), handler: watchDesc.handler.DeepCopyFor(cldesc.Cluster), predicates: watchDesc.predicates} - c.LogConstructor(nil).Info("Starting Cluster-Aware EventSource", "cluster", cldesc.Name(), "source", watch.src) - if err := watch.src.Start(cldesc.ctx, watch.handler, c.Queue, watch.predicates...); err != nil { - return err - } - cldesc.sources = append(cldesc.sources, watch.src) - return nil -} - // NeedLeaderElection implements the manager.LeaderElectionRunnable interface. func (c *Controller) NeedLeaderElection() bool { if c.LeaderElected == nil { @@ -258,7 +151,6 @@ func (c *Controller) Start(ctx context.Context) error { if c.Started { return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times") } - c.clustersByName = make(map[string]*clusterDescription) c.initMetrics() // Set the internal context. diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index c40bd4f1bd..b3a8227125 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -12,37 +12,10 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" ) -// Source is a source of events (eh.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc) -// which should be processed by event.EventHandlers to enqueue reconcile.Requests. -// -// * Use Kind for events originating in the cluster (e.g. Pod Create, Pod Update, Deployment Update). -// -// * Use Channel for events originating outside the cluster (eh.g. GitHub Webhook callback, Polling external urls). -// -// Users may build their own Source implementations. -type Source interface { - Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error -} - -// SyncingSource is a source that needs syncing prior to being usable. The controller -// will call its WaitForSync prior to starting workers. -type SyncingSource interface { - Source - WaitForSync(ctx context.Context) error -} - -// DeepCopyableSyncingSource is a source that can be deep copied for a specific cluster. -// It is used in setups with a cluster provider set in the manager. -type DeepCopyableSyncingSource interface { - SyncingSource - DeepCopyFor(cluster cluster.Cluster) DeepCopyableSyncingSource -} - // Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). type Kind struct { // Type is the type of object to watch. e.g. 
&v1.Pod{} @@ -142,11 +115,3 @@ func (ks *Kind) WaitForSync(ctx context.Context) error { return fmt.Errorf("timed out waiting for cache to be synced for Kind %T", ks.Type) } } - -// DeepCopyFor implements cluster.AwareDeepCopy[Source]. -func (ks *Kind) DeepCopyFor(c cluster.Cluster) DeepCopyableSyncingSource { - return &Kind{ - Type: ks.Type, - Cache: c.GetCache(), - } -} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 41fc243e32..d4353f2c9b 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -686,8 +686,9 @@ func setOptionsDefaults(options Options) Options { options.WebhookServer = webhook.NewServer(webhook.Options{}) } - if options.Controller.WatchProviderClusters == nil { - options.Controller.WatchProviderClusters = ptr.To(options.clusterProvider != nil) + if options.Controller.EngageWithDefaultCluster == nil { + options.Controller.EngageWithDefaultCluster = ptr.To[bool](options.clusterProvider == nil) + options.Controller.EngageWithProviderClusters = ptr.To[bool](options.clusterProvider != nil) } return options diff --git a/pkg/source/source.go b/pkg/source/source.go index 92afdb00dc..c0b9b1d9da 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -44,18 +44,21 @@ const ( // * Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, Polling external urls). // // Users may build their own Source implementations. -type Source = internal.Source +type Source interface { + // Start is internal and should be called only by the Controller to register an EventHandler with the Informer + // to enqueue reconcile.Requests. + Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error +} // SyncingSource is a source that needs syncing prior to being usable. The controller // will call its WaitForSync prior to starting workers. -type SyncingSource = internal.SyncingSource - -// DeepCopyableSyncingSource is a source that can be deep copied for a specific cluster. -// It is used in setups with a cluster provider set in the manager. -type DeepCopyableSyncingSource = internal.DeepCopyableSyncingSource +type SyncingSource interface { + Source + WaitForSync(ctx context.Context) error +} // Kind creates a KindSource with the given cache provider. -func Kind(cache cache.Cache, object client.Object) DeepCopyableSyncingSource { +func Kind(cache cache.Cache, object client.Object) SyncingSource { return &internal.Kind{Type: object, Cache: cache} }
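
Usage notes (illustrative sketches, not part of the diff above):

With this change the builder API stays the same for users; what changes is that reconcile.Request now carries the name of the cluster that produced the event, stamped by handler.ForCluster. A minimal sketch, assuming this patch is applied; the ConfigMap type and the reconciler body are placeholders:

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"

	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func setupConfigMapController(mgr manager.Manager) error {
	return builder.ControllerManagedBy(mgr).
		For(&corev1.ConfigMap{}).
		Complete(reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
			// ClusterName is empty for the manager's default cluster and carries the
			// provider-assigned name for every engaged provider cluster.
			log.FromContext(ctx).Info("reconciling", "cluster", req.ClusterName, "key", req.NamespacedName)
			return reconcile.Result{}, nil
		}))
}

Whether a controller is wired to the default cluster, to provider clusters, or to both is governed by the new EngageWithDefaultCluster / EngageWithProviderClusters options, which pkg/manager defaults based on whether a cluster provider is configured.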
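
The same composition is possible without the builder: implement controller.ClusterWatcher and wrap an unmanaged controller with controller.NewMultiClusterController so that the manager engages it once per provider cluster. A hedged sketch using only the APIs introduced above; the Pod watch and the watcher type are illustrative:

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"

	"sigs.k8s.io/controller-runtime/pkg/cluster"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

// podWatcher implements controller.ClusterWatcher: it starts one Pod watch per
// engaged cluster and tags the resulting requests with that cluster's name.
type podWatcher struct {
	ctrl controller.Controller
}

func (w *podWatcher) Watch(ctx context.Context, cl cluster.Cluster) error {
	// Unlike the builder's ctxBoundedSyncingSource, this sketch does not bind the
	// source's lifetime to the engagement ctx.
	return w.ctrl.Watch(
		source.Kind(cl.GetCache(), &corev1.Pod{}),
		handler.ForCluster(cl.Name(), &handler.EnqueueRequestForObject{}),
	)
}

func setupByHand(mgr manager.Manager, r reconcile.Reconciler) error {
	c, err := controller.NewUnmanaged("pods", mgr, controller.Options{Reconciler: r})
	if err != nil {
		return err
	}
	// The wrapper translates the manager's Engage/Disengage calls into
	// podWatcher.Watch calls for each provided cluster.
	return mgr.Add(controller.NewMultiClusterController(c, &podWatcher{ctrl: c}))
}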
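
Finally, handler.ForCluster itself only wraps the workqueue handed to the inner handler, so every enqueued reconcile.Request gets the cluster name stamped onto it. A small self-contained illustration; the queue and the Pod are test fixtures:

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/workqueue"

	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func forClusterExample() reconcile.Request {
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	h := handler.ForCluster("cluster-1", &handler.EnqueueRequestForObject{})

	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "default"}}
	h.Create(context.Background(), event.CreateEvent{Object: pod}, q)

	item, _ := q.Get()
	req := item.(reconcile.Request)
	// req.ClusterName == "cluster-1", req.NamespacedName is default/web.
	return req
}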