From 48fe9c7eaf0853b2f70c7a01ba5c71273bb00315 Mon Sep 17 00:00:00 2001
From: Quique Llorente
Date: Tue, 2 Mar 2021 14:07:25 +0100
Subject: [PATCH] Add SelectorByResource option to cache

All instances of a watched resource are cached by controller-runtime;
for some use cases this consumes a lot of memory and CPU. This change
adds an option to the cache so resources can be filtered by field and
label selectors.

Signed-off-by: Quique Llorente
---
 pkg/cache/cache.go                  | 22 ++++++++-
 pkg/cache/cache_test.go             | 75 +++++++++++++++++++++++++++++
 pkg/cache/internal/deleg_map.go     | 25 ++++++----
 pkg/cache/internal/informers_map.go | 10 ++++
 pkg/cache/internal/selector.go      | 32 ++++++++++++
 5 files changed, 153 insertions(+), 11 deletions(-)
 create mode 100644 pkg/cache/internal/selector.go

diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go
index 71dfbd0454..8c767fd46e 100644
--- a/pkg/cache/cache.go
+++ b/pkg/cache/cache.go
@@ -86,6 +86,9 @@ type Informer interface {
     HasSynced() bool
 }
 
+// SelectorsByGroupResource associates a GroupResource with a field/label selector
+type SelectorsByGroupResource internal.SelectorsByGroupResource
+
 // Options are the optional arguments for creating a new InformersMap object
 type Options struct {
     // Scheme is the scheme to use for mapping objects to GroupVersionKinds
@@ -103,6 +106,13 @@ type Options struct {
     // Namespace restricts the cache's ListWatch to the desired namespace
     // Default watches all namespaces
     Namespace string
+
+    // SelectorByResource restricts the cache's ListWatch to the desired
+    // fields and labels per resource; the map's values must implement Selector [1],
+    // for example by using a Set [2]
+    // [1] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Selector
+    // [2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set
+    SelectorByResource SelectorsByGroupResource
 }
 
 var defaultResyncTime = 10 * time.Hour
@@ -113,10 +123,20 @@ func New(config *rest.Config, opts Options) (Cache, error) {
     if err != nil {
         return nil, err
     }
-    im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
+    im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, internal.SelectorsByGroupResource(opts.SelectorByResource))
     return &informerCache{InformersMap: im}, nil
 }
 
+// BuilderWithSelectors returns a Cache constructor that filters the cache's
+// contents using the given selectors per GroupResource.
+// WARNING: filtered-out resources are not returned.
+func BuilderWithSelectors(selectors SelectorsByGroupResource) NewCacheFunc {
+    return func(config *rest.Config, opts Options) (Cache, error) {
+        opts.SelectorByResource = selectors
+        return New(config, opts)
+    }
+}
+
 func defaultOpts(config *rest.Config, opts Options) (Options, error) {
     // Use the default Kubernetes Scheme if unset
     if opts.Scheme == nil {
diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go
index d23db63f75..3402ca6388 100644
--- a/pkg/cache/cache_test.go
+++ b/pkg/cache/cache_test.go
@@ -26,6 +26,7 @@ import (
     "k8s.io/apimachinery/pkg/api/errors"
     kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/apimachinery/pkg/fields"
     "k8s.io/apimachinery/pkg/runtime/schema"
     kscheme "k8s.io/client-go/kubernetes/scheme"
     "k8s.io/client-go/rest"
@@ -789,7 +790,81 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
             Eventually(out).Should(Receive(Equal(pod)))
             close(done)
         })
+        It("should be able to filter informers at the list-watch level by field", func() {
+            By("creating the cache")
+            builder := cache.BuilderWithSelectors(
+                cache.SelectorsByGroupResource{
+                    {Group: "", Resource: "pods"}: {
+                        Field: fields.SelectorFromSet(fields.Set{"metadata.name": "foo"}),
+                    },
+                },
+            )
+            informer, err := builder(cfg, cache.Options{})
+            Expect(err).NotTo(HaveOccurred())
+
+            By("running the cache and waiting for it to sync")
+            go func() {
+                defer GinkgoRecover()
+                Expect(informer.Start(informerCacheCtx)).To(Succeed())
+            }()
+            Expect(informer.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse())
+
+            gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
+            sii, err := informer.GetInformerForKind(context.TODO(), gvk)
+            Expect(err).NotTo(HaveOccurred())
+            Expect(sii).NotTo(BeNil())
+            Expect(sii.HasSynced()).To(BeTrue())
+            By("adding an event handler listening for object creation which sends the object to a channel")
+            out := make(chan interface{})
+            addFunc := func(obj interface{}) {
+                out <- obj
+            }
+            sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})
+
+            By("adding a pair of objects")
+            cl, err := client.New(cfg, client.Options{})
+            Expect(err).NotTo(HaveOccurred())
+            podFoo := &kcorev1.Pod{
+                ObjectMeta: kmetav1.ObjectMeta{
+                    Name:      "foo",
+                    Namespace: "default",
+                },
+                Spec: kcorev1.PodSpec{
+                    Containers: []kcorev1.Container{
+                        {
+                            Name:  "nginx",
+                            Image: "nginx",
+                        },
+                    },
+                },
+            }
+            Expect(cl.Create(context.Background(), podFoo)).To(Succeed())
+            defer deletePod(podFoo)
+
+            podBar := &kcorev1.Pod{
+                ObjectMeta: kmetav1.ObjectMeta{
+                    Name:      "bar",
+                    Namespace: "default",
+                },
+                Spec: kcorev1.PodSpec{
+                    Containers: []kcorev1.Container{
+                        {
+                            Name:  "nginx",
+                            Image: "nginx",
+                        },
+                    },
+                },
+            }
+            Expect(cl.Create(context.Background(), podBar)).To(Succeed())
+            defer deletePod(podBar)
+
+            By("verifying the filtered-out object is not received on the channel")
+            var obtainedObj interface{}
+            Expect(out).Should(Receive(&obtainedObj), "should receive something")
+            Expect(obtainedObj).Should(Equal(podFoo), "should receive the pod 'foo'")
+            Consistently(out).ShouldNot(Receive(), "should not receive anything else")
+        })
         It("should be able to index an object field then retrieve objects by that field", func() {
             By("creating the cache")
             informer, err := cache.New(cfg, cache.Options{})
diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go
index 02bb1919f7..762f6aa757 100644
--- a/pkg/cache/internal/deleg_map.go
+++ b/pkg/cache/internal/deleg_map.go
@@ -49,12 +49,14 @@
 func NewInformersMap(config *rest.Config,
     scheme *runtime.Scheme,
     mapper meta.RESTMapper,
     resync time.Duration,
-    namespace string) *InformersMap {
+    namespace string,
+    selectors SelectorsByGroupResource,
+) *InformersMap {
     return &InformersMap{
-        structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace),
-        unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
-        metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace),
+        structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
+        unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
+        metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors),
 
         Scheme: scheme,
     }
@@ -105,16 +107,19 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj
 }
 
 // newStructuredInformersMap creates a new InformersMap for structured objects.
-func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
-    return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
+func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
+    namespace string, selectors SelectorsByGroupResource) *specificInformersMap {
+    return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createStructuredListWatch)
 }
 
 // newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
-func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
-    return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
+func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
+    namespace string, selectors SelectorsByGroupResource) *specificInformersMap {
+    return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createUnstructuredListWatch)
 }
 
 // newMetadataInformersMap creates a new InformersMap for metadata-only objects.
-func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
-    return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createMetadataListWatch)
+func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
+    namespace string, selectors SelectorsByGroupResource) *specificInformersMap {
+    return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createMetadataListWatch)
 }
diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go
index 6b57c6fa61..183f5cf6fb 100644
--- a/pkg/cache/internal/informers_map.go
+++ b/pkg/cache/internal/informers_map.go
@@ -48,6 +48,7 @@ func newSpecificInformersMap(config *rest.Config,
     mapper meta.RESTMapper,
     resync time.Duration,
     namespace string,
+    selectors SelectorsByGroupResource,
     createListWatcher createListWatcherFunc) *specificInformersMap {
     ip := &specificInformersMap{
         config:            config,
@@ -60,6 +61,7 @@ func newSpecificInformersMap(config *rest.Config,
         startWait:         make(chan struct{}),
         createListWatcher: createListWatcher,
         namespace:         namespace,
+        selectors:         selectors,
     }
     return ip
 }
@@ -120,6 +122,8 @@ type specificInformersMap struct {
     // namespace is the namespace that all ListWatches are restricted to
     // default or empty string means all namespaces
     namespace string
+
+    selectors SelectorsByGroupResource
 }
 
 // Start calls Run on each of the informers and sets started to true. Blocks on the context.
@@ -256,6 +260,7 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
     // Create a new ListWatch for the obj
     return &cache.ListWatch{
         ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
+            ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
             res := listObj.DeepCopyObject()
             isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
             err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
@@ -263,6 +268,7 @@
         },
         // Setup the watch function
         WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
+            ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
             // Watch needs to be set to true separately
             opts.Watch = true
             isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
@@ -289,6 +295,7 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform
     // Create a new ListWatch for the obj
     return &cache.ListWatch{
         ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
+            ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
             if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
                 return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts)
             }
@@ -296,6 +303,7 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform
         },
         // Setup the watch function
         WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
+            ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
             // Watch needs to be set to true separately
             opts.Watch = true
             if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
@@ -327,6 +335,7 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
     // create the relevant listwatch
     return &cache.ListWatch{
         ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
+            ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
             if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
                 return client.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts)
             }
@@ -334,6 +343,7 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
         },
         // Setup the watch function
         WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
+            ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
             // Watch needs to be set to true separately
             opts.Watch = true
             if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
diff --git a/pkg/cache/internal/selector.go b/pkg/cache/internal/selector.go
new file mode 100644
index 0000000000..5552cd6c93
--- /dev/null
+++ b/pkg/cache/internal/selector.go
@@ -0,0 +1,32 @@
+package internal
+
+import (
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/fields"
+    "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+// SelectorsByGroupResource associates a GroupResource with a field/label selector
+type SelectorsByGroupResource map[schema.GroupResource]Selector
+
+// Selector specifies the label/field selectors used to fill in ListOptions
+type Selector struct {
+    Label labels.Selector
+    Field fields.Selector
+}
+
+// FillInListOpts fills in the ListOptions LabelSelector and FieldSelector if needed
+func (s Selector) FillInListOpts(listOpts *metav1.ListOptions) {
+    if s.Label != nil {
+        listOpts.LabelSelector = s.Label.String()
+    }
+    if s.Field != nil {
+        listOpts.FieldSelector = s.Field.String()
+    }
+}
+
+// FindByGR uses the GVR's group and resource to find the field/label selector
+func (s SelectorsByGroupResource) FindByGR(gvr schema.GroupVersionResource) Selector {
+    return s[schema.GroupResource{Group: gvr.Group, Resource: gvr.Resource}]
+}
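
Usage note (not part of the patch): a minimal sketch of how a consumer might wire the new builder into a manager through the existing ctrl.Options.NewCache hook. The resource names, selector values, and variable names below are illustrative only.

package main

import (
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/apimachinery/pkg/labels"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/cache"
)

func main() {
    // Cache only the Node named "my-node" and Secrets labelled app=my-app;
    // resources without an entry keep the default, unfiltered ListWatch.
    selectors := cache.SelectorsByGroupResource{
        {Group: "", Resource: "nodes"}: {
            Field: fields.SelectorFromSet(fields.Set{"metadata.name": "my-node"}),
        },
        {Group: "", Resource: "secrets"}: {
            Label: labels.SelectorFromSet(labels.Set{"app": "my-app"}),
        },
    }

    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        // NewCache lets the manager build its cache with the filtering constructor.
        NewCache: cache.BuilderWithSelectors(selectors),
    })
    if err != nil {
        panic(err)
    }
    _ = mgr // reconcilers registered with mgr now read from the filtered cache
}

Because the selectors are applied in both the ListFunc and the WatchFunc, non-matching objects never reach the informer's store; clients reading from this cache must not expect to see them.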