From 489224f3cc566f506fa246bb302d6d685fc1d016 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 24 Apr 2024 12:08:08 +0200 Subject: [PATCH] SQUASH: add ClusterAwareSource Signed-off-by: Dr. Stefan Schimanski --- pkg/builder/controller.go | 23 +++++++++++++++++------ pkg/controller/multicluster.go | 32 ++++++++++++++++++++++++++++++-- pkg/source/source.go | 8 ++++++++ 3 files changed, 55 insertions(+), 8 deletions(-) diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 92504d9d34..7fc3014555 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -58,11 +58,12 @@ var _ controller.ClusterWatcher = &clusterWatcher{} // clusterWatcher sets up watches between a cluster and a controller. type clusterWatcher struct { - ctrl controller.Controller - forInput ForInput - ownsInput []OwnsInput - watchesInput []WatchesInput - globalPredicates []predicate.Predicate + ctrl controller.Controller + forInput ForInput + ownsInput []OwnsInput + watchesInput []WatchesInput + globalPredicates []predicate.Predicate + clusterAwareRawSources []source.ClusterAwareSource } // Builder builds a Controller. @@ -197,8 +198,12 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler // // WatchesRawSource does not respect predicates configured through WithEventFilter. func (blder *Builder) WatchesRawSource(src source.Source) *Builder { - blder.rawSources = append(blder.rawSources, src) + if src, ok := src.(source.ClusterAwareSource); ok { + blder.clusterAwareRawSources = append(blder.clusterAwareRawSources, src) + return blder + } + blder.rawSources = append(blder.rawSources, src) return blder } @@ -344,6 +349,12 @@ func (cc *clusterWatcher) Watch(ctx context.Context, cl cluster.Cluster) error { } } + for _, src := range cc.clusterAwareRawSources { + if err := cc.ctrl.Watch(src); err != nil { + return err + } + } + return nil } diff --git a/pkg/controller/multicluster.go b/pkg/controller/multicluster.go index d57b50dea6..3eedccf303 100644 --- a/pkg/controller/multicluster.go +++ b/pkg/controller/multicluster.go @@ -32,6 +32,9 @@ type MultiClusterController interface { Controller } +// MultiClusterOption is a functional option for MultiClusterController. +type MultiClusterOption func(*multiClusterController) + // ClusterWatcher starts watches for a given Cluster. The ctx should be // used to cancel the watch when the Cluster is disengaged. type ClusterWatcher interface { @@ -40,12 +43,25 @@ type ClusterWatcher interface { // NewMultiClusterController creates a new MultiClusterController for the given // controller with the given ClusterWatcher. -func NewMultiClusterController(c Controller, watcher ClusterWatcher) MultiClusterController { - return &multiClusterController{ +func NewMultiClusterController(c Controller, watcher ClusterWatcher, opts ...MultiClusterOption) MultiClusterController { + mcc := &multiClusterController{ Controller: c, watcher: watcher, clusters: map[string]struct{}{}, } + for _, opt := range opts { + opt(mcc) + } + + return mcc +} + +// WithClusterAware adds the given cluster.Aware instances to the MultiClusterController, +// being engaged and disengaged when the clusters are added or removed. +func WithClusterAware(awares ...cluster.Aware) MultiClusterOption { + return func(c *multiClusterController) { + c.awares = append(c.awares, awares...) + } } type multiClusterController struct { @@ -54,6 +70,7 @@ type multiClusterController struct { lock sync.Mutex clusters map[string]struct{} + awares []cluster.Aware } // Engage gets called when the runnable should start operations for the given Cluster. @@ -84,6 +101,17 @@ func (c *multiClusterController) Engage(clusterCtx context.Context, cl cluster.C engaged = append(engaged, ctrl) } + // engage cluster aware instances + for _, aware := range c.awares { + if err := aware.Engage(clusterCtx, cl); err != nil { + if err := disengage(); err != nil { + return err + } + return err + } + engaged = append(engaged, aware) + } + // start watches on the cluster if err := c.watcher.Watch(clusterCtx, cl); err != nil { if err := disengage(); err != nil { diff --git a/pkg/source/source.go b/pkg/source/source.go index 26e53022bf..90b6bf14fd 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -25,6 +25,7 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" internal "sigs.k8s.io/controller-runtime/pkg/internal/source" @@ -54,6 +55,13 @@ type SyncingSource interface { WaitForSync(ctx context.Context) error } +// ClusterAwareSource is a source that can be engaged and disengaged when +// clusters are added or removed from the manager. +type ClusterAwareSource interface { + Source + cluster.Aware +} + // Kind creates a KindSource with the given cache provider. func Kind[T client.Object](cache cache.Cache, object T, handler handler.TypedEventHandler[T], predicates ...predicate.TypedPredicate[T]) SyncingSource { return &internal.Kind[T]{