diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index e6ac63e417..86805ece5d 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -183,12 +183,12 @@ func New(config *rest.Config, opts Options) (Cache, error) { return &informerCache{ scheme: opts.Scheme, - InformersMap: internal.NewInformersMap(config, &internal.InformersMapOptions{ + Informers: internal.NewInformers(config, &internal.InformersOpts{ Scheme: opts.Scheme, Mapper: opts.Mapper, ResyncPeriod: *opts.Resync, Namespace: opts.Namespace, - ByGVK: internal.InformersMapOptionsByGVK{ + ByGVK: internal.InformersOptsByGVK{ Selectors: internalSelectorsByGVK, DisableDeepCopy: disableDeepCopyByGVK, Transformers: transformByGVK, diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 408da6c1db..4e782cddaf 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -45,20 +45,21 @@ func (*ErrCacheNotStarted) Error() string { return "the cache is not started, can not read objects" } -// informerCache is a Kubernetes Object cache populated from InformersMap. informerCache wraps an InformersMap. +// informerCache is a Kubernetes Object cache populated from internal.Informers. +// informerCache wraps internal.Informers. type informerCache struct { scheme *runtime.Scheme - *internal.InformersMap + *internal.Informers } // Get implements Reader. -func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) error { - gvk, err := apiutil.GVKForObject(out, ip.scheme) +func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) error { + gvk, err := apiutil.GVKForObject(out, ic.scheme) if err != nil { return err } - started, cache, err := ip.InformersMap.Get(ctx, gvk, out) + started, cache, err := ic.Informers.Get(ctx, gvk, out) if err != nil { return err } @@ -70,13 +71,13 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie } // List implements Reader. -func (ip *informerCache) List(ctx context.Context, out client.ObjectList, opts ...client.ListOption) error { - gvk, cacheTypeObj, err := ip.objectTypeForListObject(out) +func (ic *informerCache) List(ctx context.Context, out client.ObjectList, opts ...client.ListOption) error { + gvk, cacheTypeObj, err := ic.objectTypeForListObject(out) if err != nil { return err } - started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj) + started, cache, err := ic.Informers.Get(ctx, *gvk, cacheTypeObj) if err != nil { return err } @@ -91,8 +92,8 @@ func (ip *informerCache) List(ctx context.Context, out client.ObjectList, opts . // objectTypeForListObject tries to find the runtime.Object and associated GVK // for a single object corresponding to the passed-in list type. We need them // because they are used as cache map key. -func (ip *informerCache) objectTypeForListObject(list client.ObjectList) (*schema.GroupVersionKind, runtime.Object, error) { - gvk, err := apiutil.GVKForObject(list, ip.scheme) +func (ic *informerCache) objectTypeForListObject(list client.ObjectList) (*schema.GroupVersionKind, runtime.Object, error) { + gvk, err := apiutil.GVKForObject(list, ic.scheme) if err != nil { return nil, nil, err } @@ -115,7 +116,7 @@ func (ip *informerCache) objectTypeForListObject(list client.ObjectList) (*schem // Any other list type should have a corresponding non-list type registered // in the scheme. Use that to create a new instance of the non-list type. - cacheTypeObj, err := ip.scheme.New(gvk) + cacheTypeObj, err := ic.scheme.New(gvk) if err != nil { return nil, nil, err } @@ -123,14 +124,14 @@ func (ip *informerCache) objectTypeForListObject(list client.ObjectList) (*schem } // GetInformerForKind returns the informer for the GroupVersionKind. -func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) { +func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) { // Map the gvk to an object - obj, err := ip.scheme.New(gvk) + obj, err := ic.scheme.New(gvk) if err != nil { return nil, err } - _, i, err := ip.InformersMap.Get(ctx, gvk, obj) + _, i, err := ic.Informers.Get(ctx, gvk, obj) if err != nil { return nil, err } @@ -138,13 +139,13 @@ func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou } // GetInformer returns the informer for the obj. -func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) { - gvk, err := apiutil.GVKForObject(obj, ip.scheme) +func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) { + gvk, err := apiutil.GVKForObject(obj, ic.scheme) if err != nil { return nil, err } - _, i, err := ip.InformersMap.Get(ctx, gvk, obj) + _, i, err := ic.Informers.Get(ctx, gvk, obj) if err != nil { return nil, err } @@ -153,7 +154,7 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In // NeedLeaderElection implements the LeaderElectionRunnable interface // to indicate that this can be started without requiring the leader lock. -func (ip *informerCache) NeedLeaderElection() bool { +func (ic *informerCache) NeedLeaderElection() bool { return false } @@ -162,8 +163,8 @@ func (ip *informerCache) NeedLeaderElection() bool { // to List. For one-to-one compatibility with "normal" field selectors, only return one value. // The values may be anything. They will automatically be prefixed with the namespace of the // given object, if present. The objects passed are guaranteed to be objects of the correct type. -func (ip *informerCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error { - informer, err := ip.GetInformer(ctx, obj) +func (ic *informerCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error { + informer, err := ic.GetInformer(ctx, obj) if err != nil { return err } diff --git a/pkg/cache/informer_cache_unit_test.go b/pkg/cache/informer_cache_unit_test.go index a864c983ea..130059bc40 100644 --- a/pkg/cache/informer_cache_unit_test.go +++ b/pkg/cache/informer_cache_unit_test.go @@ -39,8 +39,8 @@ const ( var _ = Describe("ip.objectTypeForListObject", func() { ip := &informerCache{ - scheme: scheme.Scheme, - InformersMap: &internal.InformersMap{}, + scheme: scheme.Scheme, + Informers: &internal.Informers{}, } It("should find the object type for unstructured lists", func() { diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers.go similarity index 76% rename from pkg/cache/internal/informers_map.go rename to pkg/cache/internal/informers.go index f798e0c3d2..b084d4e696 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers.go @@ -42,47 +42,47 @@ func init() { rand.Seed(time.Now().UnixNano()) } -// InformersMapOptions configures an InformerMap. -type InformersMapOptions struct { +// InformersOpts configures an InformerMap. +type InformersOpts struct { Scheme *runtime.Scheme Mapper meta.RESTMapper ResyncPeriod time.Duration Namespace string - ByGVK InformersMapOptionsByGVK + ByGVK InformersOptsByGVK } -// InformersMapOptionsByGVK configured additional by group version kind (or object) +// InformersOptsByGVK configured additional by group version kind (or object) // in an InformerMap. -type InformersMapOptionsByGVK struct { +type InformersOptsByGVK struct { Selectors SelectorsByGVK Transformers TransformFuncByGVK DisableDeepCopy DisableDeepCopyByGVK } -// NewInformersMap creates a new InformersMap that can create informers under the hood. -func NewInformersMap(config *rest.Config, options *InformersMapOptions) *InformersMap { - return &InformersMap{ +// NewInformers creates a new InformersMap that can create informers under the hood. +func NewInformers(config *rest.Config, options *InformersOpts) *Informers { + return &Informers{ config: config, scheme: options.Scheme, mapper: options.Mapper, - informers: informers{ - Structured: make(map[schema.GroupVersionKind]*MapEntry), - Unstructured: make(map[schema.GroupVersionKind]*MapEntry), - Metadata: make(map[schema.GroupVersionKind]*MapEntry), + tracker: tracker{ + Structured: make(map[schema.GroupVersionKind]*Cache), + Unstructured: make(map[schema.GroupVersionKind]*Cache), + Metadata: make(map[schema.GroupVersionKind]*Cache), }, - codecs: serializer.NewCodecFactory(options.Scheme), - paramCodec: runtime.NewParameterCodec(options.Scheme), - resync: options.ResyncPeriod, - startWait: make(chan struct{}), - namespace: options.Namespace, - selectors: options.ByGVK.Selectors.forGVK, - disableDeepCopy: options.ByGVK.DisableDeepCopy, - transformers: options.ByGVK.Transformers, + codecs: serializer.NewCodecFactory(options.Scheme), + paramCodec: runtime.NewParameterCodec(options.Scheme), + resync: options.ResyncPeriod, + startWait: make(chan struct{}), + namespace: options.Namespace, + selectorByGVK: options.ByGVK.Selectors.forGVK, + disableDeepCopyByGVK: options.ByGVK.DisableDeepCopy, + transformersByGVK: options.ByGVK.Transformers, } } -// MapEntry contains the cached data for an Informer. -type MapEntry struct { +// Cache contains the cached data for an Cache. +type Cache struct { // Informer is the cached informer Informer cache.SharedIndexInformer @@ -90,15 +90,15 @@ type MapEntry struct { Reader CacheReader } -type informers struct { - Structured map[schema.GroupVersionKind]*MapEntry - Unstructured map[schema.GroupVersionKind]*MapEntry - Metadata map[schema.GroupVersionKind]*MapEntry +type tracker struct { + Structured map[schema.GroupVersionKind]*Cache + Unstructured map[schema.GroupVersionKind]*Cache + Metadata map[schema.GroupVersionKind]*Cache } -// InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. +// Informers create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. // It uses a standard parameter codec constructed based on the given generated Scheme. -type InformersMap struct { +type Informers struct { // scheme maps runtime.Objects to GroupVersionKinds scheme *runtime.Scheme @@ -108,8 +108,8 @@ type InformersMap struct { // mapper maps GroupVersionKinds to Resources mapper meta.RESTMapper - // informers is the cache of informers keyed by their type and groupVersionKind - informers informers + // tracker tracks informers keyed by their type and groupVersionKind + tracker tracker // codecs is used to create a new REST client codecs serializer.CodecFactory @@ -145,20 +145,20 @@ type InformersMap struct { // default or empty string means all namespaces namespace string - // selectors are the label or field selectors that will be added to the + // selectorByGVK are the label or field selectorByGVK that will be added to the // ListWatch ListOptions. - selectors func(gvk schema.GroupVersionKind) Selector + selectorByGVK func(gvk schema.GroupVersionKind) Selector - // disableDeepCopy indicates not to deep copy objects during get or list objects. - disableDeepCopy DisableDeepCopyByGVK + // disableDeepCopyByGVK indicates not to deep copy objects during get or list objects. + disableDeepCopyByGVK DisableDeepCopyByGVK // transform funcs are applied to objects before they are committed to the cache - transformers TransformFuncByGVK + transformersByGVK TransformFuncByGVK } // Start calls Run on each of the informers and sets started to true. Blocks on the context. // It doesn't return start because it can't return an error, and it's not a runnable directly. -func (ip *InformersMap) Start(ctx context.Context) error { +func (ip *Informers) Start(ctx context.Context) error { func() { ip.mu.Lock() defer ip.mu.Unlock() @@ -167,13 +167,13 @@ func (ip *InformersMap) Start(ctx context.Context) error { ip.ctx = ctx // Start each informer - for _, i := range ip.informers.Structured { + for _, i := range ip.tracker.Structured { ip.startInformerLocked(i.Informer) } - for _, i := range ip.informers.Unstructured { + for _, i := range ip.tracker.Unstructured { ip.startInformerLocked(i.Informer) } - for _, i := range ip.informers.Metadata { + for _, i := range ip.tracker.Metadata { ip.startInformerLocked(i.Informer) } @@ -189,7 +189,7 @@ func (ip *InformersMap) Start(ctx context.Context) error { return nil } -func (ip *InformersMap) startInformerLocked(informer cache.SharedIndexInformer) { +func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) { // Don't start the informer in case we are already waiting for the items in // the waitGroup to finish, since waitGroups don't support waiting and adding // at the same time. @@ -204,7 +204,7 @@ func (ip *InformersMap) startInformerLocked(informer cache.SharedIndexInformer) }() } -func (ip *InformersMap) waitForStarted(ctx context.Context) bool { +func (ip *Informers) waitForStarted(ctx context.Context) bool { select { case <-ip.startWait: return true @@ -213,35 +213,35 @@ func (ip *InformersMap) waitForStarted(ctx context.Context) bool { } } -// HasSyncedFuncs returns all the HasSynced functions for the informers in this map. -func (ip *InformersMap) HasSyncedFuncs() []cache.InformerSynced { +// getHasSyncedFuncs returns all the HasSynced functions for the informers in this map. +func (ip *Informers) getHasSyncedFuncs() []cache.InformerSynced { ip.mu.RLock() defer ip.mu.RUnlock() res := make([]cache.InformerSynced, 0, - len(ip.informers.Structured)+len(ip.informers.Unstructured)+len(ip.informers.Metadata), + len(ip.tracker.Structured)+len(ip.tracker.Unstructured)+len(ip.tracker.Metadata), ) - for _, i := range ip.informers.Structured { + for _, i := range ip.tracker.Structured { res = append(res, i.Informer.HasSynced) } - for _, i := range ip.informers.Unstructured { + for _, i := range ip.tracker.Unstructured { res = append(res, i.Informer.HasSynced) } - for _, i := range ip.informers.Metadata { + for _, i := range ip.tracker.Metadata { res = append(res, i.Informer.HasSynced) } return res } // WaitForCacheSync waits until all the caches have been started and synced. -func (ip *InformersMap) WaitForCacheSync(ctx context.Context) bool { +func (ip *Informers) WaitForCacheSync(ctx context.Context) bool { if !ip.waitForStarted(ctx) { return false } - return cache.WaitForCacheSync(ctx.Done(), ip.HasSyncedFuncs()...) + return cache.WaitForCacheSync(ctx.Done(), ip.getHasSyncedFuncs()...) } -func (ip *InformersMap) get(gvk schema.GroupVersionKind, obj runtime.Object) (entry *MapEntry, started bool, ok bool) { +func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res *Cache, started bool, ok bool) { ip.mu.RLock() defer ip.mu.RUnlock() i, ok := ip.informersByType(obj)[gvk] @@ -250,7 +250,7 @@ func (ip *InformersMap) get(gvk schema.GroupVersionKind, obj runtime.Object) (en // Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns // the Informer from the map. -func (ip *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { +func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *Cache, error) { // Return the informer if it is found i, started, ok := ip.get(gvk, obj) if !ok { @@ -270,18 +270,18 @@ func (ip *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, ob return started, i, nil } -func (ip *InformersMap) informersByType(obj runtime.Object) map[schema.GroupVersionKind]*MapEntry { +func (ip *Informers) informersByType(obj runtime.Object) map[schema.GroupVersionKind]*Cache { switch obj.(type) { case *unstructured.Unstructured, *unstructured.UnstructuredList: - return ip.informers.Unstructured + return ip.tracker.Unstructured case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList: - return ip.informers.Metadata + return ip.tracker.Metadata default: - return ip.informers.Structured + return ip.tracker.Structured } } -func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { +func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*Cache, bool, error) { ip.mu.Lock() defer ip.mu.Unlock() @@ -293,41 +293,42 @@ func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtim } // Create a NewSharedIndexInformer and add it to the map. - lw, err := ip.makeListWatcher(gvk, obj) + listWatcher, err := ip.makeListWatcher(gvk, obj) if err != nil { return nil, false, err } - ni := cache.NewSharedIndexInformer(&cache.ListWatch{ + sharedIndexInformer := cache.NewSharedIndexInformer(&cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - ip.selectors(gvk).ApplyToList(&opts) - return lw.ListFunc(opts) + ip.selectorByGVK(gvk).ApplyToList(&opts) + return listWatcher.ListFunc(opts) }, WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - ip.selectors(gvk).ApplyToList(&opts) + ip.selectorByGVK(gvk).ApplyToList(&opts) opts.Watch = true // Watch needs to be set to true separately - return lw.WatchFunc(opts) + return listWatcher.WatchFunc(opts) }, }, obj, resyncPeriod(ip.resync)(), cache.Indexers{ cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) // Check to see if there is a transformer for this gvk - if err := ni.SetTransform(ip.transformers.Get(gvk)); err != nil { + if err := sharedIndexInformer.SetTransform(ip.transformersByGVK.Get(gvk)); err != nil { return nil, false, err } - rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) + mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { return nil, false, err } - i := &MapEntry{ - Informer: ni, + // Create the new entry and set it in the map. + i := &Cache{ + Informer: sharedIndexInformer, Reader: CacheReader{ - indexer: ni.GetIndexer(), + indexer: sharedIndexInformer.GetIndexer(), groupVersionKind: gvk, - scopeName: rm.Scope.Name(), - disableDeepCopy: ip.disableDeepCopy.IsDisabled(gvk), + scopeName: mapping.Scope.Name(), + disableDeepCopy: ip.disableDeepCopyByGVK.IsDisabled(gvk), }, } ip.informersByType(obj)[gvk] = i @@ -340,7 +341,7 @@ func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtim return i, ip.started, nil } -func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Object) (*cache.ListWatch, error) { +func (ip *Informers) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Object) (*cache.ListWatch, error) { // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the // groupVersionKind to the Resource API we will use. mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) @@ -351,7 +352,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime // Figure out if the GVK we're dealing with is global, or namespace scoped. var namespace string if mapping.Scope.Name() == meta.RESTScopeNameNamespace { - namespace = restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk)) + namespace = restrictNamespaceBySelector(ip.namespace, ip.selectorByGVK(gvk)) } switch obj.(type) { diff --git a/pkg/cache/internal/informers_map_test.go b/pkg/cache/internal/informers_test.go similarity index 100% rename from pkg/cache/internal/informers_map_test.go rename to pkg/cache/internal/informers_test.go