diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index e49a2c5774..71fe4f3c3c 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -77,7 +77,7 @@ var _ = Describe("controller.Controller", func() { ctx, cancel := context.WithCancel(context.Background()) watchChan := make(chan event.GenericEvent, 1) - watch := &source.Channel{Source: watchChan} + watch := &source.Channel{Broadcaster: source.NewChannelBroadcaster(watchChan)} watchChan <- event.GenericEvent{Object: &corev1.Pod{}} reconcileStarted := make(chan struct{}) diff --git a/pkg/controller/controllertest/util.go b/pkg/controller/controllertest/util.go index 60ec61edec..5fec7889c7 100644 --- a/pkg/controller/controllertest/util.go +++ b/pkg/controller/controllertest/util.go @@ -17,6 +17,8 @@ limitations under the License. package controllertest import ( + "fmt" + "sync" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,7 +35,8 @@ type FakeInformer struct { // RunCount is incremented each time RunInformersAndControllers is called RunCount int - handlers []eventHandlerWrapper + mu sync.RWMutex + handlers []*eventHandlerWrapper } type modernResourceEventHandler interface { @@ -51,7 +54,8 @@ type legacyResourceEventHandler interface { // eventHandlerWrapper wraps a ResourceEventHandler in a manner that is compatible with client-go 1.27+ and older. // The interface was changed in these versions. type eventHandlerWrapper struct { - handler any + handler any + hasSynced bool } func (e eventHandlerWrapper) OnAdd(obj interface{}) { @@ -78,6 +82,10 @@ func (e eventHandlerWrapper) OnDelete(obj interface{}) { e.handler.(legacyResourceEventHandler).OnDelete(obj) } +func (e eventHandlerWrapper) HasSynced() bool { + return e.hasSynced +} + // AddIndexers does nothing. TODO(community): Implement this. func (f *FakeInformer) AddIndexers(indexers cache.Indexers) error { return nil @@ -98,10 +106,13 @@ func (f *FakeInformer) HasSynced() bool { return f.Synced } -// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration. +// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { - f.handlers = append(f.handlers, eventHandlerWrapper{handler}) - return nil, nil + f.mu.Lock() + defer f.mu.Unlock() + eh := &eventHandlerWrapper{handler, true} + f.handlers = append(f.handlers, eh) + return eh, nil } // Run implements the Informer interface. Increments f.RunCount. @@ -111,6 +122,8 @@ func (f *FakeInformer) Run(<-chan struct{}) { // Add fakes an Add event for obj. func (f *FakeInformer) Add(obj metav1.Object) { + f.mu.RLock() + defer f.mu.RUnlock() for _, h := range f.handlers { h.OnAdd(obj) } @@ -118,6 +131,8 @@ func (f *FakeInformer) Add(obj metav1.Object) { // Update fakes an Update event for obj. func (f *FakeInformer) Update(oldObj, newObj metav1.Object) { + f.mu.RLock() + defer f.mu.RUnlock() for _, h := range f.handlers { h.OnUpdate(oldObj, newObj) } @@ -125,6 +140,8 @@ func (f *FakeInformer) Update(oldObj, newObj metav1.Object) { // Delete fakes an Delete event for obj. 
func (f *FakeInformer) Delete(obj metav1.Object) { + f.mu.RLock() + defer f.mu.RUnlock() for _, h := range f.handlers { h.OnDelete(obj) } @@ -135,8 +152,25 @@ func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEve return nil, nil } -// RemoveEventHandler does nothing. TODO(community): Implement this. +// RemoveEventHandler removes an EventHandler to the fake Informers. func (f *FakeInformer) RemoveEventHandler(handle cache.ResourceEventHandlerRegistration) error { + eh, ok := handle.(*eventHandlerWrapper) + if !ok { + return fmt.Errorf("invalid registration type %t", handle) + } + + f.mu.Lock() + defer f.mu.Unlock() + + handlers := make([]*eventHandlerWrapper, 0, len(f.handlers)) + for _, h := range f.handlers { + if h == eh { + continue + } + handlers = append(handlers, h) + } + f.handlers = handlers + return nil } diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 33883647b9..d1ba5f3cf4 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -59,12 +59,18 @@ type Controller struct { // the Queue for processing Queue workqueue.RateLimitingInterface + // startedSources maintains a list of sources that have already started. + startedSources []source.Source + // mu is used to synchronize Controller setup mu sync.Mutex // Started is true if the Controller has been Started Started bool + // Stopped is true if the Controller has been Stopped + Stopped bool + // ctx is the context that was passed to Start() and used when starting watches. // // According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context, @@ -124,6 +130,10 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc c.mu.Lock() defer c.mu.Unlock() + if c.Stopped { + return fmt.Errorf("can not start watch in a stopped controller") + } + // Controller hasn't started yet, store the watches locally and return. // // These watches are going to be held on the controller struct until the manager or user calls Start(...). @@ -133,7 +143,11 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc } c.LogConstructor(nil).Info("Starting EventSource", "source", src) - return src.Start(c.ctx, evthdler, c.Queue, prct...) + err := src.Start(c.ctx, evthdler, c.Queue, prct...) + if err == nil { + c.startedSources = append(c.startedSources, src) + } + return err } // NeedLeaderElection implements the manager.LeaderElectionRunnable interface. @@ -149,6 +163,9 @@ func (c *Controller) Start(ctx context.Context) error { // use an IIFE to get proper lock handling // but lock outside to get proper handling of the queue shutdown c.mu.Lock() + if c.Stopped { + return fmt.Errorf("can not restart a stopped controller, you should create a new one") + } if c.Started { return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times") } @@ -156,15 +173,33 @@ func (c *Controller) Start(ctx context.Context) error { c.initMetrics() // Set the internal context. 
- c.ctx = ctx + var cancel context.CancelFunc + c.ctx, cancel = context.WithCancel(ctx) + + wg := &sync.WaitGroup{} c.Queue = c.MakeQueue() - go func() { - <-ctx.Done() - c.Queue.ShutDown() + defer func() { + var startedSources []source.Source + c.mu.Lock() + c.Stopped = true + startedSources = c.startedSources + c.mu.Unlock() + + c.Queue.ShutDown() // Stop receiving new items in the queue + + cancel() // cancel the context to stop all the sources + for _, src := range startedSources { + if err := src.Shutdown(); err != nil { + c.LogConstructor(nil).Error(err, "Failed to stop watch source when controller stopping", "source", src) + } + } + c.LogConstructor(nil).Info("All watch sources finished") + + wg.Wait() // Wait for workers to finish + c.LogConstructor(nil).Info("All workers finished") }() - wg := &sync.WaitGroup{} err := func() error { defer c.mu.Unlock() @@ -180,6 +215,7 @@ func (c *Controller) Start(ctx context.Context) error { if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { return err } + c.startedSources = append(c.startedSources, watch.src) } // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches @@ -238,8 +274,6 @@ func (c *Controller) Start(ctx context.Context) error { <-ctx.Done() c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish") - wg.Wait() - c.LogConstructor(nil).Info("All workers finished") return nil } diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index ce2245e60f..5d44dd01ba 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -151,7 +151,7 @@ var _ = Describe("controller", func() { err = ctrl.Start(context.TODO()) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("failed to wait for testcontroller caches to sync: timed out waiting for cache to be synced")) + Expect(err.Error()).To(ContainSubstring("timed out trying to get an informer from cache and waiting for cache to be synced for Kind *v1.Deployment")) }) It("should not error when context cancelled", func() { @@ -226,7 +226,7 @@ var _ = Describe("controller", func() { Object: p, } - ins := &source.Channel{Source: ch} + ins := &source.Channel{Broadcaster: source.NewChannelBroadcaster(ch)} ins.DestBufferSize = 1 // send the event to the channel @@ -260,7 +260,7 @@ var _ = Describe("controller", func() { e := ctrl.Start(ctx) Expect(e).To(HaveOccurred()) - Expect(e.Error()).To(ContainSubstring("must specify Channel.Source")) + Expect(e.Error()).To(ContainSubstring("must create Channel with a non-nil Broadcaster")) }) It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() { @@ -308,7 +308,7 @@ var _ = Describe("controller", func() { Expect(ctrl.Start(ctx)).To(Succeed()) err := ctrl.Start(ctx) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal("controller was started more than once. 
This is likely to be caused by being added to a manager multiple times")) + Expect(err.Error()).To(Equal("can not restart a stopped controller, you should create a new one")) }) }) diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index b3a8227125..ff0d1620b0 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -4,11 +4,16 @@ import ( "context" "errors" "fmt" + "sync" + "sync/atomic" "time" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" + kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" + toolscache "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -24,94 +29,271 @@ type Kind struct { // Cache used to watch APIs Cache cache.Cache - // started may contain an error if one was encountered during startup. If its closed and does not - // contain an error, startup and syncing finished. - started chan error - startCancel func() + mu sync.RWMutex + // shuttingDown is true if the source has been shuttingDown and causes any following/ in-progress + // Start calls to no-op. + shuttingDown bool + // isStarted is true if the source has been started. A source can only be started once. + isStarted bool + + // informer is the informer that we obtained from the cache. + informer cache.Informer + // registration is the registered EventHandler handle. + registration toolscache.ResourceEventHandlerRegistration + // started may contain an error if one was encountered during startup. If its closed and + // does not contain an error, startup and syncing finished. + startupDoneCh chan struct{} + // startupErr contains an error if one was encountered during startup. + startupErr atomic.Value + // cancelStartup is used to cancel the startup of the source. It is used when the WaitForSync + // context is canceled. + cancelStartup context.CancelFunc +} + +type startupErr struct { + isCanceled bool + err error +} + +func (ks *Kind) String() string { + if ks.Type != nil { + return fmt.Sprintf("kind source: %T", ks.Type) + } + return "kind source: unknown type" } -// Start is internal and should be called only by the Controller to register an EventHandler with the Informer -// to enqueue reconcile.Requests. +// Start is internal and should be called only by the Controller to start a source which enqueue reconcile.Requests. +// It should NOT block, instead, it should start a goroutine and return immediately. +// The context passed to Start can be used to cancel this goroutine. After the context is canceled, Shutdown can be +// called to wait for the goroutine to exit. +// Start can be called only once, it is thus not possible to share a single source between multiple controllers. func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { + // Type should have been specified by the user. if ks.Type == nil { - return fmt.Errorf("must create Kind with a non-nil object") + return fmt.Errorf("must create Kind with a non-nil Type") } + // Cache should have been specified by the user. if ks.Cache == nil { - return fmt.Errorf("must create Kind with a non-nil cache") + return fmt.Errorf("must create Kind with a non-nil Cache") } - // cache.GetInformer will block until its context is cancelled if the cache was already started and it can not - // sync that informer (most commonly due to RBAC issues). 
- ctx, ks.startCancel = context.WithCancel(ctx) - ks.started = make(chan error) + ks.mu.Lock() + defer ks.mu.Unlock() + if ks.shuttingDown { + return nil + } + if ks.isStarted { + return fmt.Errorf("cannot start an already started Kind source") + } + + ks.isStarted = true + + ctx, ks.cancelStartup = context.WithCancel(ctx) + + ks.startupDoneCh = make(chan struct{}) go func() { - var ( - i cache.Informer - lastErr error + defer close(ks.startupDoneCh) + + err := ks.registerEventHandler( + ctx, + NewEventHandler(ctx, queue, handler, prct).HandlerFuncs(), ) + if err != nil { + ks.startupErr.Store(startupErr{ + isCanceled: errors.Is(ctx.Err(), context.Canceled), + err: err, + }) + } + }() - // Tries to get an informer until it returns true, - // an error or the specified context is cancelled or expired. - if err := wait.PollUntilContextCancel(ctx, 10*time.Second, true, func(ctx context.Context) (bool, error) { - // Lookup the Informer from the Cache and add an EventHandler which populates the Queue - i, lastErr = ks.Cache.GetInformer(ctx, ks.Type) - if lastErr != nil { - kindMatchErr := &meta.NoKindMatchError{} - switch { - case errors.As(lastErr, &kindMatchErr): - log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start", - "kind", kindMatchErr.GroupKind) - case runtime.IsNotRegisteredError(lastErr): - log.Error(lastErr, "kind must be registered to the Scheme") - default: - log.Error(lastErr, "failed to get informer from cache") + return nil +} + +func getInformer(ctx context.Context, informerCache cache.Cache, resourceType client.Object) (cache.Informer, error) { + var ( + i cache.Informer + lastErr error + ) + + // Tries to get an informer until it returns true, + // an error or the specified context is cancelled or expired. + if err := wait.PollUntilContextCancel(ctx, 10*time.Second, true, func(pollCtx context.Context) (bool, error) { + // Lookup the Informer from the Cache. + i, lastErr = informerCache.GetInformer(pollCtx, resourceType, cache.BlockUntilSynced(false)) + if lastErr != nil { + kindMatchErr := &meta.NoKindMatchError{} + discoveryErr := &discovery.ErrGroupDiscoveryFailed{} + switch { + case errors.As(lastErr, &kindMatchErr): + // We got a NoKindMatchError, which means the kind is a CRD and it's not installed yet. + // We should retry until it's installed. + log.Error(lastErr, "waiting for CRD to be installed", "groupKind", kindMatchErr.GroupKind) + return false, nil // Retry. + case errors.As(lastErr, &discoveryErr): + // We got a ErrGroupDiscoveryFailed, which means the kind is a CRD and it's not installed yet. + // We should retry until it's installed. + for gv, err := range discoveryErr.Groups { + log.Error(err, "waiting for CRD to be installed", "groupVersion", gv) } return false, nil // Retry. + case runtime.IsNotRegisteredError(lastErr): + // We got a IsNotRegisteredError, which means the kind is not registered to the Scheme. + // This is a programming error, so we should stop retrying and return the error. + return true, fmt.Errorf("kind must be registered to the Scheme: %w", lastErr) + default: + // We got an error that is not a NoKindMatchError or IsNotRegisteredError, so we should + // stop retrying and return the error. 
+ return true, fmt.Errorf("failed to get informer from cache: %w", lastErr) } - return true, nil - }); err != nil { - if lastErr != nil { - ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr) - return - } - ks.started <- err - return } + return true, nil + }); err != nil { + return nil, err + } - _, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs()) - if err != nil { - ks.started <- err - return + return i, nil +} + +func (ks *Kind) registerEventHandler(ctx context.Context, eventHandler toolscache.ResourceEventHandlerFuncs) error { + informer, err := getInformer(ctx, ks.Cache, ks.Type) + if err != nil { + return err + } + + err = func() error { + ks.mu.Lock() + defer ks.mu.Unlock() + + // The source was shuttingDown while we were getting the informer. + // We should not add the event handler or save the obtained informer. + if ks.shuttingDown { + return nil } - if !ks.Cache.WaitForCacheSync(ctx) { - // Would be great to return something more informative here - ks.started <- errors.New("cache did not sync") + + // Save the informer so that we use it to unregister the event handler in Stop. + ks.informer = informer + + // Register the event handler and save the registration so that we can unregister it in Stop. + registration, err := ks.informer.AddEventHandler(eventHandler) + if err != nil { + return err } - close(ks.started) + ks.registration = registration + + return nil }() + if err != nil { + return err + } - return nil -} + // Wait for the cache to sync. + if !ks.Cache.WaitForCacheSync(ctx) { + if ctx.Err() != nil { + return fmt.Errorf("cache did not sync: %w", ctx.Err()) + } -func (ks *Kind) String() string { - if ks.Type != nil { - return fmt.Sprintf("kind source: %T", ks.Type) + return fmt.Errorf("cache did not sync") } - return "kind source: unknown type" + + return nil } // WaitForSync implements SyncingSource to allow controllers to wait with starting // workers until the cache is synced. +// +// WaitForSync blocks until the cache is synced or the passed context is canceled. If the passed context is canceled, +// with a non-context.Canceled error, WaitForSync will return an error. +// +// If the cache is stopped before it is synced, WaitForSync will return an error in case the cache was canceled with a +// non-context.Canceled error. func (ks *Kind) WaitForSync(ctx context.Context) error { + if func() bool { + ks.mu.RLock() + defer ks.mu.RUnlock() + + return !ks.isStarted + }() { + return fmt.Errorf("cannot wait for sync on an unstarted source") + } + + var returnErrors []error + select { - case err := <-ks.started: - return err - case <-ctx.Done(): - ks.startCancel() - if errors.Is(ctx.Err(), context.Canceled) { + case <-ks.startupDoneCh: + // Check if we have any startup errors. We ignore the errors if the context was canceled, because it means that the + // source was stopped by the user. + if startErr, _ := ks.startupErr.Load().(startupErr); !startErr.isCanceled && startErr.err == nil { + // The cache was synced and the startup goroutine returned without an error. return nil } - return fmt.Errorf("timed out waiting for cache to be synced for Kind %T", ks.Type) + + // The errors will be collected in Shutdown. + case <-ctx.Done(): + // If the context is cancelled, we should cancel the startup context + // and wait for the startup goroutine to return. + ks.cancelStartup() + + // Return an additional timeout error if the context was not cancelled. 
+ if !errors.Is(ctx.Err(), context.Canceled) { + returnErrors = append(returnErrors, fmt.Errorf("timed out trying to get an informer from cache and waiting for cache to be synced for Kind %T", ks.Type)) + } + } + + // Try to stop the source to clean up any resources + // and collect any errors. + collectedErrors := ks.Shutdown() + if collectedErrors != nil { + returnErrors = append(returnErrors, collectedErrors) + } + + if len(returnErrors) == 1 { + return returnErrors[0] + } + return kerrors.NewAggregate(returnErrors) +} + +// Shutdown marks a Source as shutting down. At that point no new +// informers can be started anymore and Start will return without +// doing anything. +// +// In addition, Shutdown blocks until all goroutines have terminated. For that +// to happen, the close channel(s) that they were started with must be closed, +// either before Shutdown gets called or while it is waiting. +// +// Shutdown may be called multiple times, even concurrently. All such calls will +// block until all goroutines have terminated. +func (ks *Kind) Shutdown() error { + ks.mu.Lock() + defer ks.mu.Unlock() + // Ensure that when we release the lock, we stop an in-process & future calls to Start(). + ks.shuttingDown = true + + // If we haven't started yet, there's nothing to stop. + if !ks.isStarted { + return nil + } + + var errs []error + + // Wait for the started channel to be closed. + ks.mu.Unlock() + <-ks.startupDoneCh + + // Check if we have any startup errors. We ignore the errors if the context was canceled, because it means that the + // source was stopped by the user. + if startErr, _ := ks.startupErr.Load().(startupErr); !startErr.isCanceled && startErr.err != nil { + errs = append(errs, fmt.Errorf("failed to start source: %w", startErr.err)) } + ks.mu.Lock() + + // Remove event handler. + if ks.registration != nil { + if err := ks.informer.RemoveEventHandler(ks.registration); err != nil { + errs = append(errs, fmt.Errorf("failed to stop source: %w", err)) + } + ks.registration = nil + } + + return kerrors.NewAggregate(errs) } diff --git a/pkg/source/example_test.go b/pkg/source/example_test.go index 77857729de..834737c337 100644 --- a/pkg/source/example_test.go +++ b/pkg/source/example_test.go @@ -43,7 +43,7 @@ func ExampleChannel() { events := make(chan event.GenericEvent) err := ctrl.Watch( - &source.Channel{Source: events}, + &source.Channel{Broadcaster: source.NewChannelBroadcaster(events)}, &handler.EnqueueRequestForObject{}, ) if err != nil { diff --git a/pkg/source/source.go b/pkg/source/source.go index 099c8d68fa..d37cbbdc75 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -19,11 +19,9 @@ package source import ( "context" "fmt" - "sync" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" internal "sigs.k8s.io/controller-runtime/pkg/internal/source" @@ -31,11 +29,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" ) -const ( - // defaultBufferSize is the default number of event notifications that can be buffered. - defaultBufferSize = 1024 -) - // Source is a source of events (eh.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc) // which should be processed by event.EventHandlers to enqueue reconcile.Requests. // @@ -45,9 +38,23 @@ const ( // // Users may build their own Source implementations. 
type Source interface { - // Start is internal and should be called only by the Controller to register an EventHandler with the Informer - // to enqueue reconcile.Requests. + // Start is internal and should be called only by the Controller to start a goroutine that enqueues + // reconcile.Requests. It should NOT block, instead, it should start a goroutine and return immediately. + // The context passed to Start can be used to cancel the blocking operations in the Start method. To cancel the + // goroutine/ shutdown the source a user should call Stop. Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error + + // Shutdown marks a Source as shutting down. At that point no new + // informers can be started anymore and Start will return without + // doing anything. + // + // In addition, Shutdown blocks until all goroutines have terminated. For that + // to happen, the close channel(s) that they were started with must be closed, + // either before Shutdown gets called or while it is waiting. + // + // Shutdown may be called multiple times, even concurrently. All such calls will + // block until all goroutines have terminated. + Shutdown() error } // SyncingSource is a source that needs syncing prior to being usable. The controller @@ -62,164 +69,26 @@ func Kind(cache cache.Cache, object client.Object) SyncingSource { return &internal.Kind{Type: object, Cache: cache} } -var _ Source = &Channel{} - -// Channel is used to provide a source of events originating outside the cluster -// (e.g. GitHub Webhook callback). Channel requires the user to wire the external -// source (eh.g. http handler) to write GenericEvents to the underlying channel. -type Channel struct { - // once ensures the event distribution goroutine will be performed only once - once sync.Once - - // Source is the source channel to fetch GenericEvents - Source <-chan event.GenericEvent - - // dest is the destination channels of the added event handlers - dest []chan event.GenericEvent - - // DestBufferSize is the specified buffer size of dest channels. - // Default to 1024 if not specified. - DestBufferSize int - - // destLock is to ensure the destination channels are safely added/removed - destLock sync.Mutex -} - -func (cs *Channel) String() string { - return fmt.Sprintf("channel source: %p", cs) -} - -// Start implements Source and should only be called by the Controller. -func (cs *Channel) Start( - ctx context.Context, - handler handler.EventHandler, - queue workqueue.RateLimitingInterface, - prct ...predicate.Predicate) error { - // Source should have been specified by the user. 
- if cs.Source == nil { - return fmt.Errorf("must specify Channel.Source") - } - - // use default value if DestBufferSize not specified - if cs.DestBufferSize == 0 { - cs.DestBufferSize = defaultBufferSize - } - - dst := make(chan event.GenericEvent, cs.DestBufferSize) - - cs.destLock.Lock() - cs.dest = append(cs.dest, dst) - cs.destLock.Unlock() - - cs.once.Do(func() { - // Distribute GenericEvents to all EventHandler / Queue pairs Watching this source - go cs.syncLoop(ctx) - }) - - go func() { - for evt := range dst { - shouldHandle := true - for _, p := range prct { - if !p.Generic(evt) { - shouldHandle = false - break - } - } - - if shouldHandle { - func() { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - handler.Generic(ctx, evt, queue) - }() - } - } - }() - - return nil -} - -func (cs *Channel) doStop() { - cs.destLock.Lock() - defer cs.destLock.Unlock() - - for _, dst := range cs.dest { - close(dst) - } -} - -func (cs *Channel) distribute(evt event.GenericEvent) { - cs.destLock.Lock() - defer cs.destLock.Unlock() - - for _, dst := range cs.dest { - // We cannot make it under goroutine here, or we'll meet the - // race condition of writing message to closed channels. - // To avoid blocking, the dest channels are expected to be of - // proper buffer size. If we still see it blocked, then - // the controller is thought to be in an abnormal state. - dst <- evt - } -} - -func (cs *Channel) syncLoop(ctx context.Context) { - for { - select { - case <-ctx.Done(): - // Close destination channels - cs.doStop() - return - case evt, stillOpen := <-cs.Source: - if !stillOpen { - // if the source channel is closed, we're never gonna get - // anything more on it, so stop & bail - cs.doStop() - return - } - cs.distribute(evt) - } - } -} - -// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). -type Informer struct { - // Informer is the controller-runtime Informer - Informer cache.Informer -} - -var _ Source = &Informer{} - -// Start is internal and should be called only by the Controller to register an EventHandler with the Informer -// to enqueue reconcile.Requests. -func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, - prct ...predicate.Predicate) error { - // Informer should have been specified by the user. - if is.Informer == nil { - return fmt.Errorf("must specify Informer.Informer") - } - - _, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, handler, prct).HandlerFuncs()) - if err != nil { - return err - } - return nil -} - -func (is *Informer) String() string { - return fmt.Sprintf("informer source: %p", is.Informer) -} - var _ Source = Func(nil) // Func is a function that implements Source. +// Deprecated: use manually implemented Source instead. type Func func(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error -// Start implements Source. +// Start is internal and should be called only by the Controller to start a goroutine that enqueues +// reconcile.Requests. func (f Func) Start(ctx context.Context, evt handler.EventHandler, queue workqueue.RateLimitingInterface, pr ...predicate.Predicate) error { return f(ctx, evt, queue, pr...) } +// Shutdown is internal and should be called only by the Controller to stop the Source. It should block until the +// Source has stopped. +// WARNING: will always return an error. 
+func (f Func) Shutdown() error { + return fmt.Errorf("Func source is not stoppable") +} + func (f Func) String() string { return fmt.Sprintf("func source: %p", f) } diff --git a/pkg/source/source_channel.go b/pkg/source/source_channel.go new file mode 100644 index 0000000000..06d3ae3e77 --- /dev/null +++ b/pkg/source/source_channel.go @@ -0,0 +1,170 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package source + +import ( + "context" + "fmt" + "sync" + + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +const ( + // defaultBufferSize is the default number of event notifications that can be buffered. + defaultBufferSize = 1024 +) + +// Channel is used to provide a source of events originating outside the cluster +// (e.g. GitHub Webhook callback). Channel requires the user to wire the external +// source (eh.g. http handler) to write GenericEvents to the underlying channel. +type Channel struct { + // Broadcaster contains the source channel for events. + Broadcaster *ChannelBroadcaster + + // DestBufferSize is the size of the buffer for each destination channel. + // If DestBufferSize is 0, then a default size is used. + DestBufferSize int + + mu sync.Mutex + // shuttingDown is true if the source has been shuttingDown and causes any following/ in-progress + // Start calls to no-op. + shuttingDown bool + // isStarted is true if the source has been started. A source can only be started once. + isStarted bool + + // doneCh is closed when the source stopped listening to the broadcaster. + doneCh chan struct{} +} + +func (cs *Channel) String() string { + return fmt.Sprintf("channel source: %p", cs) +} + +// Start is internal and should be called only by the Controller to start a source which enqueue reconcile.Requests. +// It should NOT block, instead, it should start a goroutine and return immediately. +// The context passed to Start can be used to cancel this goroutine. After the context is canceled, Shutdown can be +// called to wait for the goroutine to exit. +// Start can be called only once, it is thus not possible to share a single source between multiple controllers. +func (cs *Channel) Start(ctx context.Context, eventHandler handler.EventHandler, queue workqueue.RateLimitingInterface, + predicates ...predicate.Predicate) error { + // Broadcaster should have been specified by the user. 
+ if cs.Broadcaster == nil { + return fmt.Errorf("must create Channel with a non-nil Broadcaster") + } + + cs.mu.Lock() + defer cs.mu.Unlock() + if cs.shuttingDown { + return nil + } + if cs.isStarted { + return fmt.Errorf("cannot start an already started Channel source") + } + + cs.isStarted = true + + if cs.DestBufferSize <= 0 { + cs.DestBufferSize = defaultBufferSize + } + + // Create a destination channel for the event handler + // and add it to the list of destinations + destination := make(chan event.GenericEvent, cs.DestBufferSize) + cs.Broadcaster.addListener(destination) + + cs.doneCh = make(chan struct{}) + go func() { + defer close(cs.doneCh) + // Remove the listener and wait for the broadcaster + // to stop sending events to the destination channel. + defer cs.Broadcaster.removeListener(destination) + + cs.processReceivedEvents( + ctx, + destination, + eventHandler, + queue, + predicates..., + ) + }() + + return nil +} + +func (cs *Channel) processReceivedEvents( + ctx context.Context, + destination <-chan event.GenericEvent, + eventHandler handler.EventHandler, + queue workqueue.RateLimitingInterface, + predicates ...predicate.Predicate, +) { +eventloop: + for { + select { + case <-ctx.Done(): + return + case event, stillOpen := <-destination: + if !stillOpen { + return + } + + // Check predicates against the event first + // and continue the outer loop if any of them fail. + for _, p := range predicates { + if !p.Generic(event) { + continue eventloop + } + } + + // Call the event handler with the event. + eventHandler.Generic(ctx, event, queue) + } + } +} + +// Shutdown marks a Source as shutting down. At that point no new +// informers can be started anymore and Start will return without +// doing anything. +// +// In addition, Shutdown blocks until all goroutines have terminated. For that +// to happen, the close channel(s) that they were started with must be closed, +// either before Shutdown gets called or while it is waiting. +// +// Shutdown may be called multiple times, even concurrently. All such calls will +// block until all goroutines have terminated. +func (cs *Channel) Shutdown() error { + if func() bool { + cs.mu.Lock() + defer cs.mu.Unlock() + // Ensure that when we release the lock, we stop an in-process & future calls to Start(). + cs.shuttingDown = true + + // If we haven't started yet, there's nothing to stop. + return !cs.isStarted + }() { + return nil + } + + // Wait for the listener goroutine to exit. + <-cs.doneCh + + return nil +} diff --git a/pkg/source/source_channel_broadcast.go b/pkg/source/source_channel_broadcast.go new file mode 100644 index 0000000000..86db1baa9e --- /dev/null +++ b/pkg/source/source_channel_broadcast.go @@ -0,0 +1,189 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package source + +import ( + "sync" + + "sigs.k8s.io/controller-runtime/pkg/event" +) + +// ChannelBroadcaster is a wrapper around a channel that allows multiple listeners to all +// receive the events from the channel. 
+type ChannelBroadcaster struct { + source <-chan event.GenericEvent + + mu sync.Mutex + rcCount uint + managementCh chan managementMsg + doneCh chan struct{} +} + +type managementOperation bool + +const ( + addChannel managementOperation = true + removeChannel managementOperation = false +) + +type managementMsg struct { + operation managementOperation + ch chan event.GenericEvent +} + +// NewChannelBroadcaster creates a new ChannelBroadcaster for the given channel. +func NewChannelBroadcaster(source <-chan event.GenericEvent) *ChannelBroadcaster { + return &ChannelBroadcaster{ + source: source, + } +} + +func (sc *ChannelBroadcaster) addListener(ch chan event.GenericEvent) { + var managementCh chan managementMsg + var doneCh chan struct{} + isFirst := false + func() { + sc.mu.Lock() + defer sc.mu.Unlock() + + isFirst = sc.rcCount == 0 + sc.rcCount++ + + if isFirst { + sc.managementCh = make(chan managementMsg) + sc.doneCh = make(chan struct{}) + } + + managementCh = sc.managementCh + doneCh = sc.doneCh + }() + + if isFirst { + go startLoop(sc.source, managementCh, doneCh) + } + + // If the goroutine is not yet stopped, send a message to add the + // destination channel. The routine might be stopped already because + // the source channel was closed. + select { + case <-doneCh: + default: + managementCh <- managementMsg{ + operation: addChannel, + ch: ch, + } + } +} + +func startLoop( + source <-chan event.GenericEvent, + managementCh chan managementMsg, + doneCh chan struct{}, +) { + defer close(doneCh) + + var destinations []chan event.GenericEvent + + // Close all remaining destinations in case the Source channel is closed. + defer func() { + for _, dst := range destinations { + close(dst) + } + }() + + // Wait for the first destination to be added before starting the loop. + for len(destinations) == 0 { + managementMsg := <-managementCh + if managementMsg.operation == addChannel { + destinations = append(destinations, managementMsg.ch) + } + } + + for { + select { + case msg := <-managementCh: + + switch msg.operation { + case addChannel: + destinations = append(destinations, msg.ch) + case removeChannel: + SearchLoop: + for i, dst := range destinations { + if dst == msg.ch { + destinations = append(destinations[:i], destinations[i+1:]...) + close(dst) + break SearchLoop + } + } + + if len(destinations) == 0 { + return + } + } + + case evt, stillOpen := <-source: + if !stillOpen { + return + } + + for _, dst := range destinations { + // We cannot make it under goroutine here, or we'll meet the + // race condition of writing message to closed channels. + // To avoid blocking, the dest channels are expected to be of + // proper buffer size. If we still see it blocked, then + // the controller is thought to be in an abnormal state. + dst <- evt + } + } + } +} + +func (sc *ChannelBroadcaster) removeListener(ch chan event.GenericEvent) { + var managementCh chan managementMsg + var doneCh chan struct{} + isLast := false + func() { + sc.mu.Lock() + defer sc.mu.Unlock() + + sc.rcCount-- + isLast = sc.rcCount == 0 + + managementCh = sc.managementCh + doneCh = sc.doneCh + }() + + // If the goroutine is not yet stopped, send a message to remove the + // destination channel. The routine might be stopped already because + // the source channel was closed. 
+ select { + case <-doneCh: + default: + managementCh <- managementMsg{ + operation: removeChannel, + ch: ch, + } + } + + // Wait for the doneCh to be closed (in case we are the last one) + if isLast { + <-doneCh + } + + // Wait for the destination channel to be closed. + <-ch +} diff --git a/pkg/source/source_informer.go b/pkg/source/source_informer.go new file mode 100644 index 0000000000..a482596637 --- /dev/null +++ b/pkg/source/source_informer.go @@ -0,0 +1,174 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package source + +import ( + "context" + "errors" + "fmt" + "sync" + + kerrors "k8s.io/apimachinery/pkg/util/errors" + toolscache "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/handler" + internal "sigs.k8s.io/controller-runtime/pkg/internal/source" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). +type Informer struct { + // Informer is the controller-runtime Informer + Informer cache.Informer + + mu sync.RWMutex + // shuttingDown is true if the source has been shuttingDown and causes any following/ in-progress + // Start calls to no-op. + shuttingDown bool + // isStarted is true if the source has been started. A source can only be started once. + isStarted bool + + // registration is the registered EventHandler handle. + registration toolscache.ResourceEventHandlerRegistration + // startupErr may contain an error if one was encountered during startup. + startupErr startupErr +} + +type startupErr struct { + isCanceled bool + err error +} + +func (is *Informer) String() string { + return fmt.Sprintf("informer source: %p", is.Informer) +} + +// Start is internal and should be called only by the Controller to start a source which enqueue reconcile.Requests. +// It should NOT block, instead, it should start a goroutine and return immediately. +// The context passed to Start can be used to cancel this goroutine. After the context is canceled, Shutdown can be +// called to wait for the goroutine to exit. +// Start can be called only once, it is thus not possible to share a single source between multiple controllers. +func (is *Informer) Start(ctx context.Context, eventHandler handler.EventHandler, queue workqueue.RateLimitingInterface, + predicates ...predicate.Predicate) error { + // Informer should have been specified by the user. 
+ if is.Informer == nil { + return fmt.Errorf("must create Informer with a non-nil Informer") + } + + is.mu.Lock() + defer is.mu.Unlock() + if is.shuttingDown { + return nil + } + if is.isStarted { + return fmt.Errorf("cannot start an already started Informer source") + } + + is.isStarted = true + + registration, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, eventHandler, predicates).HandlerFuncs()) + if err != nil { + is.startupErr = startupErr{ + isCanceled: errors.Is(ctx.Err(), context.Canceled), + err: fmt.Errorf("failed to add EventHandler to informer: %w", err), + } + } + is.registration = registration + + return nil +} + +// WaitForSync implements SyncingSource to allow controllers to wait with starting +// workers until the cache is synced. +// +// WaitForSync blocks until the cache is synced or the passed context is canceled. If the passed context is canceled, +// with a non-context.Canceled error, WaitForSync will return an error. +// +// If the cache is stopped before it is synced, WaitForSync will return an error in case the cache was canceled with a +// non-context.Canceled error. +func (is *Informer) WaitForSync(ctx context.Context) error { + if func() bool { + is.mu.RLock() + defer is.mu.RUnlock() + + return !is.isStarted + }() { + return fmt.Errorf("cannot wait for sync on an unstarted source") + } + + var returnErrors []error + + // Check if we have any startup errors. We ignore the errors if the context was canceled, because it means that the + // source was stopped by the user. + if !is.startupErr.isCanceled && is.startupErr.err == nil { + // The cache was synced and the startup goroutine returned without an error. + return nil + } + + // Try to stop the source to clean up any resources + // and collect any errors. + collectedErrors := is.Shutdown() + if collectedErrors != nil { + returnErrors = append(returnErrors, collectedErrors) + } + + if len(returnErrors) == 1 { + return returnErrors[0] + } + return kerrors.NewAggregate(returnErrors) +} + +// Shutdown marks a Source as shutting down. At that point no new +// informers can be started anymore and Start will return without +// doing anything. +// +// In addition, Shutdown blocks until all goroutines have terminated. For that +// to happen, the close channel(s) that they were started with must be closed, +// either before Shutdown gets called or while it is waiting. +// +// Shutdown may be called multiple times, even concurrently. All such calls will +// block until all goroutines have terminated. +func (is *Informer) Shutdown() error { + is.mu.Lock() + defer is.mu.Unlock() + // Ensure that when we release the lock, we stop an in-process & future calls to Start(). + is.shuttingDown = true + + // If we haven't started yet, there's nothing to stop. + if !is.isStarted { + return nil + } + + var errs []error + + // Check if we have any startup errors. We ignore the errors if the context was canceled, because it means that the + // source was stopped by the user. + if !is.startupErr.isCanceled && is.startupErr.err != nil { + errs = append(errs, fmt.Errorf("failed to start source: %w", is.startupErr.err)) + } + + // Remove event handler. 
+ if is.registration != nil { + if err := is.Informer.RemoveEventHandler(is.registration); err != nil { + errs = append(errs, fmt.Errorf("failed to stop source: %w", err)) + } + is.registration = nil + } + + return kerrors.NewAggregate(errs) +} diff --git a/pkg/source/source_integration_test.go b/pkg/source/source_integration_test.go index 594d3c9a9c..7cd4a1d731 100644 --- a/pkg/source/source_integration_test.go +++ b/pkg/source/source_integration_test.go @@ -124,8 +124,9 @@ var _ = Describe("Source", func() { handler2 := newHandler(c2) // Create 2 instances - Expect(instance1.Start(ctx, handler1, q)).To(Succeed()) - Expect(instance2.Start(ctx, handler2, q)).To(Succeed()) + sourceCtx, sourceCancel := context.WithCancel(ctx) + Expect(instance1.Start(sourceCtx, handler1, q)).To(Succeed()) + Expect(instance2.Start(sourceCtx, handler2, q)).To(Succeed()) By("Creating a Deployment and expecting the CreateEvent.") created, err = client.Create(ctx, deployment, metav1.CreateOptions{}) @@ -185,6 +186,132 @@ var _ = Describe("Source", func() { Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.DeleteEvent{})) deleteEvt.Object.SetResourceVersion("") Expect(deleteEvt.Object).To(Equal(deleted)) + + sourceCancel() + + Expect(instance1.Shutdown()).To(Succeed()) + Expect(instance2.Shutdown()).To(Succeed()) + }) + + It("should not get events after stopped", func() { + var created, updated *appsv1.Deployment + var err error + + // Get the client and Deployment used to create events + client := clientset.AppsV1().Deployments(ns) + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "deployment-name-2"}, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }, + }, + } + + // Create an event handler to verify the events + newHandler := func(c chan interface{}) handler.Funcs { + return handler.Funcs{ + CreateFunc: func(ctx context.Context, evt event.CreateEvent, rli workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Expect(rli).To(BeIdenticalTo(q)) + c <- evt + }, + UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, rli workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Expect(rli).To(BeIdenticalTo(q)) + c <- evt + }, + DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, rli workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Expect(rli).To(BeIdenticalTo(q)) + c <- evt + }, + } + } + handler1 := newHandler(c1) + handler2 := newHandler(c2) + + // Create 2 instances + sourceCtx1, sourceCancel1 := context.WithCancel(ctx) + Expect(instance1.Start(sourceCtx1, handler1, q)).To(Succeed()) + sourceCtx2, sourceCancel2 := context.WithCancel(ctx) + Expect(instance2.Start(sourceCtx2, handler2, q)).To(Succeed()) + + By("Creating a Deployment and expecting the CreateEvent.") + created, err = client.Create(ctx, deployment, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(created).NotTo(BeNil()) + + // Check first CreateEvent + evt := <-c1 + createEvt, ok := evt.(event.CreateEvent) + Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.CreateEvent{})) + Expect(createEvt.Object).To(Equal(created)) + + // Check second CreateEvent + evt = <-c2 + createEvt, ok = evt.(event.CreateEvent) + Expect(ok).To(BeTrue(), 
fmt.Sprintf("expect %T to be %T", evt, event.CreateEvent{})) + Expect(createEvt.Object).To(Equal(created)) + + By("Shutdown the second kind source") + sourceCancel2() + err = instance2.Shutdown() + Expect(err).NotTo(HaveOccurred()) + + By("Updating a Deployment and expecting the UpdateEvent only via first kind source.") + updated = created.DeepCopy() + updated.Labels = map[string]string{"biz": "buz"} + updated, err = client.Update(ctx, updated, metav1.UpdateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + // Check first UpdateEvent + evt = <-c1 + updateEvt, ok := evt.(event.UpdateEvent) + Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.UpdateEvent{})) + + Expect(updateEvt.ObjectNew).To(Equal(updated)) + + Expect(updateEvt.ObjectOld).To(Equal(created)) + + // Check second UpdateEvent should not receive + waitCtx1, cancel1 := context.WithTimeout(context.Background(), time.Second) + defer cancel1() + select { + case <-c2: + Fail("kind2 is expected to be stopped") + case <-waitCtx1.Done(): + } + + By("Shutdown the first kind source") + sourceCancel1() + err = instance1.Shutdown() + Expect(err).NotTo(HaveOccurred()) + + By("Deleting a Deployment and expecting no DeleteEvent via the first kind source.") + err = client.Delete(ctx, created.Name, metav1.DeleteOptions{}) + Expect(err).NotTo(HaveOccurred()) + + waitCtx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + defer cancel2() + select { + case <-c1: + Fail("kind1 is expected to be stopped") + case <-waitCtx2.Done(): + } + + Expect(instance2.Shutdown()).To(Succeed()) }) }) diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index 16c365e8a2..b2f2fc27dc 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -25,6 +25,7 @@ import ( . 
"github.com/onsi/gomega" "sigs.k8s.io/controller-runtime/pkg/cache/informertest" + "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -36,12 +37,63 @@ import ( ) var _ = Describe("Source", func() { + Describe("Informer", func() { + var inf *controllertest.FakeInformer + var ctx context.Context + var cancel context.CancelFunc + + BeforeEach(func() { + ctx, cancel = context.WithCancel(context.Background()) + inf = &controllertest.FakeInformer{} + }) + + AfterEach(func() { + cancel() + }) + + Context("should error", func() { + It("if no Informer specified", func() { + instance := &source.Informer{} + err := instance.Start(ctx, nil, nil) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("must create Informer with a non-nil Informer")) + }) + + It("when Start is called twice", func() { + instance := &source.Informer{Informer: inf} + Expect(instance.Start(ctx, nil, nil)).To(Succeed()) + err := instance.Start(ctx, nil, nil) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("cannot start an already started Informer source")) + }) + }) + + Context("should not panic", func() { + It("when Shutdown is called before Start", func() { + instance := &source.Informer{Informer: inf} + err := instance.Shutdown() + Expect(err).NotTo(HaveOccurred()) + }) + + It("when Shutdown is called twice", func() { + instance := &source.Informer{Informer: inf} + Expect(instance.Start(ctx, nil, nil)).To(Succeed()) + cancel() + Expect(instance.Shutdown()).To(Succeed()) + Expect(instance.Shutdown()).To(Succeed()) + }) + }) + }) + Describe("Kind", func() { var c chan struct{} var p *corev1.Pod var ic *informertest.FakeInformers + var ctx context.Context + var cancel context.CancelFunc BeforeEach(func() { + ctx, cancel = context.WithCancel(context.Background()) ic = &informertest.FakeInformers{} c = make(chan struct{}) p = &corev1.Pod{ @@ -53,6 +105,10 @@ var _ = Describe("Source", func() { } }) + AfterEach(func() { + cancel() + }) + Context("for a Pod resource", func() { It("should provide a Pod CreateEvent", func() { c := make(chan struct{}) @@ -179,27 +235,13 @@ var _ = Describe("Source", func() { }) }) - It("should return an error from Start cache was not provided", func() { - instance := source.Kind(nil, &corev1.Pod{}) - err := instance.Start(ctx, nil, nil) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil cache")) - }) - - It("should return an error from Start if a type was not provided", func() { - instance := source.Kind(ic, nil) - err := instance.Start(ctx, nil, nil) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil object")) - }) - It("should return an error if syncing fails", func() { f := false instance := source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}) Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred()) err := instance.WaitForSync(context.Background()) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal("cache did not sync")) + Expect(err.Error()).To(Equal("failed to start source: cache did not sync")) }) @@ -224,8 +266,48 @@ var _ = Describe("Source", func() { Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred()) err := instance.WaitForSync(context.Background()) Expect(err).To(HaveOccurred()) - 
Expect(err.Error()).To(Equal("cache did not sync")) + Expect(err.Error()).To(Equal("failed to start source: cache did not sync")) + + }) + + Context("should error", func() { + It("if no cache specified", func() { + instance := source.Kind(nil, &corev1.Pod{}) + err := instance.Start(ctx, nil, nil) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil Cache")) + }) + + It("if no type specified", func() { + instance := source.Kind(ic, nil) + err := instance.Start(ctx, nil, nil) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil Type")) + }) + It("when Start is called twice", func() { + instance := source.Kind(ic, &corev1.Pod{}) + Expect(instance.Start(ctx, nil, nil)).To(Succeed()) + err := instance.Start(ctx, nil, nil) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("cannot start an already started Kind source")) + }) + }) + + Context("should not panic", func() { + It("when Shutdown is called before Start", func() { + instance := source.Kind(ic, &corev1.Pod{}) + err := instance.Shutdown() + Expect(err).NotTo(HaveOccurred()) + }) + + It("when Shutdown is called twice", func() { + instance := source.Kind(ic, &corev1.Pod{}) + Expect(instance.Start(ctx, nil, nil)).To(Succeed()) + cancel() + Expect(instance.Shutdown()).To(Succeed()) + Expect(instance.Shutdown()).To(Succeed()) + }) }) }) @@ -289,7 +371,7 @@ var _ = Describe("Source", func() { } q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Channel{Source: ch} + instance := &source.Channel{Broadcaster: source.NewChannelBroadcaster(ch)} err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() @@ -327,7 +409,7 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") // Add a handler to get distribution blocked - instance := &source.Channel{Source: ch} + instance := &source.Channel{Broadcaster: source.NewChannelBroadcaster(ch)} instance.DestBufferSize = 1 err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { @@ -383,7 +465,7 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") // Add a handler to get distribution blocked - instance := &source.Channel{Source: ch} + instance := &source.Channel{Broadcaster: source.NewChannelBroadcaster(ch)} instance.DestBufferSize = 1 err := instance.Start(ctx, handler.Funcs{ @@ -422,7 +504,7 @@ var _ = Describe("Source", func() { close(ch) By("feeding that channel to a channel source") - src := &source.Channel{Source: ch} + src := &source.Channel{Broadcaster: source.NewChannelBroadcaster(ch)} processed := make(chan struct{}) defer close(processed) @@ -452,12 +534,6 @@ var _ = Describe("Source", func() { Eventually(processed).Should(Receive()) Consistently(processed).ShouldNot(Receive()) }) - It("should get error if no source specified", func() { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Channel{ /*no source specified*/ } - err := instance.Start(ctx, handler.Funcs{}, q) - Expect(err).To(Equal(fmt.Errorf("must specify Channel.Source"))) - }) }) Context("for multi sources (handlers)", func() { It("should provide GenericEvents for all 
handlers", func() { @@ -474,8 +550,8 @@ var _ = Describe("Source", func() { c2 := make(chan struct{}) q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Channel{Source: ch} - err := instance.Start(ctx, handler.Funcs{ + broadcaster := source.NewChannelBroadcaster(ch) + err := (&source.Channel{Broadcaster: broadcaster}).Start(ctx, handler.Funcs{ CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") @@ -498,7 +574,7 @@ var _ = Describe("Source", func() { }, q) Expect(err).NotTo(HaveOccurred()) - err = instance.Start(ctx, handler.Funcs{ + err = (&source.Channel{Broadcaster: broadcaster}).Start(ctx, handler.Funcs{ CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") @@ -529,5 +605,41 @@ var _ = Describe("Source", func() { Expect(resEvent1).To(Equal(resEvent2)) }) }) + + Context("should error", func() { + It("if no source specified", func() { + q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") + instance := &source.Channel{ /*no source specified*/ } + err := instance.Start(ctx, handler.Funcs{}, q) + Expect(err).To(Equal(fmt.Errorf("must create Channel with a non-nil Broadcaster"))) + }) + + It("when Start is called twice", func() { + ch := make(chan event.GenericEvent) + instance := &source.Channel{Broadcaster: source.NewChannelBroadcaster(ch)} + Expect(instance.Start(ctx, nil, nil)).To(Succeed()) + err := instance.Start(ctx, nil, nil) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("cannot start an already started Channel source")) + }) + }) + + Context("should not panic", func() { + It("when Shutdown is called before Start", func() { + ch := make(chan event.GenericEvent) + instance := &source.Channel{Broadcaster: source.NewChannelBroadcaster(ch)} + err := instance.Shutdown() + Expect(err).NotTo(HaveOccurred()) + }) + + It("when Shutdown is called twice", func() { + ch := make(chan event.GenericEvent) + instance := &source.Channel{Broadcaster: source.NewChannelBroadcaster(ch)} + Expect(instance.Start(ctx, nil, nil)).To(Succeed()) + cancel() + Expect(instance.Shutdown()).To(Succeed()) + Expect(instance.Shutdown()).To(Succeed()) + }) + }) }) })