Add FieldSelectorByResource option to cache
All instances of a given resource are cached by controller-runtime; for some
use cases this consumes a lot of memory and CPU. This change adds an option
to the cache so that the cached resources can be selected by field and label.

Signed-off-by: Quique Llorente <ellorent@redhat.com>
qinqon committed Apr 5, 2021
1 parent 10ae090 commit 48fe9c7
Showing 5 changed files with 153 additions and 11 deletions.
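
A minimal sketch of how a consumer could opt in to the new option through cache.Options (the helper name and the choice of filtering Pods by metadata.name are illustrative, mirroring the test added below; they are not part of this commit):

package example

import (
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/rest"

	"sigs.k8s.io/controller-runtime/pkg/cache"
)

// newFilteredCache builds a cache whose Pod informer only list-watches the
// Pod named podName; every other resource is cached without filtering.
func newFilteredCache(cfg *rest.Config, podName string) (cache.Cache, error) {
	return cache.New(cfg, cache.Options{
		SelectorByResource: cache.SelectorsByGroupResource{
			{Group: "", Resource: "pods"}: {
				Field: fields.SelectorFromSet(fields.Set{"metadata.name": podName}),
			},
		},
	})
}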
22 changes: 21 additions & 1 deletion pkg/cache/cache.go
@@ -86,6 +86,9 @@ type Informer interface {
HasSynced() bool
}

// SelectorsByGroupResource associates a GroupResource with a field/label selector
type SelectorsByGroupResource internal.SelectorsByGroupResource

// Options are the optional arguments for creating a new InformersMap object
type Options struct {
// Scheme is the scheme to use for mapping objects to GroupVersionKinds
@@ -103,6 +106,13 @@ type Options struct {
// Namespace restricts the cache's ListWatch to the desired namespace
// Default watches all namespaces
Namespace string

// SelectorByResource restricts the cache's ListWatch to the desired fields
// and labels per GroupResource. A map value's field selector must implement
// Selector [1] and can be built, for example, from a Set [2].
// [1] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Selector
// [2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set
SelectorByResource SelectorsByGroupResource
}

var defaultResyncTime = 10 * time.Hour
@@ -113,10 +123,20 @@ func New(config *rest.Config, opts Options) (Cache, error) {
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, internal.SelectorsByGroupResource(opts.SelectorByResource))
return &informerCache{InformersMap: im}, nil
}

// BuilderWithSelectors returns a Cache constructor that filters the cache's
// contents using the given per-GroupResource selectors.
// WARNING: filtered-out resources are not returned.
func BuilderWithSelectors(selectors SelectorsByGroupResource) NewCacheFunc {
return func(config *rest.Config, opts Options) (Cache, error) {
opts.SelectorByResource = selectors
return New(config, opts)
}
}

func defaultOpts(config *rest.Config, opts Options) (Options, error) {
// Use the default Kubernetes Scheme if unset
if opts.Scheme == nil {
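
The NewCacheFunc returned by BuilderWithSelectors is intended to be handed to whatever constructs the cache. A hedged sketch of wiring it into a manager, assuming the manager Options expose a NewCache field of type cache.NewCacheFunc as in contemporary controller-runtime releases (that wiring is not part of this diff):

package example

import (
	"k8s.io/apimachinery/pkg/labels"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

func newManagerWithFilteredCache() (manager.Manager, error) {
	// Cache only the Secrets carrying this label; other resource kinds are
	// list-watched without extra filtering.
	selectors := cache.SelectorsByGroupResource{
		{Group: "", Resource: "secrets"}: {
			Label: labels.SelectorFromSet(labels.Set{"app.kubernetes.io/managed-by": "my-operator"}),
		},
	}
	return ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		NewCache: cache.BuilderWithSelectors(selectors),
	})
}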
75 changes: 75 additions & 0 deletions pkg/cache/cache_test.go
@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
kscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
@@ -789,7 +790,81 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Eventually(out).Should(Receive(Equal(pod)))
close(done)
})
It("should be able to filter informers at list watch level by field", func() {
By("creating the cache")
builder := cache.BuilderWithSelectors(
cache.SelectorsByGroupResource{
{Group: "", Resource: "pods"}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": "foo"}),
},
},
)
informer, err := builder(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())

By("running the cache and waiting for it to sync")
go func() {
defer GinkgoRecover()
Expect(informer.Start(informerCacheCtx)).To(Succeed())
}()
Expect(informer.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse())

gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
sii, err := informer.GetInformerForKind(context.TODO(), gvk)
Expect(err).NotTo(HaveOccurred())
Expect(sii).NotTo(BeNil())
Expect(sii.HasSynced()).To(BeTrue())

By("adding an event handler listening for object creation which sends the object to a channel")
out := make(chan interface{})
addFunc := func(obj interface{}) {
out <- obj
}
sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})

By("adding a pair of objects")
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
podFoo := &kcorev1.Pod{
ObjectMeta: kmetav1.ObjectMeta{
Name: "foo",
Namespace: "default",
},
Spec: kcorev1.PodSpec{
Containers: []kcorev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
}
Expect(cl.Create(context.Background(), podFoo)).To(Succeed())
defer deletePod(podFoo)

podBar := &kcorev1.Pod{
ObjectMeta: kmetav1.ObjectMeta{
Name: "bar",
Namespace: "default",
},
Spec: kcorev1.PodSpec{
Containers: []kcorev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
}
Expect(cl.Create(context.Background(), podBar)).To(Succeed())
defer deletePod(podBar)

By("verifying the filter out object is not received on the channel")
var obtainedObj interface{}
Eventually(out).Should(Receive(&obtainedObj), "should receive something")
Expect(obtainedObj).Should(Equal(podFoo), "should receive the pod 'foo'")
Consistently(out).ShouldNot(Receive(), "should not receive anything else")
})
It("should be able to index an object field then retrieve objects by that field", func() {
By("creating the cache")
informer, err := cache.New(cfg, cache.Options{})
25 changes: 15 additions & 10 deletions pkg/cache/internal/deleg_map.go
@@ -49,12 +49,14 @@ func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string) *InformersMap {
namespace string,
selectors SelectorsByGroupResource,
) *InformersMap {

return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace),
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors),

Scheme: scheme,
}
@@ -105,16 +107,19 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj
}

// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGroupResource) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createStructuredListWatch)
}

// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGroupResource) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createUnstructuredListWatch)
}

// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createMetadataListWatch)
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGroupResource) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createMetadataListWatch)
}
10 changes: 10 additions & 0 deletions pkg/cache/internal/informers_map.go
@@ -48,6 +48,7 @@ func newSpecificInformersMap(config *rest.Config,
mapper meta.RESTMapper,
resync time.Duration,
namespace string,
selectors SelectorsByGroupResource,
createListWatcher createListWatcherFunc) *specificInformersMap {
ip := &specificInformersMap{
config: config,
@@ -60,6 +61,7 @@
startWait: make(chan struct{}),
createListWatcher: createListWatcher,
namespace: namespace,
selectors: selectors,
}
return ip
}
@@ -120,6 +122,8 @@ type specificInformersMap struct {
// namespace is the namespace that all ListWatches are restricted to
// default or empty string means all namespaces
namespace string

selectors SelectorsByGroupResource
}

// Start calls Run on each of the informers and sets started to true. Blocks on the context.
@@ -256,13 +260,15 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
res := listObj.DeepCopyObject()
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
// Watch needs to be set to true separately
opts.Watch = true
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
@@ -289,13 +295,15 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts)
}
return dynamicClient.Resource(mapping.Resource).List(ctx, opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
// Watch needs to be set to true separately
opts.Watch = true
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
@@ -327,13 +335,15 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
// create the relevant listwatch
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return client.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts)
}
return client.Resource(mapping.Resource).List(ctx, opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
// Watch needs to be set to true separately
opts.Watch = true
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
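
Because FillInListOpts runs before each request is built, the selectors are sent to the apiserver as the fieldSelector/labelSelector query parameters of the LIST and WATCH calls, so filtering happens server-side; that is where the memory and CPU savings come from. A small, self-contained illustration (plain apimachinery, nothing specific to this change) of the strings that end up in the list options:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	opts := metav1.ListOptions{}
	opts.FieldSelector = fields.SelectorFromSet(fields.Set{"metadata.name": "foo"}).String()
	opts.LabelSelector = labels.SelectorFromSet(labels.Set{"app": "nginx"}).String()

	// These strings become the fieldSelector / labelSelector query parameters
	// of the LIST and WATCH requests issued by the informers.
	fmt.Println(opts.FieldSelector) // metadata.name=foo
	fmt.Println(opts.LabelSelector) // app=nginx
}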
32 changes: 32 additions & 0 deletions pkg/cache/internal/selector.go
@@ -0,0 +1,32 @@
package internal

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// SelectorsByGroupResource associates a GroupResource with a field/label selector
type SelectorsByGroupResource map[schema.GroupResource]Selector

// Selector specifies the label/field selectors used to fill in ListOptions
type Selector struct {
Label labels.Selector
Field fields.Selector
}

// FillInListOpts fills in the ListOptions' LabelSelector and FieldSelector if set
func (s Selector) FillInListOpts(listOpts *metav1.ListOptions) {
if s.Label != nil {
listOpts.LabelSelector = s.Label.String()
}
if s.Field != nil {
listOpts.FieldSelector = s.Field.String()
}
}

// FindByGR uses the GVR's group and resource to look up the field/label selector
func (s SelectorsByGroupResource) FindByGR(gvr schema.GroupVersionResource) Selector {
return s[schema.GroupResource{Group: gvr.Group, Resource: gvr.Resource}]
}
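
A note on the zero value: FindByGR is a plain map lookup, so a GroupResource without an entry yields an empty Selector, and FillInListOpts then leaves the ListOptions untouched. A hypothetical unit-test sketch of that behaviour (plain testing rather than the repository's Ginkgo style, and not part of this commit):

package internal

import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func TestFillInListOpts(t *testing.T) {
	selectors := SelectorsByGroupResource{
		{Group: "", Resource: "pods"}: {
			Field: fields.SelectorFromSet(fields.Set{"metadata.name": "foo"}),
		},
	}

	// A resource with an entry gets its field selector serialized into the options.
	podOpts := metav1.ListOptions{}
	selectors.FindByGR(schema.GroupVersionResource{Version: "v1", Resource: "pods"}).FillInListOpts(&podOpts)
	if podOpts.FieldSelector != "metadata.name=foo" {
		t.Errorf("unexpected field selector: %q", podOpts.FieldSelector)
	}

	// A resource without an entry gets the zero-value Selector, which is a no-op.
	svcOpts := metav1.ListOptions{}
	selectors.FindByGR(schema.GroupVersionResource{Version: "v1", Resource: "services"}).FillInListOpts(&svcOpts)
	if svcOpts.FieldSelector != "" || svcOpts.LabelSelector != "" {
		t.Errorf("expected empty selectors, got %q / %q", svcOpts.FieldSelector, svcOpts.LabelSelector)
	}
}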
