Skip to content

Commit

Permalink
Merge pull request #371 from nvachhar/unsynced-informer-fix-option2
Browse files Browse the repository at this point in the history
🐛 Ensure returned informers are always correctly synced
  • Loading branch information
k8s-ci-robot authored Apr 5, 2019
2 parents ea673b4 + 2ea3125 commit 962ab1f
Showing 1 changed file with 42 additions and 47 deletions.
89 changes: 42 additions & 47 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,68 +147,63 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {
// the Informer from the map.
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
// Return the informer if it is found
i, ok := func() (*MapEntry, bool) {
i, started, ok := func() (*MapEntry, bool, bool) {
ip.mu.RLock()
defer ip.mu.RUnlock()
i, ok := ip.informersByGVK[gvk]
return i, ok
return i, ip.started, ok
}()
if ok {
return i, nil
}

// Do the mutex part in its own function so we can use defer without blocking pieces that don't
// need to be locked
var sync bool
i, err := func() (*MapEntry, error) {
ip.mu.Lock()
defer ip.mu.Unlock()

// Check the cache to see if we already have an Informer. If we do, return the Informer.
// This is for the case where 2 routines tried to get the informer when it wasn't in the map
// so neither returned early, but the first one created it.
var ok bool
i, ok := ip.informersByGVK[gvk]
if ok {
return i, nil
}

// Create a NewSharedIndexInformer and add it to the map.
var lw *cache.ListWatch
lw, err := ip.createListWatcher(gvk, ip)
if err != nil {
if !ok {
var err error
if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
return nil, err
}
ni := cache.NewSharedIndexInformer(lw, obj, ip.resync, cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})
i = &MapEntry{
Informer: ni,
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
}
ip.informersByGVK[gvk] = i

// Start the Informer if need by
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
// can you add eventhandlers?
if ip.started {
sync = true
go i.Informer.Run(ip.stop)
}
return i, nil
}()
if err != nil {
return nil, err
}

if sync {
if started && !i.Informer.HasSynced() {
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
if !cache.WaitForCacheSync(ip.stop, i.Informer.HasSynced) {
return nil, fmt.Errorf("failed waiting for %T Informer to sync", obj)
}
}

return i, err
return i, nil
}

func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) {
ip.mu.Lock()
defer ip.mu.Unlock()

// Check the cache to see if we already have an Informer. If we do, return the Informer.
// This is for the case where 2 routines tried to get the informer when it wasn't in the map
// so neither returned early, but the first one created it.
if i, ok := ip.informersByGVK[gvk]; ok {
return i, ip.started, nil
}

// Create a NewSharedIndexInformer and add it to the map.
var lw *cache.ListWatch
lw, err := ip.createListWatcher(gvk, ip)
if err != nil {
return nil, false, err
}
ni := cache.NewSharedIndexInformer(lw, obj, ip.resync, cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})
i := &MapEntry{
Informer: ni,
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
}
ip.informersByGVK[gvk] = i

// Start the Informer if need by
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
// can you add eventhandlers?
if ip.started {
go i.Informer.Run(ip.stop)
}
return i, ip.started, nil
}

// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
Expand Down

0 comments on commit 962ab1f

Please sign in to comment.