diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index e48db41f94..15347486c9 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -80,6 +80,12 @@ type Controller interface { // EventHandler if all provided Predicates evaluate to true. Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error + // StopWatch stops watching a source that was previously registered by Watch(). + // + // StopWatch may be called multiple times, even concurrently. All such calls will + // block until all goroutines have terminated. + StopWatch(src source.Source) error + // Start starts the controller. Start blocks until the context is closed or a // controller has an error starting. Start(ctx context.Context) error diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index 48facf1e94..27adf2c6b3 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -171,6 +171,103 @@ var _ = Describe("controller", func() { List(context.Background(), &controllertest.UnconventionalListTypeList{}) Expect(err).NotTo(HaveOccurred()) }) + + It("should not reconcile after watch is stopped", func() { + By("Creating the Manager") + cm, err := manager.New(cfg, manager.Options{}) + Expect(err).NotTo(HaveOccurred()) + + By("Creating the Controller") + instance, err := controller.New("foo-controller", cm, controller.Options{ + Reconciler: reconcile.Func( + func(_ context.Context, request reconcile.Request) (reconcile.Result, error) { + reconciled <- request + return reconcile.Result{}, nil + }), + }) + Expect(err).NotTo(HaveOccurred()) + + By("Watching Resources") + deploySource := source.Kind(cm.GetCache(), &appsv1.Deployment{}) + err = instance.Watch( + deploySource, + &handler.EnqueueRequestForObject{}, + ) + Expect(err).NotTo(HaveOccurred()) + + By("Starting the Manager") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(cm.Start(ctx)).NotTo(HaveOccurred()) + }() + + deploymentDefinition := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "deployment-name"}, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + SecurityContext: &corev1.SecurityContext{ + Privileged: truePtr(), + }, + }, + }, + }, + }, + }, + } + deployment := deploymentDefinition.DeepCopy() + expectedReconcileRequest := reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: "default", + Name: "deployment-name", + }} + + By("Invoking Reconciling for Create") + deployment, err = clientset.AppsV1().Deployments("default").Create(ctx, deployment, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(<-reconciled).To(Equal(expectedReconcileRequest)) + + By("Stopping the deployment watch") + Expect(instance.StopWatch(deploySource)).NotTo(HaveOccurred()) + + By("Test No Reconciling for Update") + newDeployment := deployment.DeepCopy() + newDeployment.Labels = map[string]string{"foo": "bar"} + _, err = clientset.AppsV1().Deployments("default").Update(ctx, newDeployment, metav1.UpdateOptions{}) + Expect(err).NotTo(HaveOccurred()) + Consistently(reconciled).ShouldNot(Receive()) + + By("Test No Reconciling for Delete") + err = clientset.AppsV1().Deployments("default"). + Delete(ctx, "deployment-name", metav1.DeleteOptions{}) + Expect(err).NotTo(HaveOccurred()) + Consistently(reconciled).ShouldNot(Receive()) + + By("Try starting the old deployment watch") + Expect(instance.Watch(deploySource, &handler.EnqueueRequestForObject{})).To(MatchError("cannot start an already started Kind source")) + + By("Starting a new deployment watch") + deploySource = source.Kind(cm.GetCache(), &appsv1.Deployment{}) + Expect(instance.Watch(deploySource, &handler.EnqueueRequestForObject{})).NotTo(HaveOccurred()) + + deployment = deploymentDefinition.DeepCopy() + + By("Invoking Reconciling for Update") + newDeployment = deployment.DeepCopy() + newDeployment.Labels = map[string]string{"foo": "bar"} + _, err = clientset.AppsV1().Deployments("default").Create(ctx, newDeployment, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(<-reconciled).To(Equal(expectedReconcileRequest)) + }) }) }) diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index d1ba5f3cf4..0dd5727ec7 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -60,7 +60,7 @@ type Controller struct { Queue workqueue.RateLimitingInterface // startedSources maintains a list of sources that have already started. - startedSources []source.Source + startedSources []cancelableSource // mu is used to synchronize Controller setup mu sync.Mutex @@ -143,11 +143,54 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc } c.LogConstructor(nil).Info("Starting EventSource", "source", src) - err := src.Start(c.ctx, evthdler, c.Queue, prct...) - if err == nil { - c.startedSources = append(c.startedSources, src) + return c.startWatch(src, evthdler, prct...) +} + +type cancelableSource struct { + source.Source + cancel context.CancelFunc +} + +func (c *Controller) startWatch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error { + watchCtx, watchCancel := context.WithCancel(c.ctx) + if err := src.Start(watchCtx, evthdler, c.Queue, prct...); err != nil { + watchCancel() + return err } - return err + + c.startedSources = append(c.startedSources, cancelableSource{ + Source: src, + cancel: watchCancel, + }) + + return nil +} + +// StopWatch implements controller.Controller. +func (c *Controller) StopWatch(src source.Source) error { + err := func() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.Stopped { + return fmt.Errorf("can not stop watch in a stopped controller") + } + + for i, startedSrc := range c.startedSources { + if startedSrc.Source == src { + c.startedSources = append(c.startedSources[:i], c.startedSources[i+1:]...) + startedSrc.cancel() + return startedSrc.Source.Shutdown() + } + } + + return nil + }() + if err != nil { + return err + } + + return src.Shutdown() } // NeedLeaderElection implements the manager.LeaderElectionRunnable interface. @@ -173,14 +216,14 @@ func (c *Controller) Start(ctx context.Context) error { c.initMetrics() // Set the internal context. - var cancel context.CancelFunc - c.ctx, cancel = context.WithCancel(ctx) + var cancelAllSources context.CancelFunc + c.ctx, cancelAllSources = context.WithCancel(ctx) wg := &sync.WaitGroup{} c.Queue = c.MakeQueue() defer func() { - var startedSources []source.Source + var startedSources []cancelableSource c.mu.Lock() c.Stopped = true startedSources = c.startedSources @@ -188,7 +231,7 @@ func (c *Controller) Start(ctx context.Context) error { c.Queue.ShutDown() // Stop receiving new items in the queue - cancel() // cancel the context to stop all the sources + cancelAllSources() // cancel the context to stop all the sources for _, src := range startedSources { if err := src.Shutdown(); err != nil { c.LogConstructor(nil).Error(err, "Failed to stop watch source when controller stopping", "source", src) @@ -212,10 +255,9 @@ func (c *Controller) Start(ctx context.Context) error { for _, watch := range c.startWatches { c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src)) - if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { + if err := c.startWatch(watch.src, watch.handler, watch.predicates...); err != nil { return err } - c.startedSources = append(c.startedSources, watch.src) } // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches