From 521c21f689d609fc1c1a97f6b93ca9cb0dc34d6b Mon Sep 17 00:00:00 2001 From: Max Smythe Date: Fri, 21 Apr 2023 19:09:45 -0700 Subject: [PATCH] Proposal for dynamic informer cache This PR shows how Gatekeeper has forked controller runtime to support the dynamic addition/removal of informers. Happy to flesh this out if people are interested. Not sure what the correct licensing actions are for moving code across CNCF projects. --- pkg/cache/informer_cache.go | 30 ++++++++++ pkg/cache/internal/informers.go | 97 ++++++++++++++++++++++++++++++--- 2 files changed, 119 insertions(+), 8 deletions(-) diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 771244d52a..2bbe178273 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -21,6 +21,7 @@ import ( "fmt" "strings" + apierrors "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -152,6 +153,24 @@ func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (In return i.Informer, err } +// GetInformerNonBlocking returns the informer for the obj without waiting for its cache to sync. +func (ic *informerCache) GetInformerNonBlocking(ctx context.Context, obj client.Object) (Informer, error) { + gvk, err := apiutil.GVKForObject(obj, ic.scheme) + if err != nil { + return nil, err + } + + // Use a canceled context to signal non-blocking + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, i, err := ic.Informers.Get(ctx, gvk, obj) + if err != nil && !apierrors.IsTimeout(err) { + return nil, err + } + return i.Informer, nil +} + // NeedLeaderElection implements the LeaderElectionRunnable interface // to indicate that this can be started without requiring the leader lock. func (ic *informerCache) NeedLeaderElection() bool { @@ -171,6 +190,17 @@ func (ic *informerCache) IndexField(ctx context.Context, obj client.Object, fiel return indexByField(informer, field, extractValue) } +// Remove removes an informer specified by the obj argument from the cache and stops it if it existed. +func (ic *informerCache) Remove(obj client.Object) error { + gvk, err := apiutil.GVKForObject(obj, ic.scheme) + if err != nil { + return err + } + + ic.Informers.Remove(gvk, obj) + return nil +} + func indexByField(indexer Informer, field string, extractor client.IndexerFunc) error { indexFunc := func(objRaw interface{}) ([]string, error) { // TODO(directxman12): check if this is the correct type? diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index 09e0111114..7dcb1119ff 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -84,6 +84,20 @@ type Cache struct { // CacheReader wraps Informer and implements the CacheReader interface for a single type Reader CacheReader + + // Stop can be used to stop this individual informer. + stop chan struct{} +} + +// Start starts the informer managed by a MapEntry. +// Blocks until the informer stops. The informer can be stopped +// either individually (via the entry's stop channel) or globally +// via the provided stop argument. +func (c *Cache) Start(stop <-chan struct{}) { + // Stop on either the whole map stopping or just this informer being removed. + internalStop, cancel := eitherChan(stop, c.stop) + defer cancel() + c.Informer.Run(internalStop) } type tracker struct { @@ -198,13 +212,13 @@ func (ip *Informers) Start(ctx context.Context) error { // Start each informer for _, i := range ip.tracker.Structured { - ip.startInformerLocked(i.Informer) + ip.startInformerLocked(i) } for _, i := range ip.tracker.Unstructured { - ip.startInformerLocked(i.Informer) + ip.startInformerLocked(i) } for _, i := range ip.tracker.Metadata { - ip.startInformerLocked(i.Informer) + ip.startInformerLocked(i) } // Set started to true so we immediately start any informers added later. @@ -219,7 +233,7 @@ func (ip *Informers) Start(ctx context.Context) error { return nil } -func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) { +func (ip *Informers) startInformerLocked(cacheEntry *Cache) { // Don't start the informer in case we are already waiting for the items in // the waitGroup to finish, since waitGroups don't support waiting and adding // at the same time. @@ -230,7 +244,7 @@ func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) { ip.waitGroup.Add(1) go func() { defer ip.waitGroup.Done() - informer.Run(ip.ctx.Done()) + cacheEntry.Start(ip.ctx.Done()) }() } @@ -291,15 +305,34 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r } if started && !i.Informer.HasSynced() { + // Cancel for context, informer stopping, or entire map stopping. + syncStop, cancel := mergeChan(ctx.Done(), i.stop, ip.ctx.Done()) + defer cancel() + // Wait for it to sync before returning the Informer so that folks don't read from a stale cache. - if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) { - return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0) + if !cache.WaitForCacheSync(syncStop, i.Informer.HasSynced) { + return started, i, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0) } } return started, i, nil } +// Remove removes an informer entry and stops it if it was running. +func (ip *Informers) Remove(gvk schema.GroupVersionKind, obj runtime.Object) { + ip.mu.Lock() + defer ip.mu.Unlock() + + informerMap := ip.informersByType(obj) + + entry, ok := informerMap[gvk] + if !ok { + return + } + close(entry.stop) + delete(informerMap, gvk) +} + func (ip *Informers) informersByType(obj runtime.Object) map[schema.GroupVersionKind]*Cache { switch obj.(type) { case runtime.Unstructured: @@ -360,13 +393,14 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O scopeName: mapping.Scope.Name(), disableDeepCopy: ip.getDisableDeepCopy(gvk), }, + stop: make(chan struct{}), } ip.informersByType(obj)[gvk] = i // Start the informer in case the InformersMap has started, otherwise it will be // started when the InformersMap starts. if ip.started { - ip.startInformerLocked(i.Informer) + ip.startInformerLocked(i) } return i, ip.started, nil } @@ -558,3 +592,50 @@ func restrictNamespaceBySelector(namespaceOpt string, s Selector) string { } return "" } + +// eitherChan returns a channel that is closed when either of the input channels are signaled. +// The caller must call the returned CancelFunc to ensure no resources are leaked. +func eitherChan(a, b <-chan struct{}) (<-chan struct{}, context.CancelFunc) { + var once sync.Once + out := make(chan struct{}) + cancel := make(chan struct{}) + cancelFunc := func() { + once.Do(func() { + close(cancel) + }) + } + go func() { + defer close(out) + select { + case <-a: + case <-b: + case <-cancel: + } + }() + + return out, cancelFunc +} + +// mergeChan returns a channel that is closed when any of the input channels are signaled. +// The caller must call the returned CancelFunc to ensure no resources are leaked. +func mergeChan(a, b, c <-chan struct{}) (<-chan struct{}, context.CancelFunc) { + var once sync.Once + out := make(chan struct{}) + cancel := make(chan struct{}) + cancelFunc := func() { + once.Do(func() { + close(cancel) + }) + } + go func() { + defer close(out) + select { + case <-a: + case <-b: + case <-c: + case <-cancel: + } + }() + + return out, cancelFunc +}