Skip to content

Commit

Permalink
SQUASH: add ClusterAwareSource
Browse files Browse the repository at this point in the history
Signed-off-by: Dr. Stefan Schimanski <stefan.schimanski@gmail.com>
  • Loading branch information
sttts committed Apr 24, 2024
1 parent fc48477 commit 489224f
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 8 deletions.
23 changes: 17 additions & 6 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ var _ controller.ClusterWatcher = &clusterWatcher{}

// clusterWatcher sets up watches between a cluster and a controller.
type clusterWatcher struct {
ctrl controller.Controller
forInput ForInput
ownsInput []OwnsInput
watchesInput []WatchesInput
globalPredicates []predicate.Predicate
ctrl controller.Controller
forInput ForInput
ownsInput []OwnsInput
watchesInput []WatchesInput
globalPredicates []predicate.Predicate
clusterAwareRawSources []source.ClusterAwareSource
}

// Builder builds a Controller.
Expand Down Expand Up @@ -197,8 +198,12 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler
//
// WatchesRawSource does not respect predicates configured through WithEventFilter.
func (blder *Builder) WatchesRawSource(src source.Source) *Builder {
blder.rawSources = append(blder.rawSources, src)
if src, ok := src.(source.ClusterAwareSource); ok {
blder.clusterAwareRawSources = append(blder.clusterAwareRawSources, src)
return blder
}

blder.rawSources = append(blder.rawSources, src)
return blder
}

Expand Down Expand Up @@ -344,6 +349,12 @@ func (cc *clusterWatcher) Watch(ctx context.Context, cl cluster.Cluster) error {
}
}

for _, src := range cc.clusterAwareRawSources {
if err := cc.ctrl.Watch(src); err != nil {
return err
}
}

return nil
}

Expand Down
32 changes: 30 additions & 2 deletions pkg/controller/multicluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type MultiClusterController interface {
Controller
}

// MultiClusterOption is a functional option for MultiClusterController.
type MultiClusterOption func(*multiClusterController)

// ClusterWatcher starts watches for a given Cluster. The ctx should be
// used to cancel the watch when the Cluster is disengaged.
type ClusterWatcher interface {
Expand All @@ -40,12 +43,25 @@ type ClusterWatcher interface {

// NewMultiClusterController creates a new MultiClusterController for the given
// controller with the given ClusterWatcher.
func NewMultiClusterController(c Controller, watcher ClusterWatcher) MultiClusterController {
return &multiClusterController{
func NewMultiClusterController(c Controller, watcher ClusterWatcher, opts ...MultiClusterOption) MultiClusterController {
mcc := &multiClusterController{
Controller: c,
watcher: watcher,
clusters: map[string]struct{}{},
}
for _, opt := range opts {
opt(mcc)
}

return mcc
}

// WithClusterAware adds the given cluster.Aware instances to the MultiClusterController,
// being engaged and disengaged when the clusters are added or removed.
func WithClusterAware(awares ...cluster.Aware) MultiClusterOption {
return func(c *multiClusterController) {
c.awares = append(c.awares, awares...)
}
}

type multiClusterController struct {
Expand All @@ -54,6 +70,7 @@ type multiClusterController struct {

lock sync.Mutex
clusters map[string]struct{}
awares []cluster.Aware
}

// Engage gets called when the runnable should start operations for the given Cluster.
Expand Down Expand Up @@ -84,6 +101,17 @@ func (c *multiClusterController) Engage(clusterCtx context.Context, cl cluster.C
engaged = append(engaged, ctrl)
}

// engage cluster aware instances
for _, aware := range c.awares {
if err := aware.Engage(clusterCtx, cl); err != nil {
if err := disengage(); err != nil {
return err
}
return err
}
engaged = append(engaged, aware)
}

// start watches on the cluster
if err := c.watcher.Watch(clusterCtx, cl); err != nil {
if err := disengage(); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
internal "sigs.k8s.io/controller-runtime/pkg/internal/source"
Expand Down Expand Up @@ -54,6 +55,13 @@ type SyncingSource interface {
WaitForSync(ctx context.Context) error
}

// ClusterAwareSource is a source that can be engaged and disengaged when
// clusters are added or removed from the manager.
type ClusterAwareSource interface {
Source
cluster.Aware
}

// Kind creates a KindSource with the given cache provider.
func Kind[T client.Object](cache cache.Cache, object T, handler handler.TypedEventHandler[T], predicates ...predicate.TypedPredicate[T]) SyncingSource {
return &internal.Kind[T]{
Expand Down

0 comments on commit 489224f

Please sign in to comment.