From bde0a5e2d9d01c39fc4b714ac4d7712470f19646 Mon Sep 17 00:00:00 2001 From: Tim Ramlot <42113979+inteon@users.noreply.github.com> Date: Tue, 13 Feb 2024 22:57:52 +0100 Subject: [PATCH] only allow starting a source once Signed-off-by: Tim Ramlot <42113979+inteon@users.noreply.github.com> --- pkg/controller/controller_test.go | 2 +- pkg/internal/controller/controller_test.go | 9 +- pkg/source/example_test.go | 4 +- pkg/source/internal/channel.go | 136 ++++++-------- pkg/source/internal/channel_broadcast.go | 196 +++++++++++++++++++++ pkg/source/internal/informer.go | 15 +- pkg/source/internal/kind.go | 9 +- pkg/source/source.go | 64 +++++-- pkg/source/source_integration_test.go | 6 +- pkg/source/source_test.go | 103 +++++------ 10 files changed, 372 insertions(+), 172 deletions(-) create mode 100644 pkg/source/internal/channel_broadcast.go diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index e49a2c5774..3568ab5d89 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -77,7 +77,7 @@ var _ = Describe("controller.Controller", func() { ctx, cancel := context.WithCancel(context.Background()) watchChan := make(chan event.GenericEvent, 1) - watch := &source.Channel{Source: watchChan} + watch := source.Channel(source.NewChannelBroadcaster(watchChan)) watchChan <- event.GenericEvent{Object: &corev1.Pod{}} reconcileStarted := make(chan struct{}) diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index ce2245e60f..f551598cbd 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -226,8 +226,7 @@ var _ = Describe("controller", func() { Object: p, } - ins := &source.Channel{Source: ch} - ins.DestBufferSize = 1 + ins := source.Channel(source.NewChannelBroadcaster(ch), source.WithDestBufferSize(1)) // send the event to the channel ch <- evt @@ -249,18 +248,18 @@ var _ = Describe("controller", func() { <-processed }) - It("should error when channel source is not specified", func() { + It("should error when ChannelBroadcaster is not specified", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ins := &source.Channel{} + ins := source.Channel(nil) ctrl.startWatches = []watchDescription{{ src: ins, }} e := ctrl.Start(ctx) Expect(e).To(HaveOccurred()) - Expect(e.Error()).To(ContainSubstring("must specify Channel.Source")) + Expect(e.Error()).To(ContainSubstring("must create Channel with a non-nil Broadcaster")) }) It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() { diff --git a/pkg/source/example_test.go b/pkg/source/example_test.go index 77857729de..fc96dc4d8d 100644 --- a/pkg/source/example_test.go +++ b/pkg/source/example_test.go @@ -43,7 +43,9 @@ func ExampleChannel() { events := make(chan event.GenericEvent) err := ctrl.Watch( - &source.Channel{Source: events}, + source.Channel( + source.NewChannelBroadcaster(events), + ), &handler.EnqueueRequestForObject{}, ) if err != nil { diff --git a/pkg/source/internal/channel.go b/pkg/source/internal/channel.go index 9471be24e4..3272e9a6fd 100644 --- a/pkg/source/internal/channel.go +++ b/pkg/source/internal/channel.go @@ -27,30 +27,25 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" ) -const ( - // defaultBufferSize is the default number of event notifications that can be buffered. - defaultBufferSize = 1024 -) +// ChannelOptions contains the options for the Channel source. +type ChannelOptions struct { + // DestBufferSize is the specified buffer size of dest channels. + // Default to 1024 if not specified. + DestBufferSize int +} // Channel is used to provide a source of events originating outside the cluster // (e.g. GitHub Webhook callback). Channel requires the user to wire the external // source (eh.g. http handler) to write GenericEvents to the underlying channel. type Channel struct { - // once ensures the event distribution goroutine will be performed only once - once sync.Once - - // Source is the source channel to fetch GenericEvents - Source <-chan event.GenericEvent + Options ChannelOptions - // dest is the destination channels of the added event handlers - dest []chan event.GenericEvent + // Broadcaster contains the source channel for events. + Broadcaster *ChannelBroadcaster - // DestBufferSize is the specified buffer size of dest channels. - // Default to 1024 if not specified. - DestBufferSize int - - // destLock is to ensure the destination channels are safely added/removed - destLock sync.Mutex + mu sync.Mutex + // isStarted is true if the source has been started. A source can only be started once. + isStarted bool } func (cs *Channel) String() string { @@ -63,88 +58,67 @@ func (cs *Channel) Start( handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { - // Source should have been specified by the user. - if cs.Source == nil { - return fmt.Errorf("must specify Channel.Source") + // Broadcaster should have been specified by the user. + if cs.Broadcaster == nil { + return fmt.Errorf("must create Channel with a non-nil Broadcaster") } - // use default value if DestBufferSize not specified - if cs.DestBufferSize == 0 { - cs.DestBufferSize = defaultBufferSize + cs.mu.Lock() + defer cs.mu.Unlock() + if cs.isStarted { + return fmt.Errorf("cannot start an already started Channel source") } + cs.isStarted = true - dst := make(chan event.GenericEvent, cs.DestBufferSize) - - cs.destLock.Lock() - cs.dest = append(cs.dest, dst) - cs.destLock.Unlock() - - cs.once.Do(func() { - // Distribute GenericEvents to all EventHandler / Queue pairs Watching this source - go cs.syncLoop(ctx) - }) + // Create a destination channel for the event handler + // and add it to the list of destinations + destination := make(chan event.GenericEvent, cs.Options.DestBufferSize) + cs.Broadcaster.AddListener(destination) go func() { - for evt := range dst { - shouldHandle := true - for _, p := range prct { - if !p.Generic(evt) { - shouldHandle = false - break - } - } - - if shouldHandle { - func() { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - handler.Generic(ctx, evt, queue) - }() - } - } + // Remove the listener and wait for the broadcaster + // to stop sending events to the destination channel. + defer cs.Broadcaster.RemoveListener(destination) + + cs.processReceivedEvents( + ctx, + destination, + queue, + handler, + prct, + ) }() return nil } -func (cs *Channel) doStop() { - cs.destLock.Lock() - defer cs.destLock.Unlock() - - for _, dst := range cs.dest { - close(dst) - } -} - -func (cs *Channel) distribute(evt event.GenericEvent) { - cs.destLock.Lock() - defer cs.destLock.Unlock() - - for _, dst := range cs.dest { - // We cannot make it under goroutine here, or we'll meet the - // race condition of writing message to closed channels. - // To avoid blocking, the dest channels are expected to be of - // proper buffer size. If we still see it blocked, then - // the controller is thought to be in an abnormal state. - dst <- evt - } -} - -func (cs *Channel) syncLoop(ctx context.Context) { +func (cs *Channel) processReceivedEvents( + ctx context.Context, + destination <-chan event.GenericEvent, + queue workqueue.RateLimitingInterface, + eventHandler handler.EventHandler, + predicates []predicate.Predicate, +) { +eventloop: for { select { case <-ctx.Done(): - // Close destination channels - cs.doStop() return - case evt, stillOpen := <-cs.Source: + case event, stillOpen := <-destination: if !stillOpen { - // if the source channel is closed, we're never gonna get - // anything more on it, so stop & bail - cs.doStop() return } - cs.distribute(evt) + + // Check predicates against the event first + // and continue the outer loop if any of them fail. + for _, p := range predicates { + if !p.Generic(event) { + continue eventloop + } + } + + // Call the event handler with the event. + eventHandler.Generic(ctx, event, queue) } } } diff --git a/pkg/source/internal/channel_broadcast.go b/pkg/source/internal/channel_broadcast.go new file mode 100644 index 0000000000..cb7c9431c6 --- /dev/null +++ b/pkg/source/internal/channel_broadcast.go @@ -0,0 +1,196 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "sync" + + "sigs.k8s.io/controller-runtime/pkg/event" +) + +// ChannelBroadcaster is a wrapper around a channel that allows multiple listeners to all +// receive the events from the channel. +type ChannelBroadcaster struct { + source <-chan event.GenericEvent + + mu sync.Mutex + rcCount uint + managementCh chan managementMsg + doneCh chan struct{} +} + +type managementOperation bool + +const ( + addChannel managementOperation = true + removeChannel managementOperation = false +) + +type managementMsg struct { + operation managementOperation + ch chan event.GenericEvent +} + +// NewChannelBroadcaster creates a new ChannelBroadcaster for the given channel. +func NewChannelBroadcaster(source <-chan event.GenericEvent) *ChannelBroadcaster { + return &ChannelBroadcaster{ + source: source, + } +} + +// AddListener adds a new listener to the ChannelBroadcaster. Each listener +// will receive all events from the source channel. All listeners have to be +// removed using RemoveListener before the ChannelBroadcaster can be garbage +// collected. +func (sc *ChannelBroadcaster) AddListener(ch chan event.GenericEvent) { + var managementCh chan managementMsg + var doneCh chan struct{} + isFirst := false + func() { + sc.mu.Lock() + defer sc.mu.Unlock() + + isFirst = sc.rcCount == 0 + sc.rcCount++ + + if isFirst { + sc.managementCh = make(chan managementMsg) + sc.doneCh = make(chan struct{}) + } + + managementCh = sc.managementCh + doneCh = sc.doneCh + }() + + if isFirst { + go startLoop(sc.source, managementCh, doneCh) + } + + // If the goroutine is not yet stopped, send a message to add the + // destination channel. The routine might be stopped already because + // the source channel was closed. + select { + case <-doneCh: + default: + managementCh <- managementMsg{ + operation: addChannel, + ch: ch, + } + } +} + +func startLoop( + source <-chan event.GenericEvent, + managementCh chan managementMsg, + doneCh chan struct{}, +) { + defer close(doneCh) + + var destinations []chan event.GenericEvent + + // Close all remaining destinations in case the Source channel is closed. + defer func() { + for _, dst := range destinations { + close(dst) + } + }() + + // Wait for the first destination to be added before starting the loop. + for len(destinations) == 0 { + managementMsg := <-managementCh + if managementMsg.operation == addChannel { + destinations = append(destinations, managementMsg.ch) + } + } + + for { + select { + case msg := <-managementCh: + + switch msg.operation { + case addChannel: + destinations = append(destinations, msg.ch) + case removeChannel: + SearchLoop: + for i, dst := range destinations { + if dst == msg.ch { + destinations = append(destinations[:i], destinations[i+1:]...) + close(dst) + break SearchLoop + } + } + + if len(destinations) == 0 { + return + } + } + + case evt, stillOpen := <-source: + if !stillOpen { + return + } + + for _, dst := range destinations { + // We cannot make it under goroutine here, or we'll meet the + // race condition of writing message to closed channels. + // To avoid blocking, the dest channels are expected to be of + // proper buffer size. If we still see it blocked, then + // the controller is thought to be in an abnormal state. + dst <- evt + } + } + } +} + +// RemoveListener removes a listener from the ChannelBroadcaster. The listener +// will no longer receive events from the source channel. If this is the last +// listener, this function will block until the ChannelBroadcaster's is stopped. +func (sc *ChannelBroadcaster) RemoveListener(ch chan event.GenericEvent) { + var managementCh chan managementMsg + var doneCh chan struct{} + isLast := false + func() { + sc.mu.Lock() + defer sc.mu.Unlock() + + sc.rcCount-- + isLast = sc.rcCount == 0 + + managementCh = sc.managementCh + doneCh = sc.doneCh + }() + + // If the goroutine is not yet stopped, send a message to remove the + // destination channel. The routine might be stopped already because + // the source channel was closed. + select { + case <-doneCh: + default: + managementCh <- managementMsg{ + operation: removeChannel, + ch: ch, + } + } + + // Wait for the doneCh to be closed (in case we are the last one) + if isLast { + <-doneCh + } + + // Wait for the destination channel to be closed. + <-ch +} diff --git a/pkg/source/internal/informer.go b/pkg/source/internal/informer.go index 63023c583e..d7a8f8ba49 100644 --- a/pkg/source/internal/informer.go +++ b/pkg/source/internal/informer.go @@ -19,6 +19,7 @@ package internal import ( "context" "fmt" + "sync" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -30,16 +31,26 @@ import ( type Informer struct { // Informer is the controller-runtime Informer Informer cache.Informer + + mu sync.Mutex + // isStarted is true if the source has been started. A source can only be started once. + isStarted bool } // Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { - // Informer should have been specified by the user. if is.Informer == nil { - return fmt.Errorf("must specify Informer.Informer") + return fmt.Errorf("must create Informer with a non-nil Informer") + } + + is.mu.Lock() + defer is.mu.Unlock() + if is.isStarted { + return fmt.Errorf("cannot start an already started Informer source") } + is.isStarted = true _, err := is.Informer.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs()) if err != nil { diff --git a/pkg/source/internal/kind.go b/pkg/source/internal/kind.go index 14483dcd08..37f0487ede 100644 --- a/pkg/source/internal/kind.go +++ b/pkg/source/internal/kind.go @@ -36,7 +36,6 @@ import ( type Kind struct { // Type is the type of object to watch. e.g. &v1.Pod{} Type client.Object - // Cache used to watch APIs Cache cache.Cache @@ -51,10 +50,14 @@ type Kind struct { func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { if ks.Type == nil { - return fmt.Errorf("must create Kind with a non-nil object") + return fmt.Errorf("must create Kind with a non-nil Type") } if ks.Cache == nil { - return fmt.Errorf("must create Kind with a non-nil cache") + return fmt.Errorf("must create Kind with a non-nil Cache") + } + + if ks.started != nil { + return fmt.Errorf("cannot start an already started Kind source") } // cache.GetInformer will block until its context is cancelled if the cache was already started and it can not diff --git a/pkg/source/source.go b/pkg/source/source.go index cf359d355c..74796e315c 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -22,10 +22,10 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" internal "sigs.k8s.io/controller-runtime/pkg/source/internal" "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/predicate" ) // Source is a source of events (eh.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc) @@ -49,24 +49,58 @@ type SyncingSource interface { WaitForSync(ctx context.Context) error } +var _ Source = &internal.Kind{} +var _ SyncingSource = &internal.Kind{} +var _ Source = &internal.Informer{} +var _ Source = &internal.Channel{} +var _ Source = internal.Func(nil) + // Kind creates a KindSource with the given cache provider. func Kind(cache cache.Cache, object client.Object) SyncingSource { - return &internal.Kind{Type: object, Cache: cache} + return &internal.Kind{Cache: cache, Type: object} } -// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). -type Informer = internal.Informer +// NewChannelBroadcaster creates a new ChannelBroadcaster for the given channel. +// A ChannelBroadcaster is a wrapper around a channel that allows multiple listeners to all +// receive the events from the channel. +var NewChannelBroadcaster = internal.NewChannelBroadcaster -// Channel is used to provide a source of events originating outside the cluster -// (e.g. GitHub Webhook callback). Channel requires the user to wire the external -// source (eh.g. http handler) to write GenericEvents to the underlying channel. -type Channel = internal.Channel +// ChannelOption is a functional option for configuring a Channel source. +type ChannelOption func(*internal.ChannelOptions) -// Func is a function that implements Source. -type Func = internal.Func +// WithDestBufferSize specifies the buffer size of dest channels. +func WithDestBufferSize(destBufferSize int) ChannelOption { + return func(o *internal.ChannelOptions) { + if destBufferSize <= 0 { + return // ignore invalid buffer size + } -var _ Source = &internal.Kind{} -var _ SyncingSource = &internal.Kind{} -var _ Source = &internal.Informer{} -var _ Source = &internal.Channel{} -var _ Source = internal.Func(nil) + o.DestBufferSize = destBufferSize + } +} + +// Channel creates a ChannelSource with the given buffer size. +func Channel(broadcaster *internal.ChannelBroadcaster, options ...ChannelOption) Source { + opts := internal.ChannelOptions{ + // 1024 is the default number of event notifications that can be buffered. + DestBufferSize: 1024, + } + for _, o := range options { + if o == nil { + continue // ignore nil options + } + o(&opts) + } + + return &internal.Channel{Options: opts, Broadcaster: broadcaster} +} + +// Informer creates an InformerSource with the given cache provider. +func Informer(informer cache.Informer) Source { + return &internal.Informer{Informer: informer} +} + +// Func creates a FuncSource with the given function. +func Func(f internal.Func) Source { + return f +} diff --git a/pkg/source/source_integration_test.go b/pkg/source/source_integration_test.go index 594d3c9a9c..cfb92cb88f 100644 --- a/pkg/source/source_integration_test.go +++ b/pkg/source/source_integration_test.go @@ -241,7 +241,7 @@ var _ = Describe("Source", func() { c := make(chan struct{}) q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Informer{Informer: depInformer} + instance := source.Informer(depInformer) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() @@ -282,7 +282,7 @@ var _ = Describe("Source", func() { rs2.SetLabels(map[string]string{"biz": "baz"}) q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Informer{Informer: depInformer} + instance := source.Informer(depInformer) err = instance.Start(ctx, handler.Funcs{ CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { }, @@ -319,7 +319,7 @@ var _ = Describe("Source", func() { c := make(chan struct{}) q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Informer{Informer: depInformer} + instance := source.Informer(depInformer) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { }, diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index 16c365e8a2..39777278c3 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -183,20 +183,20 @@ var _ = Describe("Source", func() { instance := source.Kind(nil, &corev1.Pod{}) err := instance.Start(ctx, nil, nil) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil cache")) + Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil Cache")) }) It("should return an error from Start if a type was not provided", func() { instance := source.Kind(ic, nil) err := instance.Start(ctx, nil, nil) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil object")) + Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil Type")) }) It("should return an error if syncing fails", func() { f := false instance := source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}) - Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred()) + Expect(instance.Start(context.Background(), &handler.EnqueueRequestForObject{}, nil)).NotTo(HaveOccurred()) err := instance.WaitForSync(context.Background()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("cache did not sync")) @@ -221,7 +221,7 @@ var _ = Describe("Source", func() { It("should return an error if syncing fails", func() { f := false instance := source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}) - Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred()) + Expect(instance.Start(context.Background(), &handler.EnqueueRequestForObject{}, nil)).NotTo(HaveOccurred()) err := instance.WaitForSync(context.Background()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("cache did not sync")) @@ -289,7 +289,7 @@ var _ = Describe("Source", func() { } q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Channel{Source: ch} + instance := source.Channel(source.NewChannelBroadcaster(ch)) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() @@ -327,8 +327,7 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") // Add a handler to get distribution blocked - instance := &source.Channel{Source: ch} - instance.DestBufferSize = 1 + instance := source.Channel(source.NewChannelBroadcaster(ch), source.WithDestBufferSize(1)) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() @@ -383,8 +382,7 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") // Add a handler to get distribution blocked - instance := &source.Channel{Source: ch} - instance.DestBufferSize = 1 + instance := source.Channel(source.NewChannelBroadcaster(ch), source.WithDestBufferSize(1)) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { @@ -422,7 +420,7 @@ var _ = Describe("Source", func() { close(ch) By("feeding that channel to a channel source") - src := &source.Channel{Source: ch} + src := source.Channel(source.NewChannelBroadcaster(ch)) processed := make(chan struct{}) defer close(processed) @@ -454,9 +452,9 @@ var _ = Describe("Source", func() { }) It("should get error if no source specified", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Channel{ /*no source specified*/ } + instance := source.Channel(nil) err := instance.Start(ctx, handler.Funcs{}, q) - Expect(err).To(Equal(fmt.Errorf("must specify Channel.Source"))) + Expect(err).To(Equal(fmt.Errorf("must create Channel with a non-nil Broadcaster"))) }) }) Context("for multi sources (handlers)", func() { @@ -469,61 +467,44 @@ var _ = Describe("Source", func() { Object: p, } - var resEvent1, resEvent2 event.GenericEvent - c1 := make(chan struct{}) - c2 := make(chan struct{}) - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Channel{Source: ch} - err := instance.Start(ctx, handler.Funcs{ - CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected CreateEvent") - }, - UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected UpdateEvent") - }, - DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected DeleteEvent") - }, - GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Expect(q2).To(BeIdenticalTo(q)) - Expect(evt.Object).To(Equal(p)) - resEvent1 = evt - close(c1) - }, - }, q) + channelSource := source.NewChannelBroadcaster(ch) + + createHandler := func(c chan event.GenericEvent) handler.Funcs { + return handler.Funcs{ + CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected CreateEvent") + }, + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected UpdateEvent") + }, + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected DeleteEvent") + }, + GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Expect(q2).To(BeIdenticalTo(q)) + Expect(evt.Object).To(Equal(p)) + c <- evt + }, + } + } + + c1 := make(chan event.GenericEvent) + c2 := make(chan event.GenericEvent) + + err := source.Channel(channelSource).Start(ctx, createHandler(c1), q) Expect(err).NotTo(HaveOccurred()) - err = instance.Start(ctx, handler.Funcs{ - CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected CreateEvent") - }, - UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected UpdateEvent") - }, - DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected DeleteEvent") - }, - GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Expect(q2).To(BeIdenticalTo(q)) - Expect(evt.Object).To(Equal(p)) - resEvent2 = evt - close(c2) - }, - }, q) + err = source.Channel(channelSource).Start(ctx, createHandler(c2), q) Expect(err).NotTo(HaveOccurred()) ch <- evt - <-c1 - <-c2 + resEvent1 := <-c1 + resEvent2 := <-c2 // Validate the two handlers received same event Expect(resEvent1).To(Equal(resEvent2))