Skip to content

Commit

Permalink
Add EventHandler reference counting
Browse files Browse the repository at this point in the history
  • Loading branch information
kevindelgado committed Oct 19, 2020
1 parent 579fb28 commit 1cc6939
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 13 deletions.
8 changes: 8 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ type Informer interface {
AddIndexers(indexers toolscache.Indexers) error
//HasSynced return true if the informers underlying store has synced
HasSynced() bool

// RemoveEventHandler currently just decrements a the count of event handlers
// The goals it to have SharedInformer support RemoveEventHandler (and actually remove
// the handler instead of just decrementing a count).
RemoveEventHandler(id int) error

// CountEventHandlers returns the number of event handlers added to an informer.
CountEventHandlers() int
}

// Options are the optional arguments for creating a new InformersMap object
Expand Down
3 changes: 1 addition & 2 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,5 @@ func (ip *informerCache) Remove(ctx context.Context, obj runtime.Object) error {
return err
}

ip.InformersMap.Remove(gvk, obj)
return nil
return ip.InformersMap.Remove(gvk, obj)
}
86 changes: 86 additions & 0 deletions pkg/cache/internal/counting_informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package internal

import (
"fmt"
"time"

"k8s.io/client-go/tools/cache"
)

// CountingInformer exposes a way to track the number of EventHandlers
// registered on an Informer.
type CountingInformer interface {
cache.SharedIndexInformer
CountEventHandlers() int
RemoveEventHandler(id int) error
}

// HandlerCountingInformer implements the CountingInformer.
// It increments the count every time AddEventHandler is called,
// and decrements the count every time RemoveEventHandler is called.
//
// It doesn't actually RemoveEventHandlers because that feature is not implemented
// in client-go, but we're are naming it this way to suggest what the interface would look
// like if/when it does get added to client-go.
//
// We can get rid of this if apimachinery adds the ability to retrieve this from the SharedIndexInformer
// but until then, we have to track it ourselves
type HandlerCountingInformer struct {
// Informer is the cached informer
informer cache.SharedIndexInformer

// count indicates the number of EventHandlers registered on the informer
count int
}

func (i *HandlerCountingInformer) RemoveEventHandler(id int) error {
i.count--
fmt.Printf("decrement, count is %+v\n", i.count)
return nil
}

func (i *HandlerCountingInformer) AddEventHandler(handler cache.ResourceEventHandler) {
i.count++
fmt.Printf("increment, count is %+v\n", i.count)
i.informer.AddEventHandler(handler)
}

func (i *HandlerCountingInformer) CountEventHandlers() int {
return i.count
}

func (i *HandlerCountingInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) {
i.count++
i.informer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
}
func (i *HandlerCountingInformer) AddIndexers(indexers cache.Indexers) error {
return i.informer.AddIndexers(indexers)
}

func (i *HandlerCountingInformer) HasSynced() bool {
return i.informer.HasSynced()
}

func (i *HandlerCountingInformer) GetStore() cache.Store {
return i.informer.GetStore()
}

func (i *HandlerCountingInformer) GetController() cache.Controller {
return i.informer.GetController()
}

func (i *HandlerCountingInformer) LastSyncResourceVersion() string {
return i.informer.LastSyncResourceVersion()
}

func (i *HandlerCountingInformer) SetWatchErrorHandler(handler cache.WatchErrorHandler) error {
return i.informer.SetWatchErrorHandler(handler)
}

func (i *HandlerCountingInformer) GetIndexer() cache.Indexer {
return i.informer.GetIndexer()
}

func (i *HandlerCountingInformer) Run(stopCh <-chan struct{}) {
i.informer.Run(stopCh)
}
6 changes: 3 additions & 3 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,16 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj
}

// Remove will remove an new Informer from the InformersMap and stop it if it exists.
func (m *InformersMap) Remove(gvk schema.GroupVersionKind, obj runtime.Object) {
func (m *InformersMap) Remove(gvk schema.GroupVersionKind, obj runtime.Object) error {
_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
isUnstructured = isUnstructured || isUnstructuredList

switch {
case isUnstructured:
m.unstructured.Remove(gvk)
return m.unstructured.Remove(gvk)
default:
m.structured.Remove(gvk)
return m.structured.Remove(gvk)
}
}

Expand Down
23 changes: 18 additions & 5 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func newSpecificInformersMap(config *rest.Config,

// MapEntry contains the cached data for an Informer
type MapEntry struct {
// Informer is the cached informer
Informer cache.SharedIndexInformer
// Informer is a SharedIndexInformer with addition count and remove event handler functionality.
Informer CountingInformer

// CacheReader wraps Informer and implements the CacheReader interface for a single type
Reader CacheReader
Expand Down Expand Up @@ -230,7 +230,7 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})
i := &MapEntry{
Informer: ni,
Informer: &HandlerCountingInformer{ni, 0},
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
stop: make(chan struct{}),
}
Expand All @@ -241,21 +241,34 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
// can you add eventhandlers?
if ip.started {
go i.Start(ip.stop)
//go i.Start(StopOptions{
// StopChannel: ip.stop,
//})
}
return i, ip.started, nil
}

// Remove removes an informer entry and stops it if it was running.
func (ip *specificInformersMap) Remove(gvk schema.GroupVersionKind) {
func (ip *specificInformersMap) Remove(gvk schema.GroupVersionKind) error {
ip.mu.Lock()
defer ip.mu.Unlock()

entry, ok := ip.informersByGVK[gvk]
if !ok {
return
return nil
}

chInformer, ok := entry.Informer.(*HandlerCountingInformer)
if !ok {
return fmt.Errorf("entry informer is not a HandlerCountingInformer")
}
if chInformer.CountEventHandlers() != 0 {
return fmt.Errorf("attempting to remove informer with %d references", chInformer.CountEventHandlers())
}

close(entry.stop)
delete(ip.informersByGVK, gvk)
return nil
}

// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
Expand Down
17 changes: 17 additions & 0 deletions pkg/cache/multi_namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,23 @@ type multiNamespaceInformer struct {

var _ Informer = &multiNamespaceInformer{}

func (i *multiNamespaceInformer) CountEventHandlers() int {
total := 0
for _, informer := range i.namespaceToInformer {
total += informer.CountEventHandlers()
}
return total
}

func (i *multiNamespaceInformer) RemoveEventHandler(id int) error {
for _, informer := range i.namespaceToInformer {
if err := informer.RemoveEventHandler(id); err != nil {
return err
}
}
return nil
}

// AddEventHandler adds the handler to each namespaced informer
func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) {
for _, informer := range i.namespaceToInformer {
Expand Down
14 changes: 11 additions & 3 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,17 @@ func (c *Controller) Start(ctx context.Context) error {
// caches to sync so that they have a chance to register their intendeded
// caches.
for _, watch := range c.startWatches {
c.Log.Info("Starting EventSource", "source", watch.src)
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
stoppableSource, ok := watch.src.(source.StoppableSource)
if ok {
// TODO: use errgroup or waitgroup to not return until all goros have exited
// (or something else to prevent leaks)
go stoppableSource.StartStoppable(ctx, watch.handler, c.Queue, watch.predicates...)
c.Log.Info("Starting STOPPABLE EventSource", "source", watch.src)
} else {
c.Log.Info("Starting EventSource", "source", watch.src)
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}
}

Expand Down
23 changes: 23 additions & 0 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ type SyncingSource interface {
WaitForSync(ctx context.Context) error
}

// StoppableSource expands the Source interface to add a start method that
// blocks on the context's Done channel, so that we know when the controller has
// been stopped and can remove/decremnt the EventHandler count on the informer appropriately.
type StoppableSource interface {
Source
StartStoppable(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

// NewKindWithCache creates a Source without InjectCache, so that it is assured that the given cache is used
// and not overwritten. It can be used to watch objects in a different cluster by passing the cache
// from that other cluster
Expand Down Expand Up @@ -123,6 +131,21 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
return nil
}

// StartStoppable blocks for start to finish and then calls RemoveEventHandler on the kind's informer.
func (ks *Kind) StartStoppable(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
i, err := ks.cache.GetInformer(ctx, ks.Type)
if err != nil {
return err
}
if err := ks.Start(ctx, handler, queue, prct...); err != nil {
return err
}
<-ctx.Done()
i.RemoveEventHandler(-1)
return nil
}

func (ks *Kind) String() string {
if ks.Type != nil && ks.Type.GetObjectKind() != nil {
return fmt.Sprintf("kind source: %v", ks.Type.GetObjectKind().GroupVersionKind().String())
Expand Down

0 comments on commit 1cc6939

Please sign in to comment.