Skip to content

Commit

Permalink
only allow starting a source once
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Ramlot <42113979+inteon@users.noreply.github.com>
  • Loading branch information
inteon committed Feb 13, 2024
1 parent a4e8aca commit bde0a5e
Show file tree
Hide file tree
Showing 10 changed files with 372 additions and 172 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var _ = Describe("controller.Controller", func() {

ctx, cancel := context.WithCancel(context.Background())
watchChan := make(chan event.GenericEvent, 1)
watch := &source.Channel{Source: watchChan}
watch := source.Channel(source.NewChannelBroadcaster(watchChan))
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}

reconcileStarted := make(chan struct{})
Expand Down
9 changes: 4 additions & 5 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,7 @@ var _ = Describe("controller", func() {
Object: p,
}

ins := &source.Channel{Source: ch}
ins.DestBufferSize = 1
ins := source.Channel(source.NewChannelBroadcaster(ch), source.WithDestBufferSize(1))

// send the event to the channel
ch <- evt
Expand All @@ -249,18 +248,18 @@ var _ = Describe("controller", func() {
<-processed
})

It("should error when channel source is not specified", func() {
It("should error when ChannelBroadcaster is not specified", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ins := &source.Channel{}
ins := source.Channel(nil)
ctrl.startWatches = []watchDescription{{
src: ins,
}}

e := ctrl.Start(ctx)
Expect(e).To(HaveOccurred())
Expect(e.Error()).To(ContainSubstring("must specify Channel.Source"))
Expect(e.Error()).To(ContainSubstring("must create Channel with a non-nil Broadcaster"))
})

It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {
Expand Down
4 changes: 3 additions & 1 deletion pkg/source/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func ExampleChannel() {
events := make(chan event.GenericEvent)

err := ctrl.Watch(
&source.Channel{Source: events},
source.Channel(
source.NewChannelBroadcaster(events),
),
&handler.EnqueueRequestForObject{},
)
if err != nil {
Expand Down
136 changes: 55 additions & 81 deletions pkg/source/internal/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,25 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

const (
// defaultBufferSize is the default number of event notifications that can be buffered.
defaultBufferSize = 1024
)
// ChannelOptions contains the options for the Channel source.
type ChannelOptions struct {
// DestBufferSize is the specified buffer size of dest channels.
// Default to 1024 if not specified.
DestBufferSize int
}

// Channel is used to provide a source of events originating outside the cluster
// (e.g. GitHub Webhook callback). Channel requires the user to wire the external
// source (eh.g. http handler) to write GenericEvents to the underlying channel.
type Channel struct {
// once ensures the event distribution goroutine will be performed only once
once sync.Once

// Source is the source channel to fetch GenericEvents
Source <-chan event.GenericEvent
Options ChannelOptions

// dest is the destination channels of the added event handlers
dest []chan event.GenericEvent
// Broadcaster contains the source channel for events.
Broadcaster *ChannelBroadcaster

// DestBufferSize is the specified buffer size of dest channels.
// Default to 1024 if not specified.
DestBufferSize int

// destLock is to ensure the destination channels are safely added/removed
destLock sync.Mutex
mu sync.Mutex
// isStarted is true if the source has been started. A source can only be started once.
isStarted bool
}

func (cs *Channel) String() string {
Expand All @@ -63,88 +58,67 @@ func (cs *Channel) Start(
handler handler.EventHandler,
queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
// Source should have been specified by the user.
if cs.Source == nil {
return fmt.Errorf("must specify Channel.Source")
// Broadcaster should have been specified by the user.
if cs.Broadcaster == nil {
return fmt.Errorf("must create Channel with a non-nil Broadcaster")
}

// use default value if DestBufferSize not specified
if cs.DestBufferSize == 0 {
cs.DestBufferSize = defaultBufferSize
cs.mu.Lock()
defer cs.mu.Unlock()
if cs.isStarted {
return fmt.Errorf("cannot start an already started Channel source")
}
cs.isStarted = true

dst := make(chan event.GenericEvent, cs.DestBufferSize)

cs.destLock.Lock()
cs.dest = append(cs.dest, dst)
cs.destLock.Unlock()

cs.once.Do(func() {
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
go cs.syncLoop(ctx)
})
// Create a destination channel for the event handler
// and add it to the list of destinations
destination := make(chan event.GenericEvent, cs.Options.DestBufferSize)
cs.Broadcaster.AddListener(destination)

go func() {
for evt := range dst {
shouldHandle := true
for _, p := range prct {
if !p.Generic(evt) {
shouldHandle = false
break
}
}

if shouldHandle {
func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
handler.Generic(ctx, evt, queue)
}()
}
}
// Remove the listener and wait for the broadcaster
// to stop sending events to the destination channel.
defer cs.Broadcaster.RemoveListener(destination)

cs.processReceivedEvents(
ctx,
destination,
queue,
handler,
prct,
)
}()

return nil
}

func (cs *Channel) doStop() {
cs.destLock.Lock()
defer cs.destLock.Unlock()

for _, dst := range cs.dest {
close(dst)
}
}

func (cs *Channel) distribute(evt event.GenericEvent) {
cs.destLock.Lock()
defer cs.destLock.Unlock()

for _, dst := range cs.dest {
// We cannot make it under goroutine here, or we'll meet the
// race condition of writing message to closed channels.
// To avoid blocking, the dest channels are expected to be of
// proper buffer size. If we still see it blocked, then
// the controller is thought to be in an abnormal state.
dst <- evt
}
}

func (cs *Channel) syncLoop(ctx context.Context) {
func (cs *Channel) processReceivedEvents(
ctx context.Context,
destination <-chan event.GenericEvent,
queue workqueue.RateLimitingInterface,
eventHandler handler.EventHandler,
predicates []predicate.Predicate,
) {
eventloop:
for {
select {
case <-ctx.Done():
// Close destination channels
cs.doStop()
return
case evt, stillOpen := <-cs.Source:
case event, stillOpen := <-destination:
if !stillOpen {
// if the source channel is closed, we're never gonna get
// anything more on it, so stop & bail
cs.doStop()
return
}
cs.distribute(evt)

// Check predicates against the event first
// and continue the outer loop if any of them fail.
for _, p := range predicates {
if !p.Generic(event) {
continue eventloop
}
}

// Call the event handler with the event.
eventHandler.Generic(ctx, event, queue)
}
}
}
Loading

0 comments on commit bde0a5e

Please sign in to comment.