From 2ea31255f6ff517d497af5bb23fd48ddb64c221b Mon Sep 17 00:00:00 2001 From: Neil Vachharajani Date: Fri, 22 Mar 2019 11:42:58 -0700 Subject: [PATCH] Fix a race condition where an unsynced informer leaks from specificInformersMap --- pkg/cache/internal/informers_map.go | 89 ++++++++++++++--------------- 1 file changed, 42 insertions(+), 47 deletions(-) diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 9dea36966b..b2787adfc8 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -147,68 +147,63 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced { // the Informer from the map. func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) { // Return the informer if it is found - i, ok := func() (*MapEntry, bool) { + i, started, ok := func() (*MapEntry, bool, bool) { ip.mu.RLock() defer ip.mu.RUnlock() i, ok := ip.informersByGVK[gvk] - return i, ok + return i, ip.started, ok }() - if ok { - return i, nil - } - - // Do the mutex part in its own function so we can use defer without blocking pieces that don't - // need to be locked - var sync bool - i, err := func() (*MapEntry, error) { - ip.mu.Lock() - defer ip.mu.Unlock() - // Check the cache to see if we already have an Informer. If we do, return the Informer. - // This is for the case where 2 routines tried to get the informer when it wasn't in the map - // so neither returned early, but the first one created it. - var ok bool - i, ok := ip.informersByGVK[gvk] - if ok { - return i, nil - } - - // Create a NewSharedIndexInformer and add it to the map. - var lw *cache.ListWatch - lw, err := ip.createListWatcher(gvk, ip) - if err != nil { + if !ok { + var err error + if i, started, err = ip.addInformerToMap(gvk, obj); err != nil { return nil, err } - ni := cache.NewSharedIndexInformer(lw, obj, ip.resync, cache.Indexers{ - cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, - }) - i = &MapEntry{ - Informer: ni, - Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk}, - } - ip.informersByGVK[gvk] = i - - // Start the Informer if need by - // TODO(seans): write thorough tests and document what happens here - can you add indexers? - // can you add eventhandlers? - if ip.started { - sync = true - go i.Informer.Run(ip.stop) - } - return i, nil - }() - if err != nil { - return nil, err } - if sync { + if started && !i.Informer.HasSynced() { // Wait for it to sync before returning the Informer so that folks don't read from a stale cache. if !cache.WaitForCacheSync(ip.stop, i.Informer.HasSynced) { return nil, fmt.Errorf("failed waiting for %T Informer to sync", obj) } } - return i, err + return i, nil +} + +func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { + ip.mu.Lock() + defer ip.mu.Unlock() + + // Check the cache to see if we already have an Informer. If we do, return the Informer. + // This is for the case where 2 routines tried to get the informer when it wasn't in the map + // so neither returned early, but the first one created it. + if i, ok := ip.informersByGVK[gvk]; ok { + return i, ip.started, nil + } + + // Create a NewSharedIndexInformer and add it to the map. + var lw *cache.ListWatch + lw, err := ip.createListWatcher(gvk, ip) + if err != nil { + return nil, false, err + } + ni := cache.NewSharedIndexInformer(lw, obj, ip.resync, cache.Indexers{ + cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, + }) + i := &MapEntry{ + Informer: ni, + Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk}, + } + ip.informersByGVK[gvk] = i + + // Start the Informer if need by + // TODO(seans): write thorough tests and document what happens here - can you add indexers? + // can you add eventhandlers? + if ip.started { + go i.Informer.Run(ip.stop) + } + return i, ip.started, nil } // newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.