diff --git a/Makefile b/Makefile index a46c7e1871..007889c5a5 100644 --- a/Makefile +++ b/Makefile @@ -112,6 +112,7 @@ generate: $(CONTROLLER_GEN) ## Runs controller-gen for internal types for config .PHONY: clean clean: ## Cleanup. + $(GOLANGCI_LINT) cache clean $(MAKE) clean-bin .PHONY: clean-bin diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index fb9ed35cb4..a054576ee4 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -145,6 +145,15 @@ type Options struct { // instead of `reconcile.Result{}`. SyncPeriod *time.Duration + // FailOnUnknownResource configures the cache to return a ErrResourceNotCached error when a user + // requests, using Get() and List(), a resource the cache does not already have an informer for. + // + // This error is distinct from an errors.NotFound. + // + // Defaults to false, which means that the cache will start a new informer + // for every new requested resource. + FailOnUnknownResource bool + // DefaultNamespaces maps namespace names to cache configs. If set, only // the namespaces in here will be watched and it will by used to default // ByObject.Namespaces for all objects if that is nil. @@ -329,6 +338,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { Transform: config.Transform, UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false), }), + failOnUnknown: opts.FailOnUnknownResource, } } } diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index c158bb27f8..0a577b71b3 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -18,6 +18,7 @@ package cache_test import ( "context" + "errors" "fmt" "reflect" "sort" @@ -117,6 +118,11 @@ func deletePod(pod client.Object) { var _ = Describe("Informer Cache", func() { CacheTest(cache.New, cache.Options{}) }) + +var _ = Describe("Informer Cache with FailOnUnknownResource", func() { + CacheTestFailOnUnknownResource(cache.New, cache.Options{FailOnUnknownResource: true}) +}) + var _ = Describe("Multi-Namespace Informer Cache", func() { CacheTest(cache.New, cache.Options{ DefaultNamespaces: map[string]cache.Config{ @@ -422,6 +428,85 @@ var _ = Describe("Cache with selectors", func() { }) }) +func CacheTestFailOnUnknownResource(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error), opts cache.Options) { + Describe("Cache test with FailOnUnknownResource = true", func() { + var ( + informerCache cache.Cache + informerCacheCtx context.Context + informerCacheCancel context.CancelFunc + errNotCached *cache.ErrResourceNotCached + ) + + BeforeEach(func() { + informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background()) + Expect(cfg).NotTo(BeNil()) + + By("creating the informer cache") + var err error + informerCache, err = createCacheFunc(cfg, opts) + Expect(err).NotTo(HaveOccurred()) + By("running the cache and waiting for it to sync") + // pass as an arg so that we don't race between close and re-assign + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(informerCache.Start(ctx)).To(Succeed()) + }(informerCacheCtx) + Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue()) + }) + + AfterEach(func() { + informerCacheCancel() + }) + + Describe("as a Reader", func() { + Context("with structured objects", func() { + It("should not be able to list objects that haven't been watched previously", func() { + By("listing all services in the cluster") + listObj := &corev1.ServiceList{} + Expect(errors.As(informerCache.List(context.Background(), listObj), &errNotCached)).To(BeTrue()) + }) + + It("should not be able to get objects that haven't been watched previously", func() { + By("getting the Kubernetes service") + svc := &corev1.Service{} + svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} + Expect(errors.As(informerCache.Get(context.Background(), svcKey, svc), &errNotCached)).To(BeTrue()) + }) + + It("should be able to list objects that are configured to be watched", func() { + By("indicating that we need to watch services") + _, err := informerCache.GetInformer(context.Background(), &corev1.Service{}) + Expect(err).ToNot(HaveOccurred()) + + By("listing all services in the cluster") + svcList := &corev1.ServiceList{} + Expect(informerCache.List(context.Background(), svcList)).To(Succeed()) + + By("verifying that the returned service looks reasonable") + Expect(svcList.Items).To(HaveLen(1)) + Expect(svcList.Items[0].Name).To(Equal("kubernetes")) + Expect(svcList.Items[0].Namespace).To(Equal("default")) + }) + + It("should be able to get objects that are configured to be watched", func() { + By("indicating that we need to watch services") + _, err := informerCache.GetInformer(context.Background(), &corev1.Service{}) + Expect(err).ToNot(HaveOccurred()) + + By("getting the Kubernetes service") + svc := &corev1.Service{} + svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} + Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed()) + + By("verifying that the returned service looks reasonable") + Expect(svc.Name).To(Equal("kubernetes")) + Expect(svc.Namespace).To(Equal("default")) + }) + }) + }) + }) +} + func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error), opts cache.Options) { Describe("Cache test", func() { var ( diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 70c9e1eced..a49222555f 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -46,11 +46,27 @@ func (*ErrCacheNotStarted) Error() string { return "the cache is not started, can not read objects" } +var _ error = (*ErrCacheNotStarted)(nil) + +// ErrResourceNotCached indicates that the resource type +// the client asked the cache for is not cached. +type ErrResourceNotCached struct { + GVK schema.GroupVersionKind +} + +// Error returns the error +func (r ErrResourceNotCached) Error() string { + return fmt.Sprintf("%s is not cached", r.GVK.String()) +} + +var _ error = (*ErrResourceNotCached)(nil) + // informerCache is a Kubernetes Object cache populated from internal.Informers. // informerCache wraps internal.Informers. type informerCache struct { scheme *runtime.Scheme *internal.Informers + failOnUnknown bool } // Get implements Reader. @@ -60,7 +76,7 @@ func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie return err } - started, cache, err := ic.Informers.Get(ctx, gvk, out) + started, cache, err := ic.getInformerForKind(ctx, gvk, out) if err != nil { return err } @@ -78,7 +94,7 @@ func (ic *informerCache) List(ctx context.Context, out client.ObjectList, opts . return err } - started, cache, err := ic.Informers.Get(ctx, *gvk, cacheTypeObj) + started, cache, err := ic.getInformerForKind(ctx, *gvk, cacheTypeObj) if err != nil { return err } @@ -124,7 +140,7 @@ func (ic *informerCache) objectTypeForListObject(list client.ObjectList) (*schem return &gvk, cacheTypeObj, nil } -// GetInformerForKind returns the informer for the GroupVersionKind. +// GetInformerForKind returns the informer for the GroupVersionKind. If no informer exists, one will be started. func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) { // Map the gvk to an object obj, err := ic.scheme.New(gvk) @@ -139,7 +155,7 @@ func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou return i.Informer, nil } -// GetInformer returns the informer for the obj. +// GetInformer returns the informer for the obj. If no informer exists, one will be started. func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) { gvk, err := apiutil.GVKForObject(obj, ic.scheme) if err != nil { @@ -153,6 +169,18 @@ func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (In return i.Informer, nil } +func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *internal.Cache, error) { + if ic.failOnUnknown { + cache, started, ok := ic.Informers.Peek(gvk, obj) + if !ok { + return false, nil, &ErrResourceNotCached{GVK: gvk} + } + return started, cache, nil + } + + return ic.Informers.Get(ctx, gvk, obj) +} + // NeedLeaderElection implements the LeaderElectionRunnable interface // to indicate that this can be started without requiring the leader lock. func (ic *informerCache) NeedLeaderElection() bool { diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index ea72a5bf43..66e5e9ef72 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -230,7 +230,8 @@ func (ip *Informers) WaitForCacheSync(ctx context.Context) bool { return cache.WaitForCacheSync(ctx.Done(), ip.getHasSyncedFuncs()...) } -func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res *Cache, started bool, ok bool) { +// Peek attempts to get the informer for the GVK, but does not start one if one does not exist. +func (ip *Informers) Peek(gvk schema.GroupVersionKind, obj runtime.Object) (res *Cache, started bool, ok bool) { ip.mu.RLock() defer ip.mu.RUnlock() i, ok := ip.informersByType(obj)[gvk] @@ -241,7 +242,7 @@ func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res * // the Informer from the map. func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *Cache, error) { // Return the informer if it is found - i, started, ok := ip.get(gvk, obj) + i, started, ok := ip.Peek(gvk, obj) if !ok { var err error if i, started, err = ip.addInformerToMap(gvk, obj); err != nil { diff --git a/pkg/client/client.go b/pkg/client/client.go index 0d8b9fbe18..2fb0acb7b3 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -77,10 +77,12 @@ type CacheOptions struct { // Reader is a cache-backed reader that will be used to read objects from the cache. // +required Reader Reader - // DisableFor is a list of objects that should not be read from the cache. + // DisableFor is a list of objects that should never be read from the cache. + // Objects configured here always result in a live lookup. DisableFor []Object // Unstructured is a flag that indicates whether the cache-backed client should // read unstructured objects or lists from the cache. + // If false, unstructured objects will always result in a live lookup. Unstructured bool } @@ -342,9 +344,11 @@ func (c *client) Get(ctx context.Context, key ObjectKey, obj Object, opts ...Get if isUncached, err := c.shouldBypassCache(obj); err != nil { return err } else if !isUncached { + // Attempt to get from the cache. return c.cache.Get(ctx, key, obj, opts...) } + // Perform a live lookup. switch obj.(type) { case runtime.Unstructured: return c.unstructuredClient.Get(ctx, key, obj, opts...) @@ -362,9 +366,11 @@ func (c *client) List(ctx context.Context, obj ObjectList, opts ...ListOption) e if isUncached, err := c.shouldBypassCache(obj); err != nil { return err } else if !isUncached { + // Attempt to get from the cache. return c.cache.List(ctx, obj, opts...) } + // Perform a live lookup. switch x := obj.(type) { case runtime.Unstructured: return c.unstructuredClient.List(ctx, obj, opts...) diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index d8b3995050..d5b3de0bc3 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -19,6 +19,7 @@ package client_test import ( "context" "encoding/json" + "errors" "fmt" "reflect" "sync/atomic" @@ -43,6 +44,7 @@ import ( "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/examples/crd/pkg" + "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -143,6 +145,7 @@ var _ = Describe("Client", func() { var count uint64 = 0 var replicaCount int32 = 2 var ns = "default" + var errNotCached *cache.ErrResourceNotCached ctx := context.TODO() BeforeEach(func() { @@ -278,6 +281,16 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC Expect(cache.Called).To(Equal(2)) }) + It("should propagate cache unknown resources errors", func() { + c := &fakeUncachedReader{} + cl, err := client.New(cfg, client.Options{Cache: &client.CacheOptions{Reader: c}}) + Expect(err).NotTo(HaveOccurred()) + Expect(cl).NotTo(BeNil()) + Expect(errors.As(cl.Get(ctx, client.ObjectKey{Name: "test"}, &appsv1.Deployment{}), &errNotCached)).To(BeTrue()) + Expect(errors.As(cl.List(ctx, &appsv1.DeploymentList{}), &errNotCached)).To(BeTrue()) + Expect(c.Called).To(Equal(2)) + }) + It("should not use the provided reader cache if provided, on get and list for uncached GVKs", func() { cache := &fakeReader{} cl, err := client.New(cfg, client.Options{Cache: &client.CacheOptions{Reader: cache, DisableFor: []client.Object{&corev1.Namespace{}}}}) @@ -3938,6 +3951,20 @@ func (f *fakeReader) List(ctx context.Context, list client.ObjectList, opts ...c return nil } +type fakeUncachedReader struct { + Called int +} + +func (f *fakeUncachedReader) Get(_ context.Context, _ client.ObjectKey, _ client.Object, opts ...client.GetOption) error { + f.Called++ + return &cache.ErrResourceNotCached{} +} + +func (f *fakeUncachedReader) List(_ context.Context, _ client.ObjectList, _ ...client.ListOption) error { + f.Called++ + return &cache.ErrResourceNotCached{} +} + func ptr[T any](to T) *T { return &to } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 49ed0b4033..0663608bc0 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -600,6 +600,16 @@ var _ = Describe("manger.Manager", func() { cancel() }) + It("should should be able to create a manager with a cache to fail on unknown resources", func() { + m, err := New(cfg, Options{ + Cache: cache.Options{ + FailOnUnknownResource: true, + }, + }) + Expect(m).ToNot(BeNil()) + Expect(err).ToNot(HaveOccurred()) + }) + It("should return an error if the metrics bind address is already in use", func() { ln, err := net.Listen("tcp", ":0") //nolint:gosec Expect(err).ShouldNot(HaveOccurred())