Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Ensure returned informers are always correctly synced #371

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 42 additions & 47 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,68 +147,63 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {
// the Informer from the map.
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
// Return the informer if it is found
i, ok := func() (*MapEntry, bool) {
i, started, ok := func() (*MapEntry, bool, bool) {
ip.mu.RLock()
defer ip.mu.RUnlock()
i, ok := ip.informersByGVK[gvk]
return i, ok
return i, ip.started, ok
}()
if ok {
return i, nil
}

// Do the mutex part in its own function so we can use defer without blocking pieces that don't
// need to be locked
var sync bool
i, err := func() (*MapEntry, error) {
ip.mu.Lock()
defer ip.mu.Unlock()

// Check the cache to see if we already have an Informer. If we do, return the Informer.
// This is for the case where 2 routines tried to get the informer when it wasn't in the map
// so neither returned early, but the first one created it.
var ok bool
i, ok := ip.informersByGVK[gvk]
if ok {
return i, nil
}

// Create a NewSharedIndexInformer and add it to the map.
var lw *cache.ListWatch
lw, err := ip.createListWatcher(gvk, ip)
if err != nil {
if !ok {
var err error
if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
return nil, err
}
ni := cache.NewSharedIndexInformer(lw, obj, ip.resync, cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})
i = &MapEntry{
Informer: ni,
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
}
ip.informersByGVK[gvk] = i

// Start the Informer if need by
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
// can you add eventhandlers?
if ip.started {
sync = true
go i.Informer.Run(ip.stop)
}
return i, nil
}()
if err != nil {
return nil, err
}

if sync {
if started && !i.Informer.HasSynced() {
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
if !cache.WaitForCacheSync(ip.stop, i.Informer.HasSynced) {
return nil, fmt.Errorf("failed waiting for %T Informer to sync", obj)
}
}

return i, err
return i, nil
}

func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) {
ip.mu.Lock()
defer ip.mu.Unlock()

// Check the cache to see if we already have an Informer. If we do, return the Informer.
// This is for the case where 2 routines tried to get the informer when it wasn't in the map
// so neither returned early, but the first one created it.
if i, ok := ip.informersByGVK[gvk]; ok {
return i, ip.started, nil
}

// Create a NewSharedIndexInformer and add it to the map.
var lw *cache.ListWatch
lw, err := ip.createListWatcher(gvk, ip)
if err != nil {
return nil, false, err
}
ni := cache.NewSharedIndexInformer(lw, obj, ip.resync, cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})
i := &MapEntry{
Informer: ni,
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
}
ip.informersByGVK[gvk] = i

// Start the Informer if need by
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
// can you add eventhandlers?
if ip.started {
go i.Informer.Run(ip.stop)
}
return i, ip.started, nil
}

// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
Expand Down