From b50077c75f0656ce21ae559d0af233b0c0c87a16 Mon Sep 17 00:00:00 2001 From: Steve Kuznetsov Date: Thu, 13 Jul 2023 10:03:10 -0600 Subject: [PATCH] pkg/{cache,client}: add options for cache miss policy This commit allows users to opt out of the "start informers in the background" behavior that the current cache implementation uses. Additionally, when opting out of this behavior, the client can be configured to do a live lookup on a cache miss. The default behaviors are: pkg/cache: backfill data on a miss (today's default, unchanged) pkg/client: live lookup when cache is configured to miss Signed-off-by: Steve Kuznetsov --- Makefile | 1 + pkg/cache/cache.go | 11 +++++ pkg/cache/cache_test.go | 85 +++++++++++++++++++++++++++++++++ pkg/cache/informer_cache.go | 37 ++++++++++++-- pkg/cache/internal/informers.go | 5 +- pkg/client/client.go | 8 +++- pkg/client/client_test.go | 27 +++++++++++ pkg/manager/manager_test.go | 10 ++++ 8 files changed, 176 insertions(+), 8 deletions(-) diff --git a/Makefile b/Makefile index a46c7e1871..007889c5a5 100644 --- a/Makefile +++ b/Makefile @@ -112,6 +112,7 @@ generate: $(CONTROLLER_GEN) ## Runs controller-gen for internal types for config .PHONY: clean clean: ## Cleanup. + $(GOLANGCI_LINT) cache clean $(MAKE) clean-bin .PHONY: clean-bin diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index fb9ed35cb4..0e1aa09800 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -145,6 +145,15 @@ type Options struct { // instead of `reconcile.Result{}`. SyncPeriod *time.Duration + // FailOnUnknownResource configures the cache to return a ErrResourceNotCached error when a user + // requests, using Get() and List(), a resource the cache does not already have an informer for. + // + // This error is distinct from an errors.NotFound. + // + // Defaults to false, which means that the cache will start a new informer + // for every new requested resource. + FailOnUnknownResource bool + // DefaultNamespaces maps namespace names to cache configs. If set, only // the namespaces in here will be watched and it will by used to default // ByObject.Namespaces for all objects if that is nil. @@ -329,6 +338,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { Transform: config.Transform, UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false), }), + failOnUnknown: opts.FailOnUnknownResource, } } } @@ -410,6 +420,7 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { if opts.SyncPeriod == nil { opts.SyncPeriod = &defaultSyncPeriod } + return opts, nil } diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index c158bb27f8..1966cfdeaa 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -18,6 +18,7 @@ package cache_test import ( "context" + "errors" "fmt" "reflect" "sort" @@ -117,6 +118,11 @@ func deletePod(pod client.Object) { var _ = Describe("Informer Cache", func() { CacheTest(cache.New, cache.Options{}) }) + +var _ = Describe("Informer Cache with FailOnUnknownResource", func() { + CacheTestFailOnUnknownResource(cache.New, cache.Options{FailOnUnknownResource: true}) +}) + var _ = Describe("Multi-Namespace Informer Cache", func() { CacheTest(cache.New, cache.Options{ DefaultNamespaces: map[string]cache.Config{ @@ -422,6 +428,85 @@ var _ = Describe("Cache with selectors", func() { }) }) +func CacheTestFailOnUnknownResource(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error), opts cache.Options) { + Describe("Cache test with UnknownResourcePolicy = fail", func() { + var ( + informerCache cache.Cache + informerCacheCtx context.Context + informerCacheCancel context.CancelFunc + errNotCached *cache.ErrResourceNotCached + ) + + BeforeEach(func() { + informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background()) + Expect(cfg).NotTo(BeNil()) + + By("creating the informer cache") + var err error + informerCache, err = createCacheFunc(cfg, opts) + Expect(err).NotTo(HaveOccurred()) + By("running the cache and waiting for it to sync") + // pass as an arg so that we don't race between close and re-assign + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(informerCache.Start(ctx)).To(Succeed()) + }(informerCacheCtx) + Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue()) + }) + + AfterEach(func() { + informerCacheCancel() + }) + + Describe("as a Reader", func() { + Context("with structured objects", func() { + It("should not be able to list objects that haven't been watched previously", func() { + By("listing all services in the cluster") + listObj := &corev1.ServiceList{} + Expect(errors.As(informerCache.List(context.Background(), listObj), &errNotCached)).To(BeTrue()) + }) + + It("should not be able to get objects that haven't been watched previously", func() { + By("getting the Kubernetes service") + svc := &corev1.Service{} + svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} + Expect(errors.As(informerCache.Get(context.Background(), svcKey, svc), &errNotCached)).To(BeTrue()) + }) + + It("should be able to list objects that are configured to be watched", func() { + By("indicating that we need to watch services") + _, err := informerCache.GetInformer(context.Background(), &corev1.Service{}) + Expect(err).ToNot(HaveOccurred()) + + By("listing all services in the cluster") + svcList := &corev1.ServiceList{} + Expect(informerCache.List(context.Background(), svcList)).To(Succeed()) + + By("verifying that the returned service looks reasonable") + Expect(svcList.Items).To(HaveLen(1)) + Expect(svcList.Items[0].Name).To(Equal("kubernetes")) + Expect(svcList.Items[0].Namespace).To(Equal("default")) + }) + + It("should be able to get objects that are configured to be watched", func() { + By("indicating that we need to watch services") + _, err := informerCache.GetInformer(context.Background(), &corev1.Service{}) + Expect(err).ToNot(HaveOccurred()) + + By("getting the Kubernetes service") + svc := &corev1.Service{} + svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} + Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed()) + + By("verifying that the returned service looks reasonable") + Expect(svc.Name).To(Equal("kubernetes")) + Expect(svc.Namespace).To(Equal("default")) + }) + }) + }) + }) +} + func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error), opts cache.Options) { Describe("Cache test", func() { var ( diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 70c9e1eced..fb59cbd097 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -46,11 +46,27 @@ func (*ErrCacheNotStarted) Error() string { return "the cache is not started, can not read objects" } +var _ error = (*ErrCacheNotStarted)(nil) + +// ErrResourceNotCached indicates that the resource type +// the client asked the cache for is not cached. +type ErrResourceNotCached struct { + GVK schema.GroupVersionKind +} + +// Error returns the error +func (r ErrResourceNotCached) Error() string { + return fmt.Sprintf("%s is not cached", r.GVK.String()) +} + +var _ error = (*ErrResourceNotCached)(nil) + // informerCache is a Kubernetes Object cache populated from internal.Informers. // informerCache wraps internal.Informers. type informerCache struct { scheme *runtime.Scheme *internal.Informers + failOnUnknown bool } // Get implements Reader. @@ -60,7 +76,7 @@ func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie return err } - started, cache, err := ic.Informers.Get(ctx, gvk, out) + started, cache, err := ic.getInformerForKind(ctx, gvk, out) if err != nil { return err } @@ -78,7 +94,7 @@ func (ic *informerCache) List(ctx context.Context, out client.ObjectList, opts . return err } - started, cache, err := ic.Informers.Get(ctx, *gvk, cacheTypeObj) + started, cache, err := ic.getInformerForKind(ctx, *gvk, cacheTypeObj) if err != nil { return err } @@ -124,14 +140,13 @@ func (ic *informerCache) objectTypeForListObject(list client.ObjectList) (*schem return &gvk, cacheTypeObj, nil } -// GetInformerForKind returns the informer for the GroupVersionKind. +// GetInformerForKind returns the informer for the GroupVersionKind. If no informer exists, one will be started. func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) { // Map the gvk to an object obj, err := ic.scheme.New(gvk) if err != nil { return nil, err } - _, i, err := ic.Informers.Get(ctx, gvk, obj) if err != nil { return nil, err @@ -139,7 +154,7 @@ func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou return i.Informer, nil } -// GetInformer returns the informer for the obj. +// GetInformer returns the informer for the obj. If no informer exists, one will be started. func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) { gvk, err := apiutil.GVKForObject(obj, ic.scheme) if err != nil { @@ -153,6 +168,18 @@ func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (In return i.Informer, nil } +func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *internal.Cache, error) { + if ic.failOnUnknown { + cache, started, ok := ic.Informers.Peek(gvk, obj) + if !ok { + return false, nil, &ErrResourceNotCached{GVK: gvk} + } + return started, cache, nil + } + + return ic.Informers.Get(ctx, gvk, obj) +} + // NeedLeaderElection implements the LeaderElectionRunnable interface // to indicate that this can be started without requiring the leader lock. func (ic *informerCache) NeedLeaderElection() bool { diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index ea72a5bf43..66e5e9ef72 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -230,7 +230,8 @@ func (ip *Informers) WaitForCacheSync(ctx context.Context) bool { return cache.WaitForCacheSync(ctx.Done(), ip.getHasSyncedFuncs()...) } -func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res *Cache, started bool, ok bool) { +// Peek attempts to get the informer for the GVK, but does not start one if one does not exist. +func (ip *Informers) Peek(gvk schema.GroupVersionKind, obj runtime.Object) (res *Cache, started bool, ok bool) { ip.mu.RLock() defer ip.mu.RUnlock() i, ok := ip.informersByType(obj)[gvk] @@ -241,7 +242,7 @@ func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res * // the Informer from the map. func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *Cache, error) { // Return the informer if it is found - i, started, ok := ip.get(gvk, obj) + i, started, ok := ip.Peek(gvk, obj) if !ok { var err error if i, started, err = ip.addInformerToMap(gvk, obj); err != nil { diff --git a/pkg/client/client.go b/pkg/client/client.go index 0d8b9fbe18..2fb0acb7b3 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -77,10 +77,12 @@ type CacheOptions struct { // Reader is a cache-backed reader that will be used to read objects from the cache. // +required Reader Reader - // DisableFor is a list of objects that should not be read from the cache. + // DisableFor is a list of objects that should never be read from the cache. + // Objects configured here always result in a live lookup. DisableFor []Object // Unstructured is a flag that indicates whether the cache-backed client should // read unstructured objects or lists from the cache. + // If false, unstructured objects will always result in a live lookup. Unstructured bool } @@ -342,9 +344,11 @@ func (c *client) Get(ctx context.Context, key ObjectKey, obj Object, opts ...Get if isUncached, err := c.shouldBypassCache(obj); err != nil { return err } else if !isUncached { + // Attempt to get from the cache. return c.cache.Get(ctx, key, obj, opts...) } + // Perform a live lookup. switch obj.(type) { case runtime.Unstructured: return c.unstructuredClient.Get(ctx, key, obj, opts...) @@ -362,9 +366,11 @@ func (c *client) List(ctx context.Context, obj ObjectList, opts ...ListOption) e if isUncached, err := c.shouldBypassCache(obj); err != nil { return err } else if !isUncached { + // Attempt to get from the cache. return c.cache.List(ctx, obj, opts...) } + // Perform a live lookup. switch x := obj.(type) { case runtime.Unstructured: return c.unstructuredClient.List(ctx, obj, opts...) diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index d8b3995050..177f6de798 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -19,6 +19,7 @@ package client_test import ( "context" "encoding/json" + "errors" "fmt" "reflect" "sync/atomic" @@ -41,6 +42,7 @@ import ( "k8s.io/apimachinery/pkg/types" kscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/utils/pointer" + cacheerrors "sigs.k8s.io/controller-runtime/pkg/cache/errors" "sigs.k8s.io/controller-runtime/examples/crd/pkg" "sigs.k8s.io/controller-runtime/pkg/client" @@ -143,6 +145,7 @@ var _ = Describe("Client", func() { var count uint64 = 0 var replicaCount int32 = 2 var ns = "default" + var errNotCached *cacheerrors.ErrResourceNotCached ctx := context.TODO() BeforeEach(func() { @@ -278,6 +281,16 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC Expect(cache.Called).To(Equal(2)) }) + It("should propagate cache unknown resources errors", func() { + c := &fakeUncachedReader{} + cl, err := client.New(cfg, client.Options{Cache: &client.CacheOptions{Reader: c}}) + Expect(err).NotTo(HaveOccurred()) + Expect(cl).NotTo(BeNil()) + Expect(errors.As(cl.Get(ctx, client.ObjectKey{Name: "test"}, &appsv1.Deployment{}), &errNotCached)).To(BeTrue()) + Expect(errors.As(cl.List(ctx, &appsv1.DeploymentList{}), &errNotCached)).To(BeTrue()) + Expect(c.Called).To(Equal(2)) + }) + It("should not use the provided reader cache if provided, on get and list for uncached GVKs", func() { cache := &fakeReader{} cl, err := client.New(cfg, client.Options{Cache: &client.CacheOptions{Reader: cache, DisableFor: []client.Object{&corev1.Namespace{}}}}) @@ -3938,6 +3951,20 @@ func (f *fakeReader) List(ctx context.Context, list client.ObjectList, opts ...c return nil } +type fakeUncachedReader struct { + Called int +} + +func (f *fakeUncachedReader) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + f.Called++ + return &cacheerrors.ErrResourceNotCached{} +} + +func (f *fakeUncachedReader) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + f.Called++ + return &cacheerrors.ErrResourceNotCached{} +} + func ptr[T any](to T) *T { return &to } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 49ed0b4033..0663608bc0 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -600,6 +600,16 @@ var _ = Describe("manger.Manager", func() { cancel() }) + It("should should be able to create a manager with a cache to fail on unknown resources", func() { + m, err := New(cfg, Options{ + Cache: cache.Options{ + FailOnUnknownResource: true, + }, + }) + Expect(m).ToNot(BeNil()) + Expect(err).ToNot(HaveOccurred()) + }) + It("should return an error if the metrics bind address is already in use", func() { ln, err := net.Listen("tcp", ":0") //nolint:gosec Expect(err).ShouldNot(HaveOccurred())