Skip to content

Commit

Permalink
SQUASH: flatten DeepCopyFor types
Browse files Browse the repository at this point in the history
Signed-off-by: Dr. Stefan Schimanski <stefan.schimanski@gmail.com>
  • Loading branch information
sttts committed Mar 25, 2024
1 parent 2b98078 commit a37d78a
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 72 deletions.
20 changes: 11 additions & 9 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ func (blder *Builder) doWatch() error {
return err
}
src := clusterAwareSource{
Source: source.Kind(blder.cluster.GetCache(), obj),
forceDefaultCluster: blder.forInput.forceDefaultCluster,
DeepCopyableSyncingSource: source.Kind(blder.cluster.GetCache(), obj),
forceDefaultCluster: blder.forInput.forceDefaultCluster,
}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
Expand All @@ -309,8 +309,8 @@ func (blder *Builder) doWatch() error {
return err
}
src := clusterAwareSource{
Source: source.Kind(blder.cluster.GetCache(), obj),
forceDefaultCluster: own.forceDefaultCluster,
DeepCopyableSyncingSource: source.Kind(blder.cluster.GetCache(), obj),
forceDefaultCluster: own.forceDefaultCluster,
}
opts := []handler.OwnerOption{}
if !own.matchEveryOwner {
Expand All @@ -334,12 +334,17 @@ func (blder *Builder) doWatch() error {
}
for _, w := range blder.watchesInput {
// If the source of this watch is of type Kind, project it.
src := w.src
if srcKind, ok := w.src.(*internalsource.Kind); ok {
typeForSrc, err := blder.project(srcKind.Type, w.objectProjection)
if err != nil {
return err
}
srcKind.Type = typeForSrc
src = clusterAwareSource{
DeepCopyableSyncingSource: srcKind,
forceDefaultCluster: w.forceDefaultCluster,
}
} else if !ok {
// If we're building a cluster-aware controller, raw watches are not allowed
// given that the cache cannot be validated to be coming from the same cluster.
Expand All @@ -351,10 +356,7 @@ func (blder *Builder) doWatch() error {
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
if err := blder.ctrl.Watch(
clusterAwareSource{Source: w.src, forceDefaultCluster: w.forceDefaultCluster},
w.eventHandler, allPredicates...,
); err != nil {
if err := blder.ctrl.Watch(src, w.eventHandler, allPredicates...); err != nil {
return err
}
}
Expand Down Expand Up @@ -445,7 +447,7 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
}

type clusterAwareSource struct {
source.Source
source.DeepCopyableSyncingSource
forceDefaultCluster bool
}

Expand Down
7 changes: 0 additions & 7 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,6 @@ type AwareRunnable interface {
Disengage(context.Context, Cluster) error
}

// AwareDeepCopy is an interface that can be implemented by types
// that are cluster-aware, and can return a copy of themselves
// for a given cluster.
type AwareDeepCopy[T any] interface {
DeepCopyFor(Cluster) T
}

// ByNameGetterFunc is a function that returns a cluster for a given identifying cluster name.
type ByNameGetterFunc func(ctx context.Context, clusterName string) (Cluster, error)

Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,6 @@ func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.Generic
}

// DeepCopyFor implements cluster.AwareDeepCopy[EventHandler].
func (e *EnqueueRequestForObject) DeepCopyFor(c cluster.Cluster) EventHandler {
func (e *EnqueueRequestForObject) DeepCopyFor(c cluster.Cluster) DeepCopyableEventHandler {
return &EnqueueRequestForObject{cluster: c}
}
2 changes: 1 addition & 1 deletion pkg/handler/enqueue_mapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(ctx context.Context, q workqu
}
}

func (e *enqueueRequestsFromMapFunc) DeepCopyFor(c cluster.Cluster) EventHandler {
func (e *enqueueRequestsFromMapFunc) DeepCopyFor(c cluster.Cluster) DeepCopyableEventHandler {
return &enqueueRequestsFromMapFunc{
cluster: c,
toRequests: e.toRequests,
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/enqueue_owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (e *enqueueRequestForOwner) getOwnersReferences(object metav1.Object) []met
return nil
}

func (e *enqueueRequestForOwner) DeepCopyFor(c cluster.Cluster) EventHandler {
func (e *enqueueRequestForOwner) DeepCopyFor(c cluster.Cluster) DeepCopyableEventHandler {
copy := &enqueueRequestForOwner{
cluster: c,
ownerType: e.ownerType,
Expand Down
8 changes: 8 additions & 0 deletions pkg/handler/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/event"
)

Expand Down Expand Up @@ -56,6 +57,13 @@ type EventHandler interface {
Generic(context.Context, event.GenericEvent, workqueue.RateLimitingInterface)
}

// DeepCopyableEventHandler is an EventHandler that can be deep copied for use
// in a different Cluster. This is used if a cluster provider is set in a manager.
type DeepCopyableEventHandler interface {
EventHandler
DeepCopyFor(c cluster.Cluster) DeepCopyableEventHandler
}

var _ EventHandler = Funcs{}

// Funcs implements EventHandler.
Expand Down
67 changes: 25 additions & 42 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
internal "sigs.k8s.io/controller-runtime/pkg/internal/source"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -82,7 +81,7 @@ type Controller struct {
startWatches []*watchDescription

// clusterAwareWatches maintains a list of cluster aware sources, handlers, and predicates to start when the controller is started.
clusterAwareWatches []*watchDescription
clusterAwareWatches []*deepcopyableWatchDescription

// clustersByName is used to manage the fleet of clusters.
clustersByName map[string]*clusterDescription
Expand Down Expand Up @@ -124,37 +123,13 @@ type watchDescription struct {
predicates []predicate.Predicate
}

func (w *watchDescription) IsClusterAware() bool {
if _, ok := w.src.(cluster.AwareDeepCopy[*internal.Kind]); !ok {
if _, ok := w.src.(cluster.AwareDeepCopy[source.Source]); !ok {
return false
}
}
if _, ok := w.handler.(cluster.AwareDeepCopy[handler.EventHandler]); !ok {
return false
}
return true
}

func (w *watchDescription) DeepCopyFor(c cluster.Cluster) *watchDescription {
copy := &watchDescription{
predicates: w.predicates,
}
if clusterAwareSource, ok := w.src.(cluster.AwareDeepCopy[*internal.Kind]); ok {
copy.src = clusterAwareSource.DeepCopyFor(c)
} else if clusterAwareSource, ok := w.src.(cluster.AwareDeepCopy[source.Source]); ok {
copy.src = clusterAwareSource.DeepCopyFor(c)
} else {
return nil
}

if clusterAwareHandler, ok := w.handler.(cluster.AwareDeepCopy[handler.EventHandler]); ok {
copy.handler = clusterAwareHandler.DeepCopyFor(c)
} else {
return nil
}

return copy
// deepcopyableWatchDescription contains all the information necessary to start
// a watch. In addition to watchDescription it also contains the DeepCopyFor
// method to adapt it to a different cluster.
type deepcopyableWatchDescription struct {
src source.DeepCopyableSyncingSource
handler handler.DeepCopyableEventHandler
predicates []predicate.Predicate
}

// Reconcile implements reconcile.Reconciler.
Expand Down Expand Up @@ -182,14 +157,22 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
c.mu.Lock()
defer c.mu.Unlock()

watchDesc := &watchDescription{src: src, handler: evthdler, predicates: prct}

// If the source is cluster aware, store it in a separate list.
_, forceDefaultClsuter := src.(ClusterAwareSource)
if c.WatchProviderClusters && !forceDefaultClsuter {
if !watchDesc.IsClusterAware() {
return fmt.Errorf("source %s is not cluster aware, but WatchProviderClusters is true", src)
var forceDefaultCluster bool
if src, ok := src.(ClusterAwareSource); ok {
forceDefaultCluster = src.ForceDefaultCluster()
}
if c.WatchProviderClusters && !forceDefaultCluster {
src, ok := src.(source.DeepCopyableSyncingSource)
if !ok {
return fmt.Errorf("source %T is not cluster aware, but WatchProviderClusters is true", src)
}
evthdler, ok := evthdler.(handler.DeepCopyableEventHandler)
if !ok {
return fmt.Errorf("handler %T is not cluster aware, but WatchProviderClusters is true", evthdler)
}

watchDesc := &deepcopyableWatchDescription{src: src, handler: evthdler, predicates: prct}
c.clusterAwareWatches = append(c.clusterAwareWatches, watchDesc)

// If the watch is cluster aware, start it for all the clusters
Expand All @@ -208,7 +191,7 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
//
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
if !c.Started {
c.startWatches = append(c.startWatches, watchDesc)
c.startWatches = append(c.startWatches, &watchDescription{src: src, handler: evthdler, predicates: prct})
return nil
}

Expand Down Expand Up @@ -260,8 +243,8 @@ func (c *Controller) Disengage(ctx context.Context, cluster cluster.Cluster) err
return nil
}

func (c *Controller) startClusterAwareWatchLocked(cldesc *clusterDescription, watchDesc *watchDescription) error {
watch := watchDesc.DeepCopyFor(cldesc)
func (c *Controller) startClusterAwareWatchLocked(cldesc *clusterDescription, watchDesc *deepcopyableWatchDescription) error {
watch := &deepcopyableWatchDescription{src: watchDesc.src.DeepCopyFor(cldesc.Cluster), handler: watchDesc.handler.DeepCopyFor(cldesc.Cluster), predicates: watchDesc.predicates}
if watch == nil {
return nil
}
Expand Down
16 changes: 15 additions & 1 deletion pkg/internal/source/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

type Source interface {
Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

type SyncingSource interface {
Source
WaitForSync(ctx context.Context) error
}

type DeepCopyableSyncingSource interface {
SyncingSource
DeepCopyFor(cluster cluster.Cluster) DeepCopyableSyncingSource
}

// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
type Kind struct {
// Type is the type of object to watch. e.g. &v1.Pod{}
Expand Down Expand Up @@ -118,7 +132,7 @@ func (ks *Kind) WaitForSync(ctx context.Context) error {
}

// DeepCopyFor implements cluster.AwareDeepCopy[Source].
func (ks *Kind) DeepCopyFor(c cluster.Cluster) *Kind {
func (ks *Kind) DeepCopyFor(c cluster.Cluster) DeepCopyableSyncingSource {
return &Kind{
Type: ks.Type,
Cache: c.GetCache(),
Expand Down
17 changes: 7 additions & 10 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,18 @@ const (
// * Use Channel for events originating outside the cluster (eh.g. GitHub Webhook callback, Polling external urls).
//
// Users may build their own Source implementations.
type Source interface {
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}
type Source = internal.Source

// SyncingSource is a source that needs syncing prior to being usable. The controller
// will call its WaitForSync prior to starting workers.
type SyncingSource interface {
Source
WaitForSync(ctx context.Context) error
}
type SyncingSource = internal.SyncingSource

// DeepCopyableSyncingSource is a source that can be deep copied for a specific cluster.
// It is used in setups with a cluster provider set in the manager.
type DeepCopyableSyncingSource = internal.DeepCopyableSyncingSource

// Kind creates a KindSource with the given cache provider.
func Kind(cache cache.Cache, object client.Object) SyncingSource {
func Kind(cache cache.Cache, object client.Object) DeepCopyableSyncingSource {
return &internal.Kind{Type: object, Cache: cache}
}

Expand Down

0 comments on commit a37d78a

Please sign in to comment.