diff --git a/go.mod b/go.mod index 589aa4da08..1bb11eb618 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module sigs.k8s.io/controller-runtime go 1.19 require ( - github.com/davecgh/go-spew v1.1.1 github.com/evanphx/json-patch/v5 v5.6.0 github.com/fsnotify/fsnotify v1.6.0 github.com/go-logr/logr v1.2.3 @@ -32,6 +31,7 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/evanphx/json-patch v4.12.0+incompatible // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 8d848d6926..755976aacb 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -68,14 +68,6 @@ type Builder struct { name string } -func (blder *Builder) clone() *Builder { - clone := *blder - clone.cluster = nil - clone.logicalName = "" - clone.ctrl = nil - return &clone -} - // ControllerManagedBy returns a new controller builder that will be started by the provided Manager. func ControllerManagedBy(m manager.Manager) *Builder { return &Builder{cluster: m, mgr: m} @@ -246,19 +238,6 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro if blder.forInput.err != nil { return nil, blder.forInput.err } - - if err := blder.mgr.AddLogicalRunnableBuilder(func(name logical.Name, cl cluster.Cluster) (manager.Runnable, error) { - cloned := blder.clone() - cloned.cluster = cl - cloned.logicalName = name - if err := cloned.do(r); err != nil { - return nil, err - } - return cloned.ctrl, nil - }); err != nil { - return nil, err - } - if err := blder.do(r); err != nil { return nil, err } diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index 9676da0a0c..b8606652a4 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -22,7 +22,6 @@ import ( "strings" "sync/atomic" - "github.com/davecgh/go-spew/spew" "github.com/go-logr/logr" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -571,14 +570,36 @@ var _ = Describe("application", func() { mgr, err := manager.New(cfg, manager.Options{}.WithExperimentalLogicalAdapter(adapter)) Expect(err).NotTo(HaveOccurred()) - ch1 := make(chan reconcile.Request) - ch2 := make(chan reconcile.Request) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + By("Starting the manager") + go func() { + defer GinkgoRecover() + Expect(mgr.Start(ctx)).NotTo(HaveOccurred()) + }() + + cluster1, err := mgr.GetCluster(ctx, "cluster1") + Expect(err).NotTo(HaveOccurred()) + + By("Creating a custom namespace") + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-multi-cluster-", + }, + } + Expect(cluster1.GetClient().Create(ctx, ns)).To(Succeed()) + + ch1 := make(chan reconcile.Request, 1) + ch2 := make(chan reconcile.Request, 1) Expect( ControllerManagedBy(mgr). For(&appsv1.Deployment{}). Owns(&appsv1.ReplicaSet{}). 
Complete(reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { - spew.Dump(req) + if req.Namespace != ns.Name { + return reconcile.Result{}, nil + } + defer GinkgoRecover() switch req.Cluster { case "cluster1": @@ -592,19 +613,11 @@ var _ = Describe("application", func() { })), ).To(Succeed()) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - By("Starting the manager") - go func() { - defer GinkgoRecover() - Expect(mgr.Start(ctx)).NotTo(HaveOccurred()) - }() - By("Creating a deployment") dep := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: "deploy-multi-cluster", - Namespace: "default", + Namespace: ns.Name, }, Spec: appsv1.DeploymentSpec{ Selector: &metav1.LabelSelector{ @@ -623,8 +636,6 @@ var _ = Describe("application", func() { }, }, } - cluster1, err := mgr.GetCluster(ctx, "cluster1") - Expect(err).NotTo(HaveOccurred()) Expect(cluster1.GetClient().Create(ctx, dep)).To(Succeed()) By("Waiting for the Deployment Reconcile on both clusters") @@ -647,7 +658,7 @@ var _ = Describe("application", func() { // Expect a Reconcile when an Owned object is managedObjects. rs := &appsv1.ReplicaSet{ ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", + Namespace: dep.Namespace, Name: "rs-multi-cluster", Labels: dep.Spec.Selector.MatchLabels, OwnerReferences: []metav1.OwnerReference{ diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 7577e02980..3e69f82601 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -38,11 +38,43 @@ import ( intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" ) +// AwareRunnable is an interface that can be implemented by runnable types +// that are cluster-aware. +type AwareRunnable interface { + // Engage gets called when the runnable should start operations for the given Cluster. + // The given context is tied to the Cluster's lifecycle and will be cancelled when the + // Cluster is removed or an error occurs. + // + // Implementers should return an error if they cannot start operations for the given Cluster, + // and should ensure this operation is re-entrant and non-blocking. + // + // \_________________|)____.---'--`---.____ + // || \----.________.----/ + // || / / `--' + // __||____/ /_ + // |___ \ + // `--------' + Engage(context.Context, Cluster) error + + // Disengage gets called when the runnable should stop operations for the given Cluster. + Disengage(context.Context, Cluster) error +} + +// AwareDeepCopy is an interface that can be implemented by types +// that are cluster-aware, and can return a copy of themselves +// for a given cluster. +type AwareDeepCopy[T any] interface { + DeepCopyFor(Cluster) T +} + // LogicalGetterFunc is a function that returns a cluster for a given logical cluster name. type LogicalGetterFunc func(context.Context, logical.Name) (Cluster, error) // Cluster provides various methods to interact with a cluster. type Cluster interface { + // Name returns the unique logical name of the cluster. + Name() logical.Name + // GetHTTPClient returns an HTTP client that can be used to talk to the apiserver GetHTTPClient() *http.Client @@ -81,6 +113,9 @@ type Cluster interface { // Options are the possible options that can be configured for a Cluster. type Options struct { + // Name is the unique name of the cluster. 
+ Name logical.Name + // Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds / Resources // Defaults to the kubernetes/client-go scheme.Scheme, but it's almost always better // idea to pass your own scheme in. See the documentation in pkg/scheme for more information. @@ -279,6 +314,7 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) { } return &cluster{ + name: options.Name, config: config, httpClient: options.HTTPClient, scheme: options.Scheme, @@ -347,3 +383,13 @@ func setOptionsDefaults(options Options, config *rest.Config) (Options, error) { return options, nil } + +// WithName sets the name of the cluster. +func WithName(name logical.Name) Option { + return func(o *Options) { + if o.Name != "" { + panic("cluster name cannot be set more than once") + } + o.Name = name + } +} diff --git a/pkg/cluster/internal.go b/pkg/cluster/internal.go index 2742764231..c30afab859 100644 --- a/pkg/cluster/internal.go +++ b/pkg/cluster/internal.go @@ -29,9 +29,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" + "sigs.k8s.io/logical-cluster" ) type cluster struct { + name logical.Name + // config is the rest.config used to talk to the apiserver. Required. config *rest.Config @@ -59,6 +62,10 @@ type cluster struct { logger logr.Logger } +func (c *cluster) Name() logical.Name { + return c.name +} + func (c *cluster) GetConfig() *rest.Config { return c.config } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 205f0ac99b..eceec45f73 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -138,8 +138,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller // Create controller with dependencies set return &controller.Controller{ - Cluster: options.LogicalCluster, - Do: options.Reconciler, + Do: options.Reconciler, MakeQueue: func() workqueue.RateLimitingInterface { return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name) }, diff --git a/pkg/handler/enqueue.go b/pkg/handler/enqueue.go index 348414b0d8..a8d7726490 100644 --- a/pkg/handler/enqueue.go +++ b/pkg/handler/enqueue.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/event" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -36,7 +37,9 @@ var _ EventHandler = &EnqueueRequestForObject{} // EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event. // (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all // Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource. -type EnqueueRequestForObject struct{} +type EnqueueRequestForObject struct { + cluster cluster.Cluster +} // Create implements EventHandler. 
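Note (illustrative, not part of the patch): the Name/WithName plumbing added above can be exercised directly. A minimal sketch, assuming a valid *rest.Config and a purely hypothetical helper name:

import (
	"fmt"

	"k8s.io/client-go/rest"
	"sigs.k8s.io/controller-runtime/pkg/cluster"
	"sigs.k8s.io/logical-cluster"
)

// newNamedCluster builds a Cluster tagged with a logical name. WithName panics
// if the name is set more than once, so it is passed exactly once here.
func newNamedCluster(cfg *rest.Config, name logical.Name) (cluster.Cluster, error) {
	cl, err := cluster.New(cfg, cluster.WithName(name))
	if err != nil {
		return nil, err
	}
	// Name() reports the logical name the cluster was created with.
	fmt.Printf("created cluster %q\n", cl.Name())
	return cl, nil
}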
func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { @@ -44,8 +47,12 @@ func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEv enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) return } + var logicalClusterName logical.Name + if e.cluster != nil { + logicalClusterName = e.cluster.Name() + } q.Add(reconcile.Request{ - Cluster: logical.FromContext(ctx), + Cluster: logicalClusterName, NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), @@ -55,12 +62,15 @@ func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEv // Update implements EventHandler. func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { - logicalName := logical.FromContext(ctx) + var logicalClusterName logical.Name + if e.cluster != nil { + logicalClusterName = e.cluster.Name() + } switch { case evt.ObjectNew != nil: q.Add(reconcile.Request{ - Cluster: logicalName, + Cluster: logicalClusterName, NamespacedName: types.NamespacedName{ Name: evt.ObjectNew.GetName(), Namespace: evt.ObjectNew.GetNamespace(), @@ -68,7 +78,7 @@ func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEv }) case evt.ObjectOld != nil: q.Add(reconcile.Request{ - Cluster: logicalName, + Cluster: logicalClusterName, NamespacedName: types.NamespacedName{ Name: evt.ObjectOld.GetName(), Namespace: evt.ObjectOld.GetNamespace(), @@ -85,8 +95,12 @@ func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEv enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt) return } + var logicalClusterName logical.Name + if e.cluster != nil { + logicalClusterName = e.cluster.Name() + } q.Add(reconcile.Request{ - Cluster: logical.FromContext(ctx), + Cluster: logicalClusterName, NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), @@ -100,11 +114,20 @@ func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.Generic enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt) return } + var logicalClusterName logical.Name + if e.cluster != nil { + logicalClusterName = e.cluster.Name() + } q.Add(reconcile.Request{ - Cluster: logical.FromContext(ctx), + Cluster: logicalClusterName, NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), }, }) } + +// DeepCopyFor implements cluster.AwareDeepCopy[EventHandler]. +func (e *EnqueueRequestForObject) DeepCopyFor(c cluster.Cluster) EventHandler { + return &EnqueueRequestForObject{cluster: c} +} diff --git a/pkg/handler/enqueue_mapped.go b/pkg/handler/enqueue_mapped.go index 59d1150059..2b63e66fd0 100644 --- a/pkg/handler/enqueue_mapped.go +++ b/pkg/handler/enqueue_mapped.go @@ -21,9 +21,9 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/logical-cluster" ) // MapFunc is the signature required for enqueueing requests from a generic function. 
@@ -49,6 +49,8 @@ func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler { var _ EventHandler = &enqueueRequestsFromMapFunc{} type enqueueRequestsFromMapFunc struct { + cluster cluster.Cluster + // Mapper transforms the argument into a slice of keys to be reconciled toRequests MapFunc } @@ -85,11 +87,18 @@ func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(ctx context.Context, q workqu continue } // If the request doesn't specify a cluster, use the cluster from the context. - if req.Cluster == "" { - req.Cluster = logical.FromContext(ctx) + if req.Cluster == "" && e.cluster != nil { + req.Cluster = e.cluster.Name() } // Enqueue the request and track it. q.Add(req) reqs[req] = empty{} } } + +func (e *enqueueRequestsFromMapFunc) DeepCopyFor(c cluster.Cluster) EventHandler { + return &enqueueRequestsFromMapFunc{ + cluster: c, + toRequests: e.toRequests, + } +} diff --git a/pkg/handler/enqueue_owner.go b/pkg/handler/enqueue_owner.go index 7f506a49be..a12ce45d37 100644 --- a/pkg/handler/enqueue_owner.go +++ b/pkg/handler/enqueue_owner.go @@ -27,10 +27,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/event" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/logical-cluster" ) var _ EventHandler = &enqueueRequestForOwner{} @@ -70,8 +70,10 @@ func OnlyControllerOwner() OwnerOption { } type enqueueRequestForOwner struct { + cluster cluster.Cluster + // ownerType is the type of the Owner object to look for in OwnerReferences. Only Group and Kind are compared. - ownerType runtime.Object + ownerType client.Object // isController if set will only look at the first OwnerReference with Controller: true. isController bool @@ -142,7 +144,7 @@ func (e *enqueueRequestForOwner) parseOwnerTypeGroupKind(scheme *runtime.Scheme) // getOwnerReconcileRequest looks at object and builds a map of reconcile.Request to reconcile // owners of object that match e.OwnerType. 
-func (e *enqueueRequestForOwner) getOwnerReconcileRequest(ctx context.Context, object metav1.Object, result map[reconcile.Request]empty) { +func (e *enqueueRequestForOwner) getOwnerReconcileRequest(_ context.Context, object metav1.Object, result map[reconcile.Request]empty) { // Iterate through the OwnerReferences looking for a match on Group and Kind against what was requested // by the user for _, ref := range e.getOwnersReferences(object) { @@ -161,11 +163,13 @@ func (e *enqueueRequestForOwner) getOwnerReconcileRequest(ctx context.Context, o if ref.Kind == e.groupKind.Kind && refGV.Group == e.groupKind.Group { // Match found - add a Request for the object referred to in the OwnerReference request := reconcile.Request{ - Cluster: logical.FromContext(ctx), NamespacedName: types.NamespacedName{ Name: ref.Name, }, } + if e.cluster != nil { + request.Cluster = e.cluster.Name() + } // if owner is not namespaced then we should not set the namespace mapping, err := e.mapper.RESTMapping(e.groupKind, refGV.Version) @@ -201,3 +205,16 @@ func (e *enqueueRequestForOwner) getOwnersReferences(object metav1.Object) []met // No Controller OwnerReference found return nil } + +func (e *enqueueRequestForOwner) DeepCopyFor(c cluster.Cluster) EventHandler { + copy := &enqueueRequestForOwner{ + cluster: c, + ownerType: e.ownerType, + isController: e.isController, + mapper: c.GetRESTMapper(), + } + if err := copy.parseOwnerTypeGroupKind(c.GetScheme()); err != nil { + panic(err) + } + return copy +} diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 94925a3b6d..fbb67deea9 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -25,11 +25,14 @@ import ( "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/types" + kerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/handler" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" + internal "sigs.k8s.io/controller-runtime/pkg/internal/source" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -42,10 +45,6 @@ type Controller struct { // Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required. Name string - // Cluster is the logical cluster that this controller is running against. - // +optional - Cluster logical.Name - // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1. MaxConcurrentReconciles int @@ -81,7 +80,13 @@ type Controller struct { CacheSyncTimeout time.Duration // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. - startWatches []watchDescription + startWatches []*watchDescription + + // clusterAwareWatches maintains a list of cluster aware sources, handlers, and predicates to start when the controller is started. + clusterAwareWatches []*watchDescription + + // clusters is used to manage the logical clusters. + clusters map[logical.Name]*clusterDescription // LogConstructor is used to construct a logger to then log messages to users during reconciliation, // or for example when a watch is started. 
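Aside (illustrative, not part of the patch): a user-supplied handler can opt into the same per-cluster fan-out by implementing cluster.AwareDeepCopy[handler.EventHandler], mirroring the built-in handlers above. A sketch with a hypothetical handler type:

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/cluster"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// clusterStamper is a hypothetical EventHandler that enqueues the event object
// and stamps the request with the cluster the handler was copied for.
type clusterStamper struct {
	cluster cluster.Cluster
}

var _ handler.EventHandler = &clusterStamper{}
var _ cluster.AwareDeepCopy[handler.EventHandler] = &clusterStamper{}

func (h *clusterStamper) enqueue(obj client.Object, q workqueue.RateLimitingInterface) {
	if obj == nil {
		return
	}
	req := reconcile.Request{NamespacedName: types.NamespacedName{
		Namespace: obj.GetNamespace(),
		Name:      obj.GetName(),
	}}
	if h.cluster != nil {
		req.Cluster = h.cluster.Name()
	}
	q.Add(req)
}

func (h *clusterStamper) Create(_ context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
	h.enqueue(e.Object, q)
}

func (h *clusterStamper) Update(_ context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
	h.enqueue(e.ObjectNew, q)
}

func (h *clusterStamper) Delete(_ context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
	h.enqueue(e.Object, q)
}

func (h *clusterStamper) Generic(_ context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
	h.enqueue(e.Object, q)
}

// DeepCopyFor returns a copy bound to the given cluster, which is what the
// controller calls once per engaged cluster.
func (h *clusterStamper) DeepCopyFor(c cluster.Cluster) handler.EventHandler {
	return &clusterStamper{cluster: c}
}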
@@ -96,6 +101,12 @@ type Controller struct { LeaderElected *bool } +type clusterDescription struct { + cluster.Cluster + ctx context.Context + sources []source.Source +} + // watchDescription contains all the information necessary to start a watch. type watchDescription struct { src source.Source @@ -103,6 +114,39 @@ type watchDescription struct { predicates []predicate.Predicate } +func (w *watchDescription) IsClusterAware() bool { + if _, ok := w.src.(cluster.AwareDeepCopy[*internal.Kind]); !ok { + if _, ok := w.src.(cluster.AwareDeepCopy[source.Source]); !ok { + return false + } + } + if _, ok := w.handler.(cluster.AwareDeepCopy[handler.EventHandler]); !ok { + return false + } + return true +} + +func (w *watchDescription) DeepCopyFor(c cluster.Cluster) *watchDescription { + copy := &watchDescription{ + predicates: w.predicates, + } + if clusterAwareSource, ok := w.src.(cluster.AwareDeepCopy[*internal.Kind]); ok { + copy.src = clusterAwareSource.DeepCopyFor(c) + } else if clusterAwareSource, ok := w.src.(cluster.AwareDeepCopy[source.Source]); ok { + copy.src = clusterAwareSource.DeepCopyFor(c) + } else { + return nil + } + + if clusterAwareHandler, ok := w.handler.(cluster.AwareDeepCopy[handler.EventHandler]); ok { + copy.handler = clusterAwareHandler.DeepCopyFor(c) + } else { + return nil + } + + return copy +} + // Reconcile implements reconcile.Reconciler. func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) { defer func() { @@ -128,16 +172,94 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc c.mu.Lock() defer c.mu.Unlock() + watchDesc := &watchDescription{src: src, handler: evthdler, predicates: prct} + + // If the source is cluster aware, store it in a separate list. + if watchDesc.IsClusterAware() { + c.clusterAwareWatches = append(c.clusterAwareWatches, watchDesc) + } + // Controller hasn't started yet, store the watches locally and return. // // These watches are going to be held on the controller struct until the manager or user calls Start(...). if !c.Started { - c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct}) + c.startWatches = append(c.startWatches, watchDesc) return nil } + var errs []error c.LogConstructor(nil).Info("Starting EventSource", "source", src) - return src.Start(c.ctx, evthdler, c.Queue, prct...) + if err := src.Start(c.ctx, evthdler, c.Queue, prct...); err != nil { + errs = append(errs, err) + } + + // If the watch is cluster aware, start it for all the clusters + // This covers the case where a Watch was added later to the controller. + if watchDesc.IsClusterAware() { + for _, cldesc := range c.clusters { + if err := c.startClusterAwareWatchLocked(cldesc, watchDesc); err != nil { + errs = append(errs, err) + } + } + } + return kerrors.NewAggregate(errs) +} + +// Engage implements cluster.AwareRunnable. +func (c *Controller) Engage(ctx context.Context, cluster cluster.Cluster) error { + c.mu.Lock() + defer c.mu.Unlock() + if !c.Started { + return errors.New("controller must be started before calling Engage") + } + + var errs []error + cldesc := c.clusters[cluster.Name()] + if cldesc == nil { + // Initialize the cluster description. + c.clusters[cluster.Name()] = &clusterDescription{ + ctx: ctx, + Cluster: cluster, + } + cldesc = c.clusters[cluster.Name()] + } + for i, watchDesc := range c.clusterAwareWatches { + if i < len(cldesc.sources) { + // The source has already been started for this cluster. 
+ continue + } + if err := c.startClusterAwareWatchLocked(cldesc, watchDesc); err != nil { + errs = append(errs, err) + } + } + return kerrors.NewAggregate(errs) +} + +// Disengage implements cluster.AwareRunnable. +func (c *Controller) Disengage(ctx context.Context, cluster cluster.Cluster) error { + c.mu.Lock() + defer c.mu.Unlock() + if !c.Started { + return errors.New("controller must be started before calling Disengage") + } + + c.LogConstructor(nil).Info("Disengaging Cluster", "cluster", cluster.Name()) + // TODO(vincepri): Stop the sources for this cluster before removing it, once we have a way to do that. + delete(c.clusters, cluster.Name()) + return nil +} + +func (c *Controller) startClusterAwareWatchLocked(cldesc *clusterDescription, watchDesc *watchDescription) error { + watch := watchDesc.DeepCopyFor(cldesc) + if watch == nil { + return nil + } + c.LogConstructor(nil).Info("Starting Cluster-Aware EventSource", "cluster", cldesc.Name(), "source", watch.src) + if err := watch.src.Start(cldesc.ctx, watch.handler, c.Queue, watch.predicates...); err != nil { + return err + } + cldesc.sources = append(cldesc.sources, watch.src) + return nil } // NeedLeaderElection implements the manager.LeaderElectionRunnable interface. @@ -156,17 +278,11 @@ func (c *Controller) Start(ctx context.Context) error { if c.Started { return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times") } - + c.clusters = make(map[logical.Name]*clusterDescription) c.initMetrics() // Set the internal context. c.ctx = ctx - if logical := logical.FromContext(c.ctx); logical != "" { - constructor := c.LogConstructor - c.LogConstructor = func(request *reconcile.Request) logr.Logger { - return constructor(request).WithValues("cluster", logical) - } - } c.Queue = c.MakeQueue() go func() { @@ -319,12 +435,9 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { log = log.WithValues("reconcileID", reconcileID) ctx = logf.IntoContext(ctx, log) ctx = addReconcileID(ctx, reconcileID) - - // Set the Cluster on the request if it is set on the context. - if req.Cluster != c.Cluster { - panic(fmt.Sprintf("controller was setup for logical cluster %q, got a request for a cluster %q, not allowed!", c.Cluster, req.Cluster)) + if req.Cluster != "" { + log = log.WithValues("cluster", req.Cluster) } - req.Cluster = c.Cluster // RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the // resource to be synced. 
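For context, a minimal cluster.AwareRunnable looks like the sketch below (names are illustrative; the behavior follows the Engage/Disengage contract documented above: re-entrant, non-blocking, and safe to call as clusters come and go):

import (
	"context"
	"sync"

	"github.com/go-logr/logr"
	"sigs.k8s.io/controller-runtime/pkg/cluster"
	"sigs.k8s.io/logical-cluster"
)

// clusterLogger is a hypothetical runnable that only records which logical
// clusters it is currently engaged with.
type clusterLogger struct {
	log logr.Logger

	mu      sync.Mutex
	engaged map[logical.Name]cluster.Cluster
}

// Start satisfies manager.Runnable; all per-cluster work happens in Engage.
func (r *clusterLogger) Start(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

// Engage is re-entrant and non-blocking, as required by cluster.AwareRunnable.
func (r *clusterLogger) Engage(_ context.Context, cl cluster.Cluster) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.engaged == nil {
		r.engaged = map[logical.Name]cluster.Cluster{}
	}
	r.engaged[cl.Name()] = cl
	r.log.Info("engaged cluster", "cluster", cl.Name())
	return nil
}

func (r *clusterLogger) Disengage(_ context.Context, cl cluster.Cluster) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.engaged, cl.Name())
	r.log.Info("disengaged cluster", "cluster", cl.Name())
	return nil
}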
diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 9024556fd0..e94cb08c12 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -126,7 +126,7 @@ var _ = Describe("controller", func() { Describe("Start", func() { It("should return an error if there is an error waiting for the informers", func() { f := false - ctrl.startWatches = []watchDescription{{ + ctrl.startWatches = []*watchDescription{{ src: source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}), }} ctrl.Name = "foo" @@ -144,7 +144,7 @@ var _ = Describe("controller", func() { Expect(err).NotTo(HaveOccurred()) c = &cacheWithIndefinitelyBlockingGetInformer{c} - ctrl.startWatches = []watchDescription{{ + ctrl.startWatches = []*watchDescription{{ src: source.Kind(c, &appsv1.Deployment{}), }} ctrl.Name = "testcontroller" @@ -161,7 +161,7 @@ var _ = Describe("controller", func() { c, err := cache.New(cfg, cache.Options{}) Expect(err).NotTo(HaveOccurred()) c = &cacheWithIndefinitelyBlockingGetInformer{c} - ctrl.startWatches = []watchDescription{{ + ctrl.startWatches = []*watchDescription{{ src: &singnallingSourceWrapper{ SyncingSource: source.Kind(c, &appsv1.Deployment{}), cacheSyncDone: sourceSynced, @@ -189,7 +189,7 @@ var _ = Describe("controller", func() { sourceSynced := make(chan struct{}) c, err := cache.New(cfg, cache.Options{}) Expect(err).NotTo(HaveOccurred()) - ctrl.startWatches = []watchDescription{{ + ctrl.startWatches = []*watchDescription{{ src: &singnallingSourceWrapper{ SyncingSource: source.Kind(c, &appsv1.Deployment{}), cacheSyncDone: sourceSynced, @@ -232,7 +232,7 @@ var _ = Describe("controller", func() { // send the event to the channel ch <- evt - ctrl.startWatches = []watchDescription{{ + ctrl.startWatches = []*watchDescription{{ src: ins, handler: handler.Funcs{ GenericFunc: func(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { @@ -254,7 +254,7 @@ var _ = Describe("controller", func() { defer cancel() ins := &source.Channel{} - ctrl.startWatches = []watchDescription{{ + ctrl.startWatches = []*watchDescription{{ src: ins, }} diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index 2e765acce8..5840231566 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -12,6 +12,7 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" ) @@ -115,3 +116,11 @@ func (ks *Kind) WaitForSync(ctx context.Context) error { return errors.New("timed out waiting for cache to be synced") } } + +// DeepCopyFor implements cluster.AwareDeepCopy[Source]. 
+func (ks *Kind) DeepCopyFor(c cluster.Cluster) *Kind { + return &Kind{ + Type: ks.Type, + Cache: c.GetCache(), + } +} diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 8bc0df5938..9083acb6c2 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -69,10 +69,8 @@ var _ cluster.LogicalGetterFunc = (&controllerManager{}).GetCluster type logicalCluster struct { cluster.Cluster - - ctx context.Context - cancel context.CancelFunc - runnableBuilds []error + ctx context.Context + cancel context.CancelFunc } type controllerManager struct { @@ -83,16 +81,15 @@ type controllerManager struct { errChan chan error runnables *runnables - runnableBuilders []func(logical.Name, cluster.Cluster) (Runnable, error) - // defaultCluster holds a variety of methods to interact with a defaultCluster. Required. defaultCluster cluster.Cluster defaultClusterOptions cluster.Option logicalAdapter logical.Adapter - logicalLock sync.RWMutex // protects logicalClusters - logicalClusters map[logical.Name]*logicalCluster + logicalLock sync.RWMutex // protects logicalClusters + logicalClusters map[logical.Name]*logicalCluster + logicalClusterAwareRunnables []cluster.AwareRunnable // recorderProvider is used to generate event recorders that will be injected into Controllers // (and EventHandlers, Sources and Predicates). @@ -199,6 +196,9 @@ func (cm *controllerManager) Add(r Runnable) error { } func (cm *controllerManager) add(r Runnable) error { + if aware, ok := r.(cluster.AwareRunnable); ok { + cm.logicalClusterAwareRunnables = append(cm.logicalClusterAwareRunnables, aware) + } return cm.runnables.Add(r) } @@ -258,6 +258,10 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) return nil } +func (cm *controllerManager) Name() logical.Name { + return cm.defaultCluster.Name() +} + func (cm *controllerManager) GetCluster(ctx context.Context, name logical.Name) (cluster.Cluster, error) { return cm.getLogicalCluster(ctx, name) } @@ -298,7 +302,7 @@ func (cm *controllerManager) GetAPIReader() client.Reader { return cm.defaultCluster.GetAPIReader() } -func (cm *controllerManager) syncClusterAwareRunnables() { +func (cm *controllerManager) engageClusterAwareRunnables() { cm.Lock() defer cm.Unlock() @@ -310,29 +314,11 @@ func (cm *controllerManager) syncClusterAwareRunnables() { // If we successfully retrieved the cluster, check // that we schedule all the cluster aware runnables. for name, cluster := range cm.logicalClusters { - // Runnable builders can only be appended, so we can safely iterate over the slice - // and collect the build/start errors, if any, on a per-cluster basis. - for id, build := range cm.runnableBuilders { - if id < len(cluster.runnableBuilds) { - // The runnable is already scheduled. - continue - } - - // Build the runnable. - runnable, err := build(name, cluster) - if err != nil { - cluster.runnableBuilds = append(cluster.runnableBuilds, err) - cm.logger.Error(err, "failed to build cluster aware runnable, won't retry", "clusterName", name) + for _, aware := range cm.logicalClusterAwareRunnables { + if err := aware.Engage(cluster.ctx, cluster); err != nil { + cm.logger.Error(err, "failed to engage cluster with runnable, won't retry", "clusterName", name, "runnable", aware) continue } - - if err := cm.add(wrapRunnable(cluster.ctx, runnable)); err != nil { - cm.logger.Error(err, "failed to start cluster aware runnable, won't retry", "clusterName", name) - cluster.runnableBuilds = append(cluster.runnableBuilds, err) - } - - // Success. 
- cluster.runnableBuilds = append(cluster.runnableBuilds, nil) } } } @@ -380,7 +366,7 @@ func (cm *controllerManager) getLogicalCluster(ctx context.Context, name logical } } - cl, err := cluster.New(cfg, cm.defaultClusterOptions) + cl, err := cluster.New(cfg, cm.defaultClusterOptions, cluster.WithName(name)) if err != nil { return nil, fmt.Errorf("cannot create logical cluster %q: %w", name, err) } @@ -417,22 +403,17 @@ func (cm *controllerManager) removeLogicalCluster(name logical.Name) error { if !ok { return nil } - // Cancel the context and remove the cluster from the map. - c.cancel() - delete(cm.logicalClusters, name) - return nil -} -func (cm *controllerManager) AddLogicalRunnableBuilder(fn func(name logical.Name, cl cluster.Cluster) (Runnable, error)) error { - cm.Lock() - defer cm.Unlock() - cm.runnableBuilders = append(cm.runnableBuilders, fn) - if !cm.started || len(cm.logicalClusters) == 0 { - // If the manager is not started, or we have no logical clusters, we can return early. - return nil + // Disengage all the runnables. + for _, aware := range cm.logicalClusterAwareRunnables { + if err := aware.Disengage(c.ctx, c); err != nil { + return fmt.Errorf("failed to disengage cluster aware runnable: %w", err) + } } - go cm.syncClusterAwareRunnables() + // Cancel the context and remove the cluster from the map. + c.cancel() + delete(cm.logicalClusters, name) return nil } @@ -653,7 +634,7 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { //nolint:g return err } } - cm.syncClusterAwareRunnables() + cm.engageClusterAwareRunnables() return nil } @@ -688,7 +669,7 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { //nolint:g if _, err := cm.getLogicalCluster(ctx, event.Name); err != nil { return err } - cm.syncClusterAwareRunnables() + cm.engageClusterAwareRunnables() case watch.Deleted, watch.Error: if err := cm.removeLogicalCluster(event.Name); err != nil { return err diff --git a/pkg/manager/logical.go b/pkg/manager/logical.go index ceef7aff70..0249a03a02 100644 --- a/pkg/manager/logical.go +++ b/pkg/manager/logical.go @@ -1,28 +1,26 @@ package manager -import "context" +// func wrapRunnable(ctx context.Context, r Runnable) Runnable { +// return &logicalWrappedRunnable{ctx: ctx, Runnable: r} +// } -func wrapRunnable(ctx context.Context, r Runnable) Runnable { - return &logicalWrappedRunnable{ctx: ctx, Runnable: r} -} +// type logicalWrappedRunnable struct { +// Runnable -type logicalWrappedRunnable struct { - Runnable +// ctx context.Context +// } - ctx context.Context -} +// func (l *logicalWrappedRunnable) Start(runnableCtx context.Context) error { +// ctx, cancel := context.WithCancel(l.ctx) +// defer cancel() +// go func() { +// <-runnableCtx.Done() +// cancel() +// }() +// return l.Runnable.Start(ctx) +// } -func (l *logicalWrappedRunnable) Start(runnableCtx context.Context) error { - ctx, cancel := context.WithCancel(l.ctx) - defer cancel() - go func() { - <-runnableCtx.Done() - cancel() - }() - return l.Runnable.Start(ctx) -} - -// Unwrap returns the underlying runnable. -func (l *logicalWrappedRunnable) Unwrap() Runnable { - return l.Runnable -} +// // Unwrap returns the underlying runnable. +// func (l *logicalWrappedRunnable) Unwrap() Runnable { +// return l.Runnable +// } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 6b36a7dded..65ced07a5c 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -64,10 +64,6 @@ type Manager interface { // election was configured. 
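Usage note (illustrative): with AddLogicalRunnableBuilder removed, cluster-aware runnables go through the normal Add path; anything that also implements cluster.AwareRunnable is engaged with every logical cluster the adapter reports and disengaged when a cluster is deleted. A rough sketch, reusing the hypothetical clusterLogger above and assuming cfg, ctx, and adapter are supplied by the caller:

import (
	"context"

	"k8s.io/client-go/rest"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/logical-cluster"
)

func runWithLogicalClusters(ctx context.Context, cfg *rest.Config, adapter logical.Adapter) error {
	mgr, err := manager.New(cfg, manager.Options{}.WithExperimentalLogicalAdapter(adapter))
	if err != nil {
		return err
	}
	// Add registers the runnable as usual; because it also implements
	// cluster.AwareRunnable, the manager tracks it and calls Engage/Disengage
	// as logical clusters are added and removed.
	if err := mgr.Add(&clusterLogger{log: mgr.GetLogger()}); err != nil {
		return err
	}
	return mgr.Start(ctx)
}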
Elected() <-chan struct{} - // AddLogicalRunnableBuilder adds a controller builder to the manager, which is used to build - // controllers for a given cluster. This is useful when the Manager is running against many logical clusters. - AddLogicalRunnableBuilder(func(logical.Name, cluster.Cluster) (Runnable, error)) error - // AddMetricsExtraHandler adds an extra handler served on path to the http server that serves metrics. // Might be useful to register some diagnostic endpoints e.g. pprof. Note that these endpoints meant to be // sensitive and shouldn't be exposed publicly. diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 2c19ae1d0b..46953d5cb4 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1689,18 +1689,8 @@ var _ = Describe("manger.Manager", func() { m, err := New(cfg, Options{}.WithExperimentalLogicalAdapter(adapter)) Expect(err).NotTo(HaveOccurred()) - var built atomic.Bool - var started atomic.Bool - err = m.AddLogicalRunnableBuilder(func(name logical.Name, cl cluster.Cluster) (Runnable, error) { - built.Store(true) - return RunnableFunc(func(ctx context.Context) error { - Expect(logical.FromContext(ctx)).To(Equal(logical.Name("test-cluster"))) - started.Store(true) - <-ctx.Done() - return nil - }), nil - }) - Expect(err).NotTo(HaveOccurred()) + aware := &fakeClusterAwareRunnable{} + Expect(m.Add(aware)).To(Succeed()) By("starting the manager") ctx, cancel := context.WithCancel(context.Background()) @@ -1712,8 +1702,12 @@ var _ = Describe("manger.Manager", func() { }() <-m.Elected() - Eventually(built.Load).Should(BeTrue()) - Eventually(started.Load).Should(BeTrue()) + Eventually(func() []logical.Name { + aware.Lock() + defer aware.Unlock() + return aware.engaged + }).Should(HaveLen(1)) + Expect(aware.engaged[0]).To(BeEquivalentTo("test-cluster")) By("making sure there's no extra go routines still running after we stop") cancel() <-doneCh @@ -1731,30 +1725,8 @@ var _ = Describe("manger.Manager", func() { m, err := New(cfg, Options{}.WithExperimentalLogicalAdapter(adapter)) Expect(err).NotTo(HaveOccurred()) - var built atomic.Int64 - var started atomic.Int64 - var completed atomic.Int64 - removedCh := make(chan struct{}) - err = m.AddLogicalRunnableBuilder(func(name logical.Name, cl cluster.Cluster) (Runnable, error) { - built.Add(1) - return RunnableFunc(func(ctx context.Context) error { - defer completed.Add(1) - started.Add(1) - switch logical.FromContext(ctx) { - case "test-cluster": - // All good! - case "removed-cluster": - // This should be canceled. 
- <-ctx.Done() - close(removedCh) - default: - Fail("unexpected logical cluster") - } - <-ctx.Done() - return nil - }), nil - }) - Expect(err).NotTo(HaveOccurred()) + aware := &fakeClusterAwareRunnable{} + Expect(m.Add(aware)).To(Succeed()) By("starting the manager") ctx, cancel := context.WithCancel(context.Background()) @@ -1766,21 +1738,28 @@ var _ = Describe("manger.Manager", func() { }() <-m.Elected() - Eventually(built.Load).Should(BeEquivalentTo(2)) - Eventually(started.Load).Should(BeEquivalentTo(2)) + Eventually(func() []logical.Name { + aware.Lock() + defer aware.Unlock() + return aware.engaged + }).Should(HaveLen(2)) + Expect(aware.engaged).To(ConsistOf(logical.Name("test-cluster"), logical.Name("removed-cluster"))) By("Deleting a cluster") adapter.watch <- logical.Event{ Type: watch.Deleted, Name: "removed-cluster", } - Eventually(removedCh).Should(BeClosed()) - Eventually(completed.Load).Should(BeEquivalentTo(1)) + Eventually(func() []logical.Name { + aware.Lock() + defer aware.Unlock() + return aware.disengaged + }).Should(HaveLen(1)) + Expect(aware.disengaged).To(ConsistOf(logical.Name("removed-cluster"))) By("making sure there's no extra go routines still running after we stop") cancel() <-doneCh - Eventually(completed.Load).Should(BeEquivalentTo(2)) }) }) }) @@ -1888,3 +1867,27 @@ func (f *fakeLogicalWatcher) Stop() { func (f *fakeLogicalWatcher) ResultChan() <-chan logical.Event { return f.ch } + +type fakeClusterAwareRunnable struct { + sync.Mutex + engaged []logical.Name + disengaged []logical.Name +} + +func (f *fakeClusterAwareRunnable) Start(ctx context.Context) error { + return nil +} + +func (f *fakeClusterAwareRunnable) Engage(ctx context.Context, cluster cluster.Cluster) error { + f.Lock() + defer f.Unlock() + f.engaged = append(f.engaged, cluster.Name()) + return nil +} + +func (f *fakeClusterAwareRunnable) Disengage(ctx context.Context, cluster cluster.Cluster) error { + f.Lock() + defer f.Unlock() + f.disengaged = append(f.disengaged, cluster.Name()) + return nil +} diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go index d1c4511c66..0bb97c29a0 100644 --- a/pkg/manager/runnable_group.go +++ b/pkg/manager/runnable_group.go @@ -55,12 +55,12 @@ func (r *runnables) Add(fn Runnable) error { // And type switch on the unwrapped type. This is needed because the runnable // might have a different type, but we want to override the Start method to control // cancellation on a per cluster basis. - unwrapped := fn - if wrapped, ok := fn.(*logicalWrappedRunnable); ok { - unwrapped = wrapped.Unwrap() - } + // unwrapped := fn + // if wrapped, ok := fn.(*logicalWrappedRunnable); ok { + // unwrapped = wrapped.Unwrap() + // } - switch runnable := unwrapped.(type) { + switch runnable := fn.(type) { case hasCache: return r.Caches.Add(fn, func(ctx context.Context) bool { return runnable.GetCache().WaitForCacheSync(ctx) diff --git a/tools/setup-envtest/store/store_test.go b/tools/setup-envtest/store/store_test.go index 723eada3cd..d5607aede6 100644 --- a/tools/setup-envtest/store/store_test.go +++ b/tools/setup-envtest/store/store_test.go @@ -20,9 +20,9 @@ import ( "archive/tar" "bytes" "compress/gzip" + "crypto/rand" "io" "io/fs" - "math/rand" "path/filepath" . 
"github.com/onsi/ginkgo/v2" @@ -214,7 +214,7 @@ func makeFakeArchive(magic string) io.Reader { copy(chunk[:], magic) copy(chunk[len(magic):], fileName) start := len(magic) + len(fileName) - if _, err := rand.Read(chunk[start:]); err != nil { //nolint:gosec + if _, err := rand.Read(chunk[start:]); err != nil { panic(err) } diff --git a/tools/setup-envtest/workflows/workflows_testutils_test.go b/tools/setup-envtest/workflows/workflows_testutils_test.go index c1a6713d72..f236ce460e 100644 --- a/tools/setup-envtest/workflows/workflows_testutils_test.go +++ b/tools/setup-envtest/workflows/workflows_testutils_test.go @@ -8,8 +8,8 @@ import ( "bytes" "compress/gzip" "crypto/md5" //nolint:gosec + "crypto/rand" "encoding/base64" - "math/rand" "net/http" "path/filepath" @@ -105,7 +105,7 @@ func makeContents(names []string) []item { for i, name := range names { var chunk [1024 * 48]byte // 1.5 times our chunk read size in GetVersion copy(chunk[:], name) - if _, err := rand.Read(chunk[len(name):]); err != nil { //nolint:gosec + if _, err := rand.Read(chunk[len(name):]); err != nil { panic(err) } res[i] = verWith(name, chunk[:])