Skip to content

Commit

Permalink
resolve feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Ramlot <42113979+inteon@users.noreply.github.com>
  • Loading branch information
inteon committed Apr 26, 2023
1 parent 1d67191 commit 918e4c9
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 16 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/controllertest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (f *FakeInformer) HasSynced() bool {
return f.Synced
}

// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration.
// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers.
func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
f.mu.Lock()
defer f.mu.Unlock()
Expand Down Expand Up @@ -147,7 +147,7 @@ func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEve
return nil, nil
}

// RemoveEventHandler does nothing. TODO(community): Implement this.
// RemoveEventHandler does nothing.
func (f *FakeInformer) RemoveEventHandler(handle cache.ResourceEventHandlerRegistration) error {
eh, ok := handle.(*eventHandlerWrapper)
if !ok {
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/source/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func getInformer(ctx context.Context, informerCache cache.Cache, resourceType cl
// We got a NoKindMatchError, which means the kind is a CRD and it's not installed yet.
// We should retry until it's installed.
log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start", "kind", kindMatchErr.GroupKind)
return false, nil // Retry.
case runtime.IsNotRegisteredError(lastErr):
// We got a IsNotRegisteredError, which means the kind is not registered to the Scheme.
// This is a programming error, so we should stop retrying and return the error.
Expand All @@ -110,7 +111,6 @@ func getInformer(ctx context.Context, informerCache cache.Cache, resourceType cl
// stop retrying and return the error.
return true, fmt.Errorf("failed to get informer from cache: %w", lastErr)
}
return false, nil // Retry.
}
return true, nil
}); err != nil {
Expand Down Expand Up @@ -234,7 +234,7 @@ func (ks *Kind) Stop() error {
}
ks.mu.Lock()

// Remove all event handlers.
// Remove event handler.
if ks.registration != nil {
if err := ks.informer.RemoveEventHandler(ks.registration); err != nil {
errs = append(errs, err)
Expand Down
24 changes: 12 additions & 12 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ const (
//
// Users may build their own Source implementations.
type Source interface {
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests. It should NOT block, instead, it should start a goroutine and return immediately.
// Start is internal and should be called only by the Controller to start a goroutine that enqueues
// reconcile.Requests. It should NOT block, instead, it should start a goroutine and return immediately.
// The context passed to Start can be used to cancel the blocking operations in the Start method. To cancel the
// goroutine/ shutdown the source a user should call Stop.
Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
Expand Down Expand Up @@ -100,8 +100,8 @@ func (cs *Channel) String() string {
return fmt.Sprintf("channel source: %p", cs)
}

// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests. It should NOT block, instead, it should start a goroutine and return immediately.
// Start is internal and should be called only by the Controller to start a goroutine that enqueues
// reconcile.Requests. It should NOT block, instead, it should start a goroutine and return immediately.
// The context passed to Start can be used to cancel the blocking operations in the Start method. To cancel the
// goroutine/ shutdown the source a user should call Stop.
// Start can be called multiple times, allowing multiple controllers to share a single source.
Expand All @@ -112,17 +112,17 @@ func (cs *Channel) Start(ctx context.Context, handler handler.EventHandler, queu
return fmt.Errorf("must specify Channel.Source")
}

// use default value if DestBufferSize not specified
if cs.DestBufferSize == 0 {
cs.DestBufferSize = defaultBufferSize
}

cs.mu.Lock()
defer cs.mu.Unlock()
if cs.canceled {
return nil
}

// use default value if DestBufferSize not specified
if cs.DestBufferSize == 0 {
cs.DestBufferSize = defaultBufferSize
}

// Create a destination channel for the event handler
// and add it to the list of destinations
destination := make(chan event.GenericEvent, cs.DestBufferSize)
Expand All @@ -147,7 +147,7 @@ func (cs *Channel) Start(ctx context.Context, handler handler.EventHandler, queu
eventloop:
for {
select {
case <-ctx.Done():
case <-cs.ctx.Done():
return nil
case event, stillOpen := <-destination:
if !stillOpen {
Expand Down Expand Up @@ -306,8 +306,8 @@ var _ Source = Func(nil)
// Deprecated: use manually implemented Source instead.
type Func func(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error

// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
// Start is internal and should be called only by the Controller to start a goroutine that enqueues
// reconcile.Requests.
func (f Func) Start(ctx context.Context, evt handler.EventHandler, queue workqueue.RateLimitingInterface,
pr ...predicate.Predicate) error {
return f(ctx, evt, queue, pr...)
Expand Down
30 changes: 30 additions & 0 deletions pkg/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand All @@ -35,6 +36,35 @@ import (
)

var _ = Describe("Source", func() {
Describe("Informer", func() {
var inf *controllertest.FakeInformer

BeforeEach(func() {
inf = &controllertest.FakeInformer{}
})

Context("should not panic", func() {
It("when Stop is called before Start", func() {
instance := &source.Informer{Informer: inf}
err := instance.Stop()
Expect(err).NotTo(HaveOccurred())
})

It("when Start is called twice", func() {
instance := &source.Informer{Informer: inf}
Expect(instance.Start(ctx, nil, nil)).To(Succeed())
Expect(instance.Start(ctx, nil, nil)).To(Succeed())
})

It("when Stop is called twice", func() {
instance := &source.Informer{Informer: inf}
Expect(instance.Start(ctx, nil, nil)).To(Succeed())
Expect(instance.Stop()).To(Succeed())
Expect(instance.Stop()).To(Succeed())
})
})
})

Describe("Kind", func() {
var c chan struct{}
var p *corev1.Pod
Expand Down

0 comments on commit 918e4c9

Please sign in to comment.