diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 9827ea0297..e6ac63e417 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -170,19 +170,31 @@ func New(config *rest.Config, opts Options) (Cache, error) { if err != nil { return nil, err } - transformByGVK, err := convertToByGVK(opts.TransformByObject, opts.DefaultTransform, opts.Scheme) + transformers, err := convertToByGVK(opts.TransformByObject, opts.DefaultTransform, opts.Scheme) if err != nil { return nil, err } - transformByObj := internal.TransformFuncByObjectFromMap(transformByGVK) + transformByGVK := internal.TransformFuncByGVKFromMap(transformers) internalSelectorsByGVK := internal.SelectorsByGVK{} for gvk, selector := range selectorsByGVK { internalSelectorsByGVK[gvk] = internal.Selector(selector) } - im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, internalSelectorsByGVK, disableDeepCopyByGVK, transformByObj) - return &informerCache{InformersMap: im}, nil + return &informerCache{ + scheme: opts.Scheme, + InformersMap: internal.NewInformersMap(config, &internal.InformersMapOptions{ + Scheme: opts.Scheme, + Mapper: opts.Mapper, + ResyncPeriod: *opts.Resync, + Namespace: opts.Namespace, + ByGVK: internal.InformersMapOptionsByGVK{ + Selectors: internalSelectorsByGVK, + DisableDeepCopy: disableDeepCopyByGVK, + Transformers: transformByGVK, + }, + }), + }, nil } // BuilderWithOptions returns a Cache constructor that will build a cache diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 08e4e6df59..e3272f8bbc 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -47,12 +47,13 @@ func (*ErrCacheNotStarted) Error() string { // informerCache is a Kubernetes Object cache populated from InformersMap. informerCache wraps an InformersMap. type informerCache struct { + scheme *runtime.Scheme *internal.InformersMap } // Get implements Reader. func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) error { - gvk, err := apiutil.GVKForObject(out, ip.Scheme) + gvk, err := apiutil.GVKForObject(out, ip.scheme) if err != nil { return err } @@ -91,7 +92,7 @@ func (ip *informerCache) List(ctx context.Context, out client.ObjectList, opts . // for a single object corresponding to the passed-in list type. We need them // because they are used as cache map key. func (ip *informerCache) objectTypeForListObject(list client.ObjectList) (*schema.GroupVersionKind, runtime.Object, error) { - gvk, err := apiutil.GVKForObject(list, ip.Scheme) + gvk, err := apiutil.GVKForObject(list, ip.scheme) if err != nil { return nil, nil, err } @@ -132,7 +133,7 @@ func (ip *informerCache) objectTypeForListObject(list client.ObjectList) (*schem // GetInformerForKind returns the informer for the GroupVersionKind. func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) { // Map the gvk to an object - obj, err := ip.Scheme.New(gvk) + obj, err := ip.scheme.New(gvk) if err != nil { return nil, err } @@ -146,7 +147,7 @@ func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou // GetInformer returns the informer for the obj. func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) { - gvk, err := apiutil.GVKForObject(obj, ip.Scheme) + gvk, err := apiutil.GVKForObject(obj, ip.scheme) if err != nil { return nil, err } diff --git a/pkg/cache/informer_cache_unit_test.go b/pkg/cache/informer_cache_unit_test.go index 401cc70f8d..f35fe31b58 100644 --- a/pkg/cache/informer_cache_unit_test.go +++ b/pkg/cache/informer_cache_unit_test.go @@ -38,7 +38,8 @@ const ( var _ = Describe("ip.objectTypeForListObject", func() { ip := &informerCache{ - InformersMap: &internal.InformersMap{Scheme: scheme.Scheme}, + scheme: scheme.Scheme, + InformersMap: &internal.InformersMap{}, } It("should find the object type for unstructured lists", func() { @@ -70,14 +71,14 @@ var _ = Describe("ip.objectTypeForListObject", func() { It("should find the object type of a list with a slice of pointers items field", func() { By("registering the type", func() { - ip.Scheme = runtime.NewScheme() + ip.scheme = runtime.NewScheme() err := (&crscheme.Builder{ GroupVersion: schema.GroupVersion{Group: itemPointerSliceTypeGroupName, Version: itemPointerSliceTypeVersion}, }). Register( &controllertest.UnconventionalListType{}, &controllertest.UnconventionalListTypeList{}, - ).AddToScheme(ip.Scheme) + ).AddToScheme(ip.scheme) Expect(err).To(BeNil()) }) diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go deleted file mode 100644 index 27f46e3278..0000000000 --- a/pkg/cache/internal/deleg_map.go +++ /dev/null @@ -1,126 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package internal - -import ( - "context" - "time" - - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" -) - -// InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. -// It uses a standard parameter codec constructed based on the given generated Scheme. -type InformersMap struct { - // we abstract over the details of structured/unstructured/metadata with the specificInformerMaps - // TODO(directxman12): genericize this over different projections now that we have 3 different maps - - structured *specificInformersMap - unstructured *specificInformersMap - metadata *specificInformersMap - - // Scheme maps runtime.Objects to GroupVersionKinds - Scheme *runtime.Scheme -} - -// NewInformersMap creates a new InformersMap that can create informers for -// both structured and unstructured objects. -func NewInformersMap(config *rest.Config, - scheme *runtime.Scheme, - mapper meta.RESTMapper, - resync time.Duration, - namespace string, - selectors SelectorsByGVK, - disableDeepCopy DisableDeepCopyByGVK, - transformers TransformFuncByObject, -) *InformersMap { - return &InformersMap{ - structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers), - unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers), - metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers), - - Scheme: scheme, - } -} - -// Start calls Run on each of the informers and sets started to true. Blocks on the context. -func (m *InformersMap) Start(ctx context.Context) error { - go m.structured.Start(ctx) - go m.unstructured.Start(ctx) - go m.metadata.Start(ctx) - <-ctx.Done() - return nil -} - -// WaitForCacheSync waits until all the caches have been started and synced. -func (m *InformersMap) WaitForCacheSync(ctx context.Context) bool { - syncedFuncs := append([]cache.InformerSynced(nil), m.structured.HasSyncedFuncs()...) - syncedFuncs = append(syncedFuncs, m.unstructured.HasSyncedFuncs()...) - syncedFuncs = append(syncedFuncs, m.metadata.HasSyncedFuncs()...) - - if !m.structured.waitForStarted(ctx) { - return false - } - if !m.unstructured.waitForStarted(ctx) { - return false - } - if !m.metadata.waitForStarted(ctx) { - return false - } - return cache.WaitForCacheSync(ctx.Done(), syncedFuncs...) -} - -// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns -// the Informer from the map. -func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { - switch obj.(type) { - case *unstructured.Unstructured: - return m.unstructured.Get(ctx, gvk, obj) - case *unstructured.UnstructuredList: - return m.unstructured.Get(ctx, gvk, obj) - case *metav1.PartialObjectMetadata: - return m.metadata.Get(ctx, gvk, obj) - case *metav1.PartialObjectMetadataList: - return m.metadata.Get(ctx, gvk, obj) - default: - return m.structured.Get(ctx, gvk, obj) - } -} - -// newStructuredInformersMap creates a new InformersMap for structured objects. -func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, - namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createStructuredListWatch) -} - -// newUnstructuredInformersMap creates a new InformersMap for unstructured objects. -func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, - namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createUnstructuredListWatch) -} - -// newMetadataInformersMap creates a new InformersMap for metadata-only objects. -func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, - namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createMetadataListWatch) -} diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 1524d2316f..1f520a8d08 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -26,6 +26,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -34,7 +35,6 @@ import ( "k8s.io/client-go/metadata" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) @@ -42,37 +42,43 @@ func init() { rand.Seed(time.Now().UnixNano()) } -// clientListWatcherFunc knows how to create a ListWatcher. -type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) - -// newSpecificInformersMap returns a new specificInformersMap (like -// the generical InformersMap, except that it doesn't implement WaitForCacheSync). -func newSpecificInformersMap(config *rest.Config, - scheme *runtime.Scheme, - mapper meta.RESTMapper, - resync time.Duration, - namespace string, - selectors SelectorsByGVK, - disableDeepCopy DisableDeepCopyByGVK, - transformers TransformFuncByObject, - createListWatcher createListWatcherFunc, -) *specificInformersMap { - ip := &specificInformersMap{ - config: config, - Scheme: scheme, - mapper: mapper, - informersByGVK: make(map[schema.GroupVersionKind]*MapEntry), - codecs: serializer.NewCodecFactory(scheme), - paramCodec: runtime.NewParameterCodec(scheme), - resync: resync, - startWait: make(chan struct{}), - createListWatcher: createListWatcher, - namespace: namespace, - selectors: selectors.forGVK, - disableDeepCopy: disableDeepCopy, - transformers: transformers, +// InformersMapOptions configures an InformerMap. +type InformersMapOptions struct { + Scheme *runtime.Scheme + Mapper meta.RESTMapper + ResyncPeriod time.Duration + Namespace string + ByGVK InformersMapOptionsByGVK +} + +// InformersMapOptionsByGVK configured additional by group version kind (or object) +// in an InformerMap. +type InformersMapOptionsByGVK struct { + Selectors SelectorsByGVK + Transformers TransformFuncByGVK + DisableDeepCopy DisableDeepCopyByGVK +} + +// NewInformersMap creates a new InformersMap that can create informers under the hood. +func NewInformersMap(config *rest.Config, options *InformersMapOptions) *InformersMap { + return &InformersMap{ + config: config, + scheme: options.Scheme, + mapper: options.Mapper, + informers: informers{ + Structured: make(map[schema.GroupVersionKind]*MapEntry), + Unstructured: make(map[schema.GroupVersionKind]*MapEntry), + Metadata: make(map[schema.GroupVersionKind]*MapEntry), + }, + codecs: serializer.NewCodecFactory(options.Scheme), + paramCodec: runtime.NewParameterCodec(options.Scheme), + resync: options.ResyncPeriod, + startWait: make(chan struct{}), + namespace: options.Namespace, + selectors: options.ByGVK.Selectors.forGVK, + disableDeepCopy: options.ByGVK.DisableDeepCopy, + transformers: options.ByGVK.Transformers, } - return ip } // MapEntry contains the cached data for an Informer. @@ -84,11 +90,17 @@ type MapEntry struct { Reader CacheReader } -// specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. +type informers struct { + Structured map[schema.GroupVersionKind]*MapEntry + Unstructured map[schema.GroupVersionKind]*MapEntry + Metadata map[schema.GroupVersionKind]*MapEntry +} + +// InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. // It uses a standard parameter codec constructed based on the given generated Scheme. -type specificInformersMap struct { - // Scheme maps runtime.Objects to GroupVersionKinds - Scheme *runtime.Scheme +type InformersMap struct { + // scheme maps runtime.Objects to GroupVersionKinds + scheme *runtime.Scheme // config is used to talk to the apiserver config *rest.Config @@ -96,8 +108,8 @@ type specificInformersMap struct { // mapper maps GroupVersionKinds to Resources mapper meta.RESTMapper - // informersByGVK is the cache of informers keyed by groupVersionKind - informersByGVK map[schema.GroupVersionKind]*MapEntry + // informers is the cache of informers keyed by their type and groupVersionKind + informers informers // codecs is used to create a new REST client codecs serializer.CodecFactory @@ -123,11 +135,6 @@ type specificInformersMap struct { // informer has been started. startWait chan struct{} - // createClient knows how to create a client and a list object, - // and allows for abstracting over the particulars of structured vs - // unstructured objects. - createListWatcher createListWatcherFunc - // namespace is the namespace that all ListWatches are restricted to // default or empty string means all namespaces namespace string @@ -140,12 +147,12 @@ type specificInformersMap struct { disableDeepCopy DisableDeepCopyByGVK // transform funcs are applied to objects before they are committed to the cache - transformers TransformFuncByObject + transformers TransformFuncByGVK } // Start calls Run on each of the informers and sets started to true. Blocks on the context. // It doesn't return start because it can't return an error, and it's not a runnable directly. -func (ip *specificInformersMap) Start(ctx context.Context) { +func (ip *InformersMap) Start(ctx context.Context) error { func() { ip.mu.Lock() defer ip.mu.Unlock() @@ -154,8 +161,14 @@ func (ip *specificInformersMap) Start(ctx context.Context) { ip.stop = ctx.Done() // Start each informer - for _, informer := range ip.informersByGVK { - go informer.Informer.Run(ctx.Done()) + for _, i := range ip.informers.Structured { + go i.Informer.Run(ctx.Done()) + } + for _, i := range ip.informers.Unstructured { + go i.Informer.Run(ctx.Done()) + } + for _, i := range ip.informers.Metadata { + go i.Informer.Run(ctx.Done()) } // Set started to true so we immediately start any informers added later. @@ -163,9 +176,10 @@ func (ip *specificInformersMap) Start(ctx context.Context) { close(ip.startWait) }() <-ctx.Done() + return nil } -func (ip *specificInformersMap) waitForStarted(ctx context.Context) bool { +func (ip *InformersMap) waitForStarted(ctx context.Context) bool { select { case <-ip.startWait: return true @@ -175,27 +189,45 @@ func (ip *specificInformersMap) waitForStarted(ctx context.Context) bool { } // HasSyncedFuncs returns all the HasSynced functions for the informers in this map. -func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced { +func (ip *InformersMap) HasSyncedFuncs() []cache.InformerSynced { ip.mu.RLock() defer ip.mu.RUnlock() - syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informersByGVK)) - for _, informer := range ip.informersByGVK { - syncedFuncs = append(syncedFuncs, informer.Informer.HasSynced) + + res := make([]cache.InformerSynced, 0, + len(ip.informers.Structured)+len(ip.informers.Unstructured)+len(ip.informers.Metadata), + ) + for _, i := range ip.informers.Structured { + res = append(res, i.Informer.HasSynced) } - return syncedFuncs + for _, i := range ip.informers.Unstructured { + res = append(res, i.Informer.HasSynced) + } + for _, i := range ip.informers.Metadata { + res = append(res, i.Informer.HasSynced) + } + return res +} + +// WaitForCacheSync waits until all the caches have been started and synced. +func (ip *InformersMap) WaitForCacheSync(ctx context.Context) bool { + if !ip.waitForStarted(ctx) { + return false + } + return cache.WaitForCacheSync(ctx.Done(), ip.HasSyncedFuncs()...) +} + +func (ip *InformersMap) get(gvk schema.GroupVersionKind, obj runtime.Object) (entry *MapEntry, started bool, ok bool) { + ip.mu.RLock() + defer ip.mu.RUnlock() + i, ok := ip.informersByType(obj)[gvk] + return i, ip.started, ok } // Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns // the Informer from the map. -func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { +func (ip *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { // Return the informer if it is found - i, started, ok := func() (*MapEntry, bool, bool) { - ip.mu.RLock() - defer ip.mu.RUnlock() - i, ok := ip.informersByGVK[gvk] - return i, ip.started, ok - }() - + i, started, ok := ip.get(gvk, obj) if !ok { var err error if i, started, err = ip.addInformerToMap(gvk, obj); err != nil { @@ -213,24 +245,44 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion return started, i, nil } -func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { +func (ip *InformersMap) informersByType(obj runtime.Object) map[schema.GroupVersionKind]*MapEntry { + switch obj.(type) { + case *unstructured.Unstructured, *unstructured.UnstructuredList: + return ip.informers.Unstructured + case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList: + return ip.informers.Metadata + default: + return ip.informers.Structured + } +} + +func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { ip.mu.Lock() defer ip.mu.Unlock() // Check the cache to see if we already have an Informer. If we do, return the Informer. // This is for the case where 2 routines tried to get the informer when it wasn't in the map // so neither returned early, but the first one created it. - if i, ok := ip.informersByGVK[gvk]; ok { + if i, ok := ip.informersByType(obj)[gvk]; ok { return i, ip.started, nil } // Create a NewSharedIndexInformer and add it to the map. - var lw *cache.ListWatch - lw, err := ip.createListWatcher(gvk, ip) + lw, err := ip.makeListWatcher(gvk, obj) if err != nil { return nil, false, err } - ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{ + ni := cache.NewSharedIndexInformer(&cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ip.selectors(gvk).ApplyToList(&opts) + return lw.ListFunc(opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + ip.selectors(gvk).ApplyToList(&opts) + opts.Watch = true // Watch needs to be set to true separately + return lw.WatchFunc(opts) + }, + }, obj, resyncPeriod(ip.resync)(), cache.Indexers{ cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) @@ -253,7 +305,7 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob disableDeepCopy: ip.disableDeepCopy.IsDisabled(gvk), }, } - ip.informersByGVK[gvk] = i + ip.informersByType(obj)[gvk] = i // Start the Informer if need by // TODO(seans): write thorough tests and document what happens here - can you add indexers? @@ -264,51 +316,11 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob return i, ip.started, nil } -// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer. -func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) { - // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the - // groupVersionKind to the Resource API we will use. - mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) - if err != nil { - return nil, err - } - - client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs) - if err != nil { - return nil, err - } - listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List") - listObj, err := ip.Scheme.New(listGVK) - if err != nil { - return nil, err - } - - // TODO: the functions that make use of this ListWatch should be adapted to - // pass in their own contexts instead of relying on this fixed one here. +func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Object) (*cache.ListWatch, error) { + // TODO(vincepri): Wire the context in here and don't use TODO(). + // Can we use the context from the Get call? ctx := context.TODO() - // Create a new ListWatch for the obj - return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - ip.selectors(gvk).ApplyToList(&opts) - res := listObj.DeepCopyObject() - namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk)) - isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot - err := client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res) - return res, err - }, - // Setup the watch function - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - ip.selectors(gvk).ApplyToList(&opts) - // Watch needs to be set to true separately - opts.Watch = true - namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk)) - isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot - return client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx) - }, - }, nil -} -func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) { // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the // groupVersionKind to the Resource API we will use. mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) @@ -316,109 +328,128 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform return nil, err } - // If the rest configuration has a negotiated serializer passed in, - // we should remove it and use the one that the dynamic client sets for us. - cfg := rest.CopyConfig(ip.config) - cfg.NegotiatedSerializer = nil - dynamicClient, err := dynamic.NewForConfig(cfg) - if err != nil { - return nil, err + // Figure out if the GVK we're dealing with is global, or namespace scoped. + var namespace string + if mapping.Scope.Name() == meta.RESTScopeNameNamespace { + namespace = restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk)) } - // TODO: the functions that make use of this ListWatch should be adapted to - // pass in their own contexts instead of relying on this fixed one here. - ctx := context.TODO() - // Create a new ListWatch for the obj - return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - ip.selectors(gvk).ApplyToList(&opts) - namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk)) - if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { - return dynamicClient.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts) - } - return dynamicClient.Resource(mapping.Resource).List(ctx, opts) - }, - // Setup the watch function - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - ip.selectors(gvk).ApplyToList(&opts) - // Watch needs to be set to true separately - opts.Watch = true - namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk)) - if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { - return dynamicClient.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts) - } - return dynamicClient.Resource(mapping.Resource).Watch(ctx, opts) - }, - }, nil -} - -func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) { - // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the - // groupVersionKind to the Resource API we will use. - mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) - if err != nil { - return nil, err - } - - // Always clear the negotiated serializer and use the one - // set from the metadata client. - cfg := rest.CopyConfig(ip.config) - cfg.NegotiatedSerializer = nil - - // grab the metadata client - client, err := metadata.NewForConfig(cfg) - if err != nil { - return nil, err - } - - // TODO: the functions that make use of this ListWatch should be adapted to - // pass in their own contexts instead of relying on this fixed one here. - ctx := context.TODO() - - // create the relevant listwatch - return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - ip.selectors(gvk).ApplyToList(&opts) + switch obj.(type) { + // + // Unstructured + // + case *unstructured.Unstructured, *unstructured.UnstructuredList: + // If the rest configuration has a negotiated serializer passed in, + // we should remove it and use the one that the dynamic client sets for us. + cfg := rest.CopyConfig(ip.config) + cfg.NegotiatedSerializer = nil + dynamicClient, err := dynamic.NewForConfig(cfg) + if err != nil { + return nil, err + } + resources := dynamicClient.Resource(mapping.Resource) + return &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + if namespace != "" { + return resources.Namespace(namespace).List(ctx, opts) + } + return resources.List(ctx, opts) + }, + // Setup the watch function + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + if namespace != "" { + return resources.Namespace(namespace).Watch(ctx, opts) + } + return resources.Watch(ctx, opts) + }, + }, nil + // + // Metadata + // + case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList: + // Always clear the negotiated serializer and use the one + // set from the metadata client. + cfg := rest.CopyConfig(ip.config) + cfg.NegotiatedSerializer = nil + + // Grab the metadata metadataClient. + metadataClient, err := metadata.NewForConfig(cfg) + if err != nil { + return nil, err + } + resources := metadataClient.Resource(mapping.Resource) + + return &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + var ( + list *metav1.PartialObjectMetadataList + err error + ) + if namespace != "" { + list, err = resources.Namespace(namespace).List(ctx, opts) + } else { + list, err = resources.List(ctx, opts) + } + if list != nil { + for i := range list.Items { + list.Items[i].SetGroupVersionKind(gvk) + } + } + return list, err + }, + // Setup the watch function + WatchFunc: func(opts metav1.ListOptions) (watcher watch.Interface, err error) { + if namespace != "" { + watcher, err = resources.Namespace(namespace).Watch(ctx, opts) + } else { + watcher, err = resources.Watch(ctx, opts) + } + if err != nil { + return nil, err + } + return newGVKFixupWatcher(gvk, watcher), nil + }, + }, nil + // + // Structured. + // + default: + client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs) + if err != nil { + return nil, err + } + listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List") + listObj, err := ip.scheme.New(listGVK) + if err != nil { + return nil, err + } + return &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + // Build the request. + req := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec) + if namespace != "" { + req.Namespace(namespace) + } - var ( - list *metav1.PartialObjectMetadataList - err error - ) - namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk)) - if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { - list, err = client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts) - } else { - list, err = client.Resource(mapping.Resource).List(ctx, opts) - } - if list != nil { - for i := range list.Items { - list.Items[i].SetGroupVersionKind(gvk) + // Create the resulting object, and execute the request. + res := listObj.DeepCopyObject() + if err := req.Do(ctx).Into(res); err != nil { + return nil, err } - } - return list, err - }, - // Setup the watch function - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - ip.selectors(gvk).ApplyToList(&opts) - // Watch needs to be set to true separately - opts.Watch = true - - var ( - watcher watch.Interface - err error - ) - namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk)) - if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { - watcher, err = client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts) - } else { - watcher, err = client.Resource(mapping.Resource).Watch(ctx, opts) - } - if watcher != nil { - watcher = newGVKFixupWatcher(gvk, watcher) - } - return watcher, err - }, - }, nil + return res, nil + }, + // Setup the watch function + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + // Build the request. + req := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec) + if namespace != "" { + req.Namespace(namespace) + } + // Call the watch. + return req.Watch(ctx) + }, + }, nil + } } // newGVKFixupWatcher adds a wrapper that preserves the GVK information when diff --git a/pkg/cache/internal/transformers.go b/pkg/cache/internal/transformers.go index f69e02262a..0725f550c5 100644 --- a/pkg/cache/internal/transformers.go +++ b/pkg/cache/internal/transformers.go @@ -8,9 +8,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) -// TransformFuncByObject provides access to the correct transform function for +// TransformFuncByGVK provides access to the correct transform function for // any given GVK. -type TransformFuncByObject interface { +type TransformFuncByGVK interface { Set(runtime.Object, *runtime.Scheme, cache.TransformFunc) error Get(schema.GroupVersionKind) cache.TransformFunc SetDefault(transformer cache.TransformFunc) @@ -21,9 +21,9 @@ type transformFuncByGVK struct { transformers map[schema.GroupVersionKind]cache.TransformFunc } -// TransformFuncByObjectFromMap creates a TransformFuncByObject from a map that +// TransformFuncByGVKFromMap creates a TransformFuncByGVK from a map that // maps GVKs to TransformFuncs. -func TransformFuncByObjectFromMap(in map[schema.GroupVersionKind]cache.TransformFunc) TransformFuncByObject { +func TransformFuncByGVKFromMap(in map[schema.GroupVersionKind]cache.TransformFunc) TransformFuncByGVK { byGVK := &transformFuncByGVK{} if defaultFunc, hasDefault := in[schema.GroupVersionKind{}]; hasDefault { byGVK.defaultTransform = defaultFunc