Skip to content

Commit

Permalink
add StopWatch
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Ramlot <42113979+inteon@users.noreply.github.com>
  • Loading branch information
inteon committed Dec 27, 2023
1 parent 7a34661 commit 6c16925
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 11 deletions.
6 changes: 6 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ type Controller interface {
// EventHandler if all provided Predicates evaluate to true.
Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error

// StopWatch stops watching a source that was previously registered by Watch().
//
// StopWatch may be called multiple times, even concurrently. All such calls will
// block until all goroutines have terminated.
StopWatch(src source.Source) error

// Start starts the controller. Start blocks until the context is closed or a
// controller has an error starting.
Start(ctx context.Context) error
Expand Down
97 changes: 97 additions & 0 deletions pkg/controller/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,103 @@ var _ = Describe("controller", func() {
List(context.Background(), &controllertest.UnconventionalListTypeList{})
Expect(err).NotTo(HaveOccurred())
})

It("should not reconcile after watch is stopped", func() {
By("Creating the Manager")
cm, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

By("Creating the Controller")
instance, err := controller.New("foo-controller", cm, controller.Options{
Reconciler: reconcile.Func(
func(_ context.Context, request reconcile.Request) (reconcile.Result, error) {
reconciled <- request
return reconcile.Result{}, nil
}),
})
Expect(err).NotTo(HaveOccurred())

By("Watching Resources")
deploySource := source.Kind(cm.GetCache(), &appsv1.Deployment{})
err = instance.Watch(
deploySource,
&handler.EnqueueRequestForObject{},
)
Expect(err).NotTo(HaveOccurred())

By("Starting the Manager")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(cm.Start(ctx)).NotTo(HaveOccurred())
}()

deploymentDefinition := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{Name: "deployment-name"},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx",
SecurityContext: &corev1.SecurityContext{
Privileged: truePtr(),
},
},
},
},
},
},
}
deployment := deploymentDefinition.DeepCopy()
expectedReconcileRequest := reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: "default",
Name: "deployment-name",
}}

By("Invoking Reconciling for Create")
deployment, err = clientset.AppsV1().Deployments("default").Create(ctx, deployment, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
Expect(<-reconciled).To(Equal(expectedReconcileRequest))

By("Stopping the deployment watch")
Expect(instance.StopWatch(deploySource)).NotTo(HaveOccurred())

By("Test No Reconciling for Update")
newDeployment := deployment.DeepCopy()
newDeployment.Labels = map[string]string{"foo": "bar"}
_, err = clientset.AppsV1().Deployments("default").Update(ctx, newDeployment, metav1.UpdateOptions{})
Expect(err).NotTo(HaveOccurred())
Consistently(reconciled).ShouldNot(Receive())

By("Test No Reconciling for Delete")
err = clientset.AppsV1().Deployments("default").
Delete(ctx, "deployment-name", metav1.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
Consistently(reconciled).ShouldNot(Receive())

By("Try starting the old deployment watch")
Expect(instance.Watch(deploySource, &handler.EnqueueRequestForObject{})).To(MatchError("cannot start an already started Kind source"))

By("Starting a new deployment watch")
deploySource = source.Kind(cm.GetCache(), &appsv1.Deployment{})
Expect(instance.Watch(deploySource, &handler.EnqueueRequestForObject{})).NotTo(HaveOccurred())

deployment = deploymentDefinition.DeepCopy()

By("Invoking Reconciling for Update")
newDeployment = deployment.DeepCopy()
newDeployment.Labels = map[string]string{"foo": "bar"}
_, err = clientset.AppsV1().Deployments("default").Create(ctx, newDeployment, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
Expect(<-reconciled).To(Equal(expectedReconcileRequest))
})
})
})

Expand Down
64 changes: 53 additions & 11 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Controller struct {
Queue workqueue.RateLimitingInterface

// startedSources maintains a list of sources that have already started.
startedSources []source.Source
startedSources []cancelableSource

// mu is used to synchronize Controller setup
mu sync.Mutex
Expand Down Expand Up @@ -143,11 +143,54 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
}

c.LogConstructor(nil).Info("Starting EventSource", "source", src)
err := src.Start(c.ctx, evthdler, c.Queue, prct...)
if err == nil {
c.startedSources = append(c.startedSources, src)
return c.startWatch(src, evthdler, prct...)
}

type cancelableSource struct {
source.Source
cancel context.CancelFunc
}

func (c *Controller) startWatch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
watchCtx, watchCancel := context.WithCancel(c.ctx)
if err := src.Start(watchCtx, evthdler, c.Queue, prct...); err != nil {
watchCancel()
return err
}
return err

c.startedSources = append(c.startedSources, cancelableSource{
Source: src,
cancel: watchCancel,
})

return nil
}

// StopWatch implements controller.Controller.
func (c *Controller) StopWatch(src source.Source) error {
err := func() error {
c.mu.Lock()
defer c.mu.Unlock()

if c.Stopped {
return fmt.Errorf("can not stop watch in a stopped controller")
}

for i, startedSrc := range c.startedSources {
if startedSrc.Source == src {
c.startedSources = append(c.startedSources[:i], c.startedSources[i+1:]...)
startedSrc.cancel()
return startedSrc.Source.Shutdown()
}
}

return nil
}()
if err != nil {
return err
}

return src.Shutdown()
}

// NeedLeaderElection implements the manager.LeaderElectionRunnable interface.
Expand All @@ -173,22 +216,22 @@ func (c *Controller) Start(ctx context.Context) error {
c.initMetrics()

// Set the internal context.
var cancel context.CancelFunc
c.ctx, cancel = context.WithCancel(ctx)
var cancelAllSources context.CancelFunc
c.ctx, cancelAllSources = context.WithCancel(ctx)

wg := &sync.WaitGroup{}

c.Queue = c.MakeQueue()
defer func() {
var startedSources []source.Source
var startedSources []cancelableSource
c.mu.Lock()
c.Stopped = true
startedSources = c.startedSources
c.mu.Unlock()

c.Queue.ShutDown() // Stop receiving new items in the queue

cancel() // cancel the context to stop all the sources
cancelAllSources() // cancel the context to stop all the sources
for _, src := range startedSources {
if err := src.Shutdown(); err != nil {
c.LogConstructor(nil).Error(err, "Failed to stop watch source when controller stopping", "source", src)
Expand All @@ -212,10 +255,9 @@ func (c *Controller) Start(ctx context.Context) error {
for _, watch := range c.startWatches {
c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))

if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
if err := c.startWatch(watch.src, watch.handler, watch.predicates...); err != nil {
return err
}
c.startedSources = append(c.startedSources, watch.src)
}

// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
Expand Down

0 comments on commit 6c16925

Please sign in to comment.