Skip to content

Commit

Permalink
SQUASH: either watch default cluster or providers clusters, not both
Browse files Browse the repository at this point in the history
Signed-off-by: Dr. Stefan Schimanski <stefan.schimanski@gmail.com>
  • Loading branch information
sttts committed Mar 25, 2024
1 parent 4549d78 commit 2b98078
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 31 deletions.
51 changes: 36 additions & 15 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@ func ControllerManagedBy(m manager.Manager) *Builder {

// ForInput represents the information set by the For method.
type ForInput struct {
object client.Object
predicates []predicate.Predicate
objectProjection objectProjection
err error
object client.Object
forceDefaultCluster bool // in cluster-aware mode, force the object to be watched in the default cluster
predicates []predicate.Predicate
objectProjection objectProjection
err error
}

// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
Expand All @@ -100,10 +101,11 @@ func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {

// OwnsInput represents the information set by Owns method.
type OwnsInput struct {
matchEveryOwner bool
object client.Object
predicates []predicate.Predicate
objectProjection objectProjection
matchEveryOwner bool
object client.Object
forceDefaultCluster bool // in cluster-aware mode, force the object to be watched in the default cluster
predicates []predicate.Predicate
objectProjection objectProjection
}

// Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to
Expand All @@ -126,10 +128,11 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {

// WatchesInput represents the information set by Watches method.
type WatchesInput struct {
src source.Source
eventHandler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
src source.Source
forceDefaultCluster bool // in cluster-aware mode, force the object to be watched in the default cluster
eventHandler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
}

// Watches defines the type of Object to watch, and configures the ControllerManagedBy to respond to create / delete /
Expand Down Expand Up @@ -284,7 +287,10 @@ func (blder *Builder) doWatch() error {
if err != nil {
return err
}
src := source.Kind(blder.cluster.GetCache(), obj)
src := clusterAwareSource{
Source: source.Kind(blder.cluster.GetCache(), obj),
forceDefaultCluster: blder.forInput.forceDefaultCluster,
}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, blder.forInput.predicates...)
Expand All @@ -302,7 +308,10 @@ func (blder *Builder) doWatch() error {
if err != nil {
return err
}
src := source.Kind(blder.cluster.GetCache(), obj)
src := clusterAwareSource{
Source: source.Kind(blder.cluster.GetCache(), obj),
forceDefaultCluster: own.forceDefaultCluster,
}
opts := []handler.OwnerOption{}
if !own.matchEveryOwner {
opts = append(opts, handler.OnlyControllerOwner())
Expand Down Expand Up @@ -342,7 +351,10 @@ func (blder *Builder) doWatch() error {
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
if err := blder.ctrl.Watch(w.src, w.eventHandler, allPredicates...); err != nil {
if err := blder.ctrl.Watch(
clusterAwareSource{Source: w.src, forceDefaultCluster: w.forceDefaultCluster},
w.eventHandler, allPredicates...,
); err != nil {
return err
}
}
Expand Down Expand Up @@ -431,3 +443,12 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
return err
}

type clusterAwareSource struct {
source.Source
forceDefaultCluster bool
}

func (s clusterAwareSource) ForceDefaultCluster() bool {
return s.forceDefaultCluster
}
18 changes: 18 additions & 0 deletions pkg/builder/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,21 @@ type matchEveryOwner struct{}
func (o matchEveryOwner) ApplyToOwns(opts *OwnsInput) {
opts.matchEveryOwner = true
}

// InDefaultCluster configures the input to only watch objects on the default
// cluster, even if a cluster provider is set.
var InDefaultCluster = inDefaultCluster{}

type inDefaultCluster struct{}

func (f inDefaultCluster) ApplyToFor(opts *ForInput) {
opts.forceDefaultCluster = true
}

func (f inDefaultCluster) ApplyToOwns(opts *OwnsInput) {
opts.forceDefaultCluster = true
}

func (f inDefaultCluster) ApplyToWatches(opts *WatchesInput) {
opts.forceDefaultCluster = true
}
5 changes: 5 additions & 0 deletions pkg/config/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,9 @@ type Controller struct {
// NeedLeaderElection indicates whether the controller needs to use leader election.
// Defaults to true, which means the controller will use leader election.
NeedLeaderElection *bool

// WatchProviderClusters indicates whether the controller should
// only watch clusters that are engaged by the cluster provider. Defaults to false
// if no provider is set, and to true if a provider is set.
WatchProviderClusters *bool
}
14 changes: 14 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/go-logr/logr"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"

"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
Expand Down Expand Up @@ -62,6 +63,11 @@ type Options struct {
// LogConstructor is used to construct a logger used for this controller and passed
// to each reconciliation via the context field.
LogConstructor func(request *reconcile.Request) logr.Logger

// WatchProviderClusters indicates whether the controller should
// only watch clusters that are engaged by the cluster provider. Defaults to false
// if no provider is set, and to true if a provider is set.
WatchProviderClusters *bool
}

// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
Expand Down Expand Up @@ -155,6 +161,13 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
options.NeedLeaderElection = mgr.GetControllerOptions().NeedLeaderElection
}

if options.WatchProviderClusters == nil {
options.WatchProviderClusters = mgr.GetControllerOptions().WatchProviderClusters
if options.WatchProviderClusters == nil { // should never happen
options.WatchProviderClusters = pointer.Bool(false)
}
}

// Create controller with dependencies set
return &controller.Controller{
Do: options.Reconciler,
Expand All @@ -169,6 +182,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
LogConstructor: options.LogConstructor,
RecoverPanic: options.RecoverPanic,
LeaderElected: options.NeedLeaderElection,
WatchProviderClusters: *options.WatchProviderClusters,
}, nil
}

Expand Down
44 changes: 28 additions & 16 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ type Controller struct {

// clustersByName is used to manage the fleet of clusters.
clustersByName map[string]*clusterDescription
// WatchProviderClusters indicates whether the controller should
// only watch clusters that are engaged by the cluster provider. Defaults to false
// if no provider is set, and to true if a provider is set.
WatchProviderClusters bool

// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
// or for example when a watch is started.
Expand All @@ -100,6 +104,13 @@ type Controller struct {
LeaderElected *bool
}

// ClusterAwareSource is a source that knows whether to watch in the default cluster
// in the clusters engaged by the cluster provider.
type ClusterAwareSource interface {
source.Source
ForceDefaultCluster() bool
}

type clusterDescription struct {
cluster.Cluster
ctx context.Context
Expand Down Expand Up @@ -174,8 +185,23 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
watchDesc := &watchDescription{src: src, handler: evthdler, predicates: prct}

// If the source is cluster aware, store it in a separate list.
if watchDesc.IsClusterAware() {
_, forceDefaultClsuter := src.(ClusterAwareSource)
if c.WatchProviderClusters && !forceDefaultClsuter {
if !watchDesc.IsClusterAware() {
return fmt.Errorf("source %s is not cluster aware, but WatchProviderClusters is true", src)
}
c.clusterAwareWatches = append(c.clusterAwareWatches, watchDesc)

// If the watch is cluster aware, start it for all the clusters
// This covers the case where a Watch was added later to the controller.
var errs []error
for _, cldesc := range c.clustersByName {
if err := c.startClusterAwareWatchLocked(cldesc, watchDesc); err != nil {
errs = append(errs, err)
}
}

return kerrors.NewAggregate(errs)
}

// Controller hasn't started yet, store the watches locally and return.
Expand All @@ -186,22 +212,8 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
return nil
}

var errs []error
c.LogConstructor(nil).Info("Starting EventSource", "source", src)
if err := src.Start(c.ctx, evthdler, c.Queue, prct...); err != nil {
errs = append(errs, err)
}

// If the watch is cluster aware, start it for all the clusters
// This covers the case where a Watch was added later to the controller.
if watchDesc.IsClusterAware() {
for _, cldesc := range c.clustersByName {
if err := c.startClusterAwareWatchLocked(cldesc, watchDesc); err != nil {
errs = append(errs, err)
}
}
}
return kerrors.NewAggregate(errs)
return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

// Engage implements cluster.AwareRunnable.
Expand Down
4 changes: 4 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,5 +686,9 @@ func setOptionsDefaults(options Options) Options {
options.WebhookServer = webhook.NewServer(webhook.Options{})
}

if options.Controller.WatchProviderClusters == nil {
options.Controller.WatchProviderClusters = pointer.Bool(options.clusterProvider != nil)
}

return options
}

0 comments on commit 2b98078

Please sign in to comment.