Skip to content

Commit

Permalink
Proposal for dynamic informer cache
Browse files Browse the repository at this point in the history
This PR shows how Gatekeeper has forked controller runtime
to support the dynamic addition/removal of informers.

Happy to flesh this out if people are interested. Not sure
what the correct licensing actions are for moving code across CNCF
projects.
  • Loading branch information
maxsmythe committed Apr 22, 2023
1 parent a24b949 commit 521c21f
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 8 deletions.
30 changes: 30 additions & 0 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"strings"

apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -152,6 +153,24 @@ func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (In
return i.Informer, err
}

// GetInformerNonBlocking returns the informer for the obj without waiting for its cache to sync.
func (ic *informerCache) GetInformerNonBlocking(ctx context.Context, obj client.Object) (Informer, error) {
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
if err != nil {
return nil, err
}

// Use a canceled context to signal non-blocking
ctx, cancel := context.WithCancel(context.Background())
cancel()

_, i, err := ic.Informers.Get(ctx, gvk, obj)
if err != nil && !apierrors.IsTimeout(err) {
return nil, err
}
return i.Informer, nil
}

// NeedLeaderElection implements the LeaderElectionRunnable interface
// to indicate that this can be started without requiring the leader lock.
func (ic *informerCache) NeedLeaderElection() bool {
Expand All @@ -171,6 +190,17 @@ func (ic *informerCache) IndexField(ctx context.Context, obj client.Object, fiel
return indexByField(informer, field, extractValue)
}

// Remove removes an informer specified by the obj argument from the cache and stops it if it existed.
func (ic *informerCache) Remove(obj client.Object) error {
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
if err != nil {
return err
}

ic.Informers.Remove(gvk, obj)
return nil
}

func indexByField(indexer Informer, field string, extractor client.IndexerFunc) error {
indexFunc := func(objRaw interface{}) ([]string, error) {
// TODO(directxman12): check if this is the correct type?
Expand Down
97 changes: 89 additions & 8 deletions pkg/cache/internal/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ type Cache struct {

// CacheReader wraps Informer and implements the CacheReader interface for a single type
Reader CacheReader

// Stop can be used to stop this individual informer.
stop chan struct{}
}

// Start starts the informer managed by a MapEntry.
// Blocks until the informer stops. The informer can be stopped
// either individually (via the entry's stop channel) or globally
// via the provided stop argument.
func (c *Cache) Start(stop <-chan struct{}) {
// Stop on either the whole map stopping or just this informer being removed.
internalStop, cancel := eitherChan(stop, c.stop)
defer cancel()
c.Informer.Run(internalStop)
}

type tracker struct {
Expand Down Expand Up @@ -198,13 +212,13 @@ func (ip *Informers) Start(ctx context.Context) error {

// Start each informer
for _, i := range ip.tracker.Structured {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
for _, i := range ip.tracker.Unstructured {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
for _, i := range ip.tracker.Metadata {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}

// Set started to true so we immediately start any informers added later.
Expand All @@ -219,7 +233,7 @@ func (ip *Informers) Start(ctx context.Context) error {
return nil
}

func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) {
func (ip *Informers) startInformerLocked(cacheEntry *Cache) {
// Don't start the informer in case we are already waiting for the items in
// the waitGroup to finish, since waitGroups don't support waiting and adding
// at the same time.
Expand All @@ -230,7 +244,7 @@ func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) {
ip.waitGroup.Add(1)
go func() {
defer ip.waitGroup.Done()
informer.Run(ip.ctx.Done())
cacheEntry.Start(ip.ctx.Done())
}()
}

Expand Down Expand Up @@ -291,15 +305,34 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r
}

if started && !i.Informer.HasSynced() {
// Cancel for context, informer stopping, or entire map stopping.
syncStop, cancel := mergeChan(ctx.Done(), i.stop, ip.ctx.Done())
defer cancel()

// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
if !cache.WaitForCacheSync(syncStop, i.Informer.HasSynced) {
return started, i, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
}
}

return started, i, nil
}

// Remove removes an informer entry and stops it if it was running.
func (ip *Informers) Remove(gvk schema.GroupVersionKind, obj runtime.Object) {
ip.mu.Lock()
defer ip.mu.Unlock()

informerMap := ip.informersByType(obj)

entry, ok := informerMap[gvk]
if !ok {
return
}
close(entry.stop)
delete(informerMap, gvk)
}

func (ip *Informers) informersByType(obj runtime.Object) map[schema.GroupVersionKind]*Cache {
switch obj.(type) {
case runtime.Unstructured:
Expand Down Expand Up @@ -360,13 +393,14 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
scopeName: mapping.Scope.Name(),
disableDeepCopy: ip.getDisableDeepCopy(gvk),
},
stop: make(chan struct{}),
}
ip.informersByType(obj)[gvk] = i

// Start the informer in case the InformersMap has started, otherwise it will be
// started when the InformersMap starts.
if ip.started {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
return i, ip.started, nil
}
Expand Down Expand Up @@ -558,3 +592,50 @@ func restrictNamespaceBySelector(namespaceOpt string, s Selector) string {
}
return ""
}

// eitherChan returns a channel that is closed when either of the input channels are signaled.
// The caller must call the returned CancelFunc to ensure no resources are leaked.
func eitherChan(a, b <-chan struct{}) (<-chan struct{}, context.CancelFunc) {
var once sync.Once
out := make(chan struct{})
cancel := make(chan struct{})
cancelFunc := func() {
once.Do(func() {
close(cancel)
})
}
go func() {
defer close(out)
select {
case <-a:
case <-b:
case <-cancel:
}
}()

return out, cancelFunc
}

// mergeChan returns a channel that is closed when any of the input channels are signaled.
// The caller must call the returned CancelFunc to ensure no resources are leaked.
func mergeChan(a, b, c <-chan struct{}) (<-chan struct{}, context.CancelFunc) {
var once sync.Once
out := make(chan struct{})
cancel := make(chan struct{})
cancelFunc := func() {
once.Do(func() {
close(cancel)
})
}
go func() {
defer close(out)
select {
case <-a:
case <-b:
case <-c:
case <-cancel:
}
}()

return out, cancelFunc
}

0 comments on commit 521c21f

Please sign in to comment.