diff --git a/pkg/controller/controllertest/util.go b/pkg/controller/controllertest/util.go index 92427ab21e..f7d9dfbdcb 100644 --- a/pkg/controller/controllertest/util.go +++ b/pkg/controller/controllertest/util.go @@ -101,7 +101,7 @@ func (f *FakeInformer) HasSynced() bool { return f.Synced } -// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration. +// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { f.mu.Lock() defer f.mu.Unlock() @@ -147,7 +147,7 @@ func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEve return nil, nil } -// RemoveEventHandler does nothing. TODO(community): Implement this. +// RemoveEventHandler does nothing. func (f *FakeInformer) RemoveEventHandler(handle cache.ResourceEventHandlerRegistration) error { eh, ok := handle.(*eventHandlerWrapper) if !ok { diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index 00cddd91d7..910aa30ce1 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -101,6 +101,7 @@ func getInformer(ctx context.Context, informerCache cache.Cache, resourceType cl // We got a NoKindMatchError, which means the kind is a CRD and it's not installed yet. // We should retry until it's installed. log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start", "kind", kindMatchErr.GroupKind) + return false, nil // Retry. case runtime.IsNotRegisteredError(lastErr): // We got a IsNotRegisteredError, which means the kind is not registered to the Scheme. // This is a programming error, so we should stop retrying and return the error. @@ -110,7 +111,6 @@ func getInformer(ctx context.Context, informerCache cache.Cache, resourceType cl // stop retrying and return the error. return true, fmt.Errorf("failed to get informer from cache: %w", lastErr) } - return false, nil // Retry. } return true, nil }); err != nil { @@ -234,7 +234,7 @@ func (ks *Kind) Stop() error { } ks.mu.Lock() - // Remove all event handlers. + // Remove event handler. if ks.registration != nil { if err := ks.informer.RemoveEventHandler(ks.registration); err != nil { errs = append(errs, err) diff --git a/pkg/source/source.go b/pkg/source/source.go index be5143f58b..930cc8f713 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -48,8 +48,8 @@ const ( // // Users may build their own Source implementations. type Source interface { - // Start is internal and should be called only by the Controller to register an EventHandler with the Informer - // to enqueue reconcile.Requests. It should NOT block, instead, it should start a goroutine and return immediately. + // Start is internal and should be called only by the Controller to start a goroutine that enqueues + // reconcile.Requests. It should NOT block, instead, it should start a goroutine and return immediately. // The context passed to Start can be used to cancel the blocking operations in the Start method. To cancel the // goroutine/ shutdown the source a user should call Stop. Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error @@ -100,8 +100,8 @@ func (cs *Channel) String() string { return fmt.Sprintf("channel source: %p", cs) } -// Start is internal and should be called only by the Controller to register an EventHandler with the Informer -// to enqueue reconcile.Requests. It should NOT block, instead, it should start a goroutine and return immediately. +// Start is internal and should be called only by the Controller to start a goroutine that enqueues +// reconcile.Requests. It should NOT block, instead, it should start a goroutine and return immediately. // The context passed to Start can be used to cancel the blocking operations in the Start method. To cancel the // goroutine/ shutdown the source a user should call Stop. // Start can be called multiple times, allowing multiple controllers to share a single source. @@ -112,17 +112,17 @@ func (cs *Channel) Start(ctx context.Context, handler handler.EventHandler, queu return fmt.Errorf("must specify Channel.Source") } - // use default value if DestBufferSize not specified - if cs.DestBufferSize == 0 { - cs.DestBufferSize = defaultBufferSize - } - cs.mu.Lock() defer cs.mu.Unlock() if cs.canceled { return nil } + // use default value if DestBufferSize not specified + if cs.DestBufferSize == 0 { + cs.DestBufferSize = defaultBufferSize + } + // Create a destination channel for the event handler // and add it to the list of destinations destination := make(chan event.GenericEvent, cs.DestBufferSize) @@ -147,7 +147,7 @@ func (cs *Channel) Start(ctx context.Context, handler handler.EventHandler, queu eventloop: for { select { - case <-ctx.Done(): + case <-cs.ctx.Done(): return nil case event, stillOpen := <-destination: if !stillOpen { @@ -306,8 +306,8 @@ var _ Source = Func(nil) // Deprecated: use manually implemented Source instead. type Func func(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error -// Start is internal and should be called only by the Controller to register an EventHandler with the Informer -// to enqueue reconcile.Requests. +// Start is internal and should be called only by the Controller to start a goroutine that enqueues +// reconcile.Requests. func (f Func) Start(ctx context.Context, evt handler.EventHandler, queue workqueue.RateLimitingInterface, pr ...predicate.Predicate) error { return f(ctx, evt, queue, pr...) diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index 067f0d1b43..fbc9db2e95 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -24,6 +24,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "sigs.k8s.io/controller-runtime/pkg/cache/informertest" + "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -35,6 +36,35 @@ import ( ) var _ = Describe("Source", func() { + Describe("Informer", func() { + var inf *controllertest.FakeInformer + + BeforeEach(func() { + inf = &controllertest.FakeInformer{} + }) + + Context("should not panic", func() { + It("when Stop is called before Start", func() { + instance := &source.Informer{Informer: inf} + err := instance.Stop() + Expect(err).NotTo(HaveOccurred()) + }) + + It("when Start is called twice", func() { + instance := &source.Informer{Informer: inf} + Expect(instance.Start(ctx, nil, nil)).To(Succeed()) + Expect(instance.Start(ctx, nil, nil)).To(Succeed()) + }) + + It("when Stop is called twice", func() { + instance := &source.Informer{Informer: inf} + Expect(instance.Start(ctx, nil, nil)).To(Succeed()) + Expect(instance.Stop()).To(Succeed()) + Expect(instance.Stop()).To(Succeed()) + }) + }) + }) + Describe("Kind", func() { var c chan struct{} var p *corev1.Pod