diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go
index f01de43810..ccb02b3a80 100644
--- a/pkg/cache/cache.go
+++ b/pkg/cache/cache.go
@@ -166,8 +166,31 @@ type Options struct {
 	//
 	// This is a global setting for all objects, and can be overridden by the ByObject setting.
 	UnsafeDisableDeepCopy *bool
+
+	// UnknownResourcePolicy determines how the cache should behave when no informers are started for the
+	// resource type a client is asking for in a Get or a List. See the UnknownResourcePolicy constants
+	// for documentation of each policy. The default policy is UnknownResourcePolicyBackfill.
+	UnknownResourcePolicy UnknownResourcePolicy
 }
 
+// UnknownResourcePolicy determines how the cache should behave when no informers are started for the
+// resource type a client is asking for in a Get or a List.
+type UnknownResourcePolicy string
+
+const (
+	// UnknownResourcePolicyBackfill configures the cache to spin up an informer for a resource when
+	// the first request for that resource comes in. This means that a Get or a List may take
+	// longer than normal to succeed on the first invocation as the cache waits until it's fully
+	// back-filled.
+	// This is the default policy.
+	UnknownResourcePolicyBackfill UnknownResourcePolicy = "backfill"
+
+	// UnknownResourcePolicyFail configures the cache to return an ErrResourceNotCached error when a user
+	// requests a resource the cache is not configured to hold. This error is distinct from an
+	// errors.NotFound.
+	UnknownResourcePolicyFail UnknownResourcePolicy = "fail"
+)
+
 // ByObject offers more fine-grained control over the cache's ListWatch by object.
 type ByObject struct {
 	// Label represents a label selector for the object.
@@ -232,6 +255,7 @@ func New(config *rest.Config, opts Options) (Cache, error) {
 			Namespace:    opts.Namespaces[0],
 			ByGVK:        byGVK,
 		}),
+		missPolicy: opts.UnknownResourcePolicy,
 	}, nil
 }
 
@@ -267,6 +291,11 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
 	if opts.SyncPeriod == nil {
 		opts.SyncPeriod = &defaultSyncPeriod
 	}
+
+	// Backfill by default
+	if opts.UnknownResourcePolicy == "" {
+		opts.UnknownResourcePolicy = UnknownResourcePolicyBackfill
+	}
 	return opts, nil
 }
 
diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go
index 100825854a..65ada13af8 100644
--- a/pkg/cache/cache_test.go
+++ b/pkg/cache/cache_test.go
@@ -18,6 +18,7 @@ package cache_test
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"reflect"
 	"sort"
@@ -39,6 +40,7 @@ import (
 	"k8s.io/client-go/rest"
 	kcache "k8s.io/client-go/tools/cache"
 	"k8s.io/utils/pointer"
+	cacheerrors "sigs.k8s.io/controller-runtime/pkg/cache/errors"
 
 	"sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -117,6 +119,11 @@ func deletePod(pod client.Object) {
 var _ = Describe("Informer Cache", func() {
 	CacheTest(cache.New, cache.Options{})
 })
+
+var _ = Describe("Informer Cache with miss policy = fail", func() {
+	CacheTestMissPolicyFail(cache.New, cache.Options{UnknownResourcePolicy: cache.UnknownResourcePolicyFail})
+})
+
 var _ = Describe("Multi-Namespace Informer Cache", func() {
 	CacheTest(cache.MultiNamespacedCacheBuilder([]string{testNamespaceOne, testNamespaceTwo, "default"}), cache.Options{})
 })
@@ -414,6 +421,72 @@ var _ = Describe("Cache with selectors", func() {
 	})
 })
 
+// CacheTestMissPolicyFail registers specs asserting that a cache built by createCacheFunc with opts
+// returns ErrResourceNotCached from Get and List until an informer for the kind has been requested.
+func CacheTestMissPolicyFail(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error), opts cache.Options) {
+	Describe("Cache test with UnknownResourcePolicy = fail", func() {
+		var (
+			informerCache       cache.Cache
+			informerCacheCtx    context.Context
+			informerCacheCancel context.CancelFunc
+			errNotCached        *cacheerrors.ErrResourceNotCached
+		)
+
+		BeforeEach(func() {
+			informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background())
+			Expect(cfg).NotTo(BeNil())
+
+			By("creating the informer cache")
+			var err error
+			informerCache, err = createCacheFunc(cfg, opts)
+			Expect(err).NotTo(HaveOccurred())
+			By("running the cache and waiting for it to sync")
+			// pass as an arg so that we don't race between close and re-assign
+			go func(ctx context.Context) {
+				defer GinkgoRecover()
+				Expect(informerCache.Start(ctx)).To(Succeed())
+			}(informerCacheCtx)
+			Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue())
+		})
+
+		AfterEach(func() {
+			informerCacheCancel()
+		})
+
+		Describe("as a Reader", func() {
+			Context("with structured objects", func() {
+				It("should not be able to list objects that haven't been watched previously", func() {
+					By("listing all services in the cluster")
+					listObj := &corev1.ServiceList{}
+					Expect(errors.As(informerCache.List(context.Background(), listObj), &errNotCached)).To(BeTrue())
+				})
+
+				It("should not be able to get objects that haven't been watched previously", func() {
+					By("getting the Kubernetes service")
+					svc := &corev1.Service{}
+					svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"}
+					Expect(errors.As(informerCache.Get(context.Background(), svcKey, svc), &errNotCached)).To(BeTrue())
+				})
+
+				It("should be able to get objects that are configured to be watched", func() {
+					By("indicating that we need to watch services")
+					_, err := informerCache.GetInformer(context.Background(), &corev1.Service{})
+					Expect(err).ToNot(HaveOccurred())
+
+					By("getting the Kubernetes service")
+					svc := &corev1.Service{}
+					svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"}
+					Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed())
+
+					By("verifying that the returned service looks reasonable")
+					Expect(svc.Name).To(Equal("kubernetes"))
+					Expect(svc.Namespace).To(Equal("default"))
+				})
+			})
+		})
+	})
+}
+
 func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error), opts cache.Options) {
 	Describe("Cache test", func() {
 		var (
diff --git a/pkg/cache/errors/errors.go b/pkg/cache/errors/errors.go
new file mode 100644
index 0000000000..2519d014a2
--- /dev/null
+++ b/pkg/cache/errors/errors.go
@@ -0,0 +1,20 @@
+package errors
+
+import (
+	"fmt"
+
+	"k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+// ErrResourceNotCached indicates that the resource type the client asked the cache for is not cached, and
+// the cache is configured to fail on a cache miss.
+type ErrResourceNotCached struct {
+	GVK schema.GroupVersionKind
+}
+
+// Error returns the error
+func (r ErrResourceNotCached) Error() string {
+	return fmt.Sprintf("%s is not cached and the cache is configured to fail on miss", r.GVK.String())
+}
+
+var _ error = (*ErrResourceNotCached)(nil)
diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go
index 771244d52a..cf3b673831 100644
--- a/pkg/cache/informer_cache.go
+++ b/pkg/cache/informer_cache.go
@@ -27,6 +27,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/tools/cache"
+	"sigs.k8s.io/controller-runtime/pkg/cache/errors"
 	"sigs.k8s.io/controller-runtime/pkg/cache/internal"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
@@ -50,6 +51,7 @@ func (*ErrCacheNotStarted) Error() string {
 type informerCache struct {
 	scheme *runtime.Scheme
 	*internal.Informers
+	missPolicy UnknownResourcePolicy
 }
 
 // Get implements Reader.
@@ -59,7 +61,7 @@ func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie return err } - started, cache, err := ic.Informers.Get(ctx, gvk, out) + started, cache, err := ic.getInformerForKind(ctx, gvk, out) if err != nil { return err } @@ -77,7 +79,7 @@ func (ic *informerCache) List(ctx context.Context, out client.ObjectList, opts . return err } - started, cache, err := ic.Informers.Get(ctx, *gvk, cacheTypeObj) + started, cache, err := ic.getInformerForKind(ctx, *gvk, cacheTypeObj) if err != nil { return err } @@ -123,14 +125,13 @@ func (ic *informerCache) objectTypeForListObject(list client.ObjectList) (*schem return &gvk, cacheTypeObj, nil } -// GetInformerForKind returns the informer for the GroupVersionKind. +// GetInformerForKind returns the informer for the GroupVersionKind. If no informer exists, one will be started. func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) { // Map the gvk to an object obj, err := ic.scheme.New(gvk) if err != nil { return nil, err } - _, i, err := ic.Informers.Get(ctx, gvk, obj) if err != nil { return nil, err @@ -138,7 +139,7 @@ func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou return i.Informer, err } -// GetInformer returns the informer for the obj. +// GetInformer returns the informer for the obj. If no informer exists, one will be started. 
func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) {
 	gvk, err := apiutil.GVKForObject(obj, ic.scheme)
 	if err != nil {
@@ -152,6 +153,27 @@ func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (In
 	return i.Informer, err
 }
 
+// getInformerForKind returns the informer-backed cache for gvk according to the configured
+// UnknownResourcePolicy. With UnknownResourcePolicyFail it only peeks at the existing informers and
+// returns ErrResourceNotCached when there is none; with UnknownResourcePolicyBackfill it delegates
+// to Informers.Get. Any other value is reported as an error rather than a panic, because the policy
+// is copied from user-supplied Options.
+func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *internal.Cache, error) {
+	switch ic.missPolicy {
+	case UnknownResourcePolicyFail:
+		cache, started, ok := ic.Informers.Peek(gvk, obj)
+		if !ok {
+			return false, nil, &errors.ErrResourceNotCached{GVK: gvk}
+		}
+		return started, cache, nil
+	case UnknownResourcePolicyBackfill, "":
+		// The empty policy is the documented default, so treat it as backfill.
+		return ic.Informers.Get(ctx, gvk, obj)
+	default:
+		return false, nil, fmt.Errorf("invalid cache miss policy %q", ic.missPolicy)
+	}
+}
+
 // NeedLeaderElection implements the LeaderElectionRunnable interface
 // to indicate that this can be started without requiring the leader lock.
 func (ic *informerCache) NeedLeaderElection() bool {
diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go
index 09e0111114..63e4ed94e2 100644
--- a/pkg/cache/internal/informers.go
+++ b/pkg/cache/internal/informers.go
@@ -271,7 +271,8 @@ func (ip *Informers) WaitForCacheSync(ctx context.Context) bool {
 	return cache.WaitForCacheSync(ctx.Done(), ip.getHasSyncedFuncs()...)
 }
 
-func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res *Cache, started bool, ok bool) {
+// Peek attempts to get the informer for the GVK, but does not start one if one does not exist.
+func (ip *Informers) Peek(gvk schema.GroupVersionKind, obj runtime.Object) (res *Cache, started bool, ok bool) {
 	ip.mu.RLock()
 	defer ip.mu.RUnlock()
 	i, ok := ip.informersByType(obj)[gvk]
@@ -282,7 +283,7 @@ func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res *
 // the Informer from the map.
func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *Cache, error) {
 	// Return the informer if it is found
-	i, started, ok := ip.get(gvk, obj)
+	i, started, ok := ip.Peek(gvk, obj)
 	if !ok {
 		var err error
 		if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
diff --git a/pkg/client/client.go b/pkg/client/client.go
index 21067b6f8f..704789e942 100644
--- a/pkg/client/client.go
+++ b/pkg/client/client.go
@@ -31,6 +31,7 @@ import (
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/metadata"
 	"k8s.io/client-go/rest"
+	cacheerrors "sigs.k8s.io/controller-runtime/pkg/cache/errors"
 
 	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
 	"sigs.k8s.io/controller-runtime/pkg/log"
@@ -77,12 +78,34 @@ type CacheOptions struct {
 	// Reader is a cache-backed reader that will be used to read objects from the cache.
 	// +required
 	Reader Reader
-	// DisableFor is a list of objects that should not be read from the cache.
+	// DisableFor is a list of objects that should not be read from the cache. See the
+	// UncachedResourcePolicy for what the client will do when encountering requests for
+	// these kinds.
 	DisableFor []Object
 	// Unstructured is a flag that indicates whether the cache-backed client should
 	// read unstructured objects or lists from the cache.
 	Unstructured bool
-}
+	// UncachedResourcePolicy determines how the client should behave when the underlying cache is
+	// configured to fail when encountering an unknown resource type or when the user asks the client
+	// for resources in the DisableFor set. See the UncachedResourcePolicy constants for documentation
+	// of each policy. The default policy is UncachedResourcePolicyLiveLookup.
+	UncachedResourcePolicy UncachedResourcePolicy
+}
+
+// UncachedResourcePolicy determines how the client should behave when the underlying cache is
+// configured to fail on a miss.
+type UncachedResourcePolicy string
+
+const (
+	// UncachedResourcePolicyLiveLookup configures the client to issue a live client lookup to get data
+	// for a resource that is not cached.
+	// This is the default policy.
+	UncachedResourcePolicyLiveLookup UncachedResourcePolicy = "live-lookup"
+
+	// UncachedResourcePolicyFail configures the client to return an ErrResourceNotCached error when a
+	// user requests a resource the cache is not configured to hold.
+	UncachedResourcePolicyFail UncachedResourcePolicy = "fail"
+)
 
 // NewClientFunc allows a user to define how to create a client.
 type NewClientFunc func(config *rest.Config, options Options) (Client, error)
@@ -185,9 +208,17 @@ func newClient(config *rest.Config, options Options) (*client, error) {
 		return c, nil
 	}
 
+	// Default to live lookups when missing in the cache. Work on a local copy: options.Cache is a
+	// pointer, so assigning the default through it would mutate the caller's CacheOptions.
+	uncachedResourcePolicy := options.Cache.UncachedResourcePolicy
+	if uncachedResourcePolicy == "" {
+		uncachedResourcePolicy = UncachedResourcePolicyLiveLookup
+	}
+
 	// We want a cache if we're here.
 	// Set the cache.
 	c.cache = options.Cache.Reader
+	c.uncachedResourcePolicy = uncachedResourcePolicy
 
 	// Load uncached GVKs.
c.cacheUnstructured = options.Cache.Unstructured
@@ -213,9 +242,10 @@ type client struct {
 	scheme *runtime.Scheme
 	mapper meta.RESTMapper
 
-	cache             Reader
-	uncachedGVKs      map[schema.GroupVersionKind]struct{}
-	cacheUnstructured bool
+	cache                  Reader
+	uncachedResourcePolicy UncachedResourcePolicy
+	uncachedGVKs           map[schema.GroupVersionKind]struct{}
+	cacheUnstructured      bool
 }
 
 func (c *client) shouldBypassCache(obj runtime.Object) (bool, error) {
@@ -233,15 +263,25 @@ func (c *client) shouldBypassCache(obj runtime.Object) (bool, error) {
 		gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
 	}
 	if _, isUncached := c.uncachedGVKs[gvk]; isUncached {
-		return true, nil
+		return true, c.shouldBypassCacheErr(gvk, true)
 	}
 	if !c.cacheUnstructured {
 		_, isUnstructured := obj.(runtime.Unstructured)
-		return isUnstructured, nil
+		return isUnstructured, c.shouldBypassCacheErr(gvk, isUnstructured)
 	}
 	return false, nil
 }
 
+// shouldBypassCacheErr returns the error that accompanies a cache-bypass decision for gvk:
+// ErrResourceNotCached when bypass is true and the client is configured with
+// UncachedResourcePolicyFail, nil otherwise.
+func (c *client) shouldBypassCacheErr(gvk schema.GroupVersionKind, bypass bool) error {
+	if !bypass || c.uncachedResourcePolicy != UncachedResourcePolicyFail {
+		return nil
+	}
+	return &cacheerrors.ErrResourceNotCached{GVK: gvk}
+}
+
 // resetGroupVersionKind is a helper function to restore and preserve GroupVersionKind on an object.
 func (c *client) resetGroupVersionKind(obj runtime.Object, gvk schema.GroupVersionKind) {
 	if gvk != schema.EmptyObjectKind.GroupVersionKind() {
@@ -338,7 +376,9 @@ func (c *client) Get(ctx context.Context, key ObjectKey, obj Object, opts ...Get
 	if isUncached, err := c.shouldBypassCache(obj); err != nil {
 		return err
 	} else if !isUncached {
-		return c.cache.Get(ctx, key, obj, opts...)
+		if err := c.cache.Get(ctx, key, obj, opts...); c.shouldReturnCacheErr(err) {
+			return err
+		}
 	}
 
 	switch obj.(type) {
@@ -353,12 +393,26 @@ func (c *client) Get(ctx context.Context, key ObjectKey, obj Object, opts ...Get
 	}
 }
 
+// shouldReturnCacheErr reports whether the result of a cache read must be handed back to the caller
+// as is. That is always the case unless the client is configured for live lookups and the cache
+// answered with ErrResourceNotCached. A nil err also yields true, which is how a successful cache
+// read is returned from Get and List.
+func (c *client) shouldReturnCacheErr(err error) bool {
+	if c.uncachedResourcePolicy != UncachedResourcePolicyLiveLookup {
+		return true
+	}
+	var notCached *cacheerrors.ErrResourceNotCached
+	return !errors.As(err, &notCached)
+}
+
 // List implements client.Client.
 func (c *client) List(ctx context.Context, obj ObjectList, opts ...ListOption) error {
 	if isUncached, err := c.shouldBypassCache(obj); err != nil {
 		return err
 	} else if !isUncached {
-		return c.cache.List(ctx, obj, opts...)
+		if err := c.cache.List(ctx, obj, opts...); c.shouldReturnCacheErr(err) {
+			return err
+		}
 	}
 
 	switch x := obj.(type) {
diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go
index d8b3995050..e242013950 100644
--- a/pkg/client/client_test.go
+++ b/pkg/client/client_test.go
@@ -19,6 +19,7 @@ package client_test
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"reflect"
 	"sync/atomic"
@@ -41,6 +42,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	kscheme "k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/utils/pointer"
+	cacheerrors "sigs.k8s.io/controller-runtime/pkg/cache/errors"
 
 	"sigs.k8s.io/controller-runtime/examples/crd/pkg"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -143,6 +145,7 @@ var _ = Describe("Client", func() {
 	var count uint64 = 0
 	var replicaCount int32 = 2
 	var ns = "default"
+	var errNotCached *cacheerrors.ErrResourceNotCached
 	ctx := context.TODO()
 
 	BeforeEach(func() {
@@ -278,6 +281,50 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC
 			Expect(cache.Called).To(Equal(2))
 		})
 
+		It("should use the provided reader cache if provided, but fail when configured to on cache misses for get and list", func() {
+			c := &fakeUncachedReader{}
+			cl, err := client.New(cfg, client.Options{Cache: &client.CacheOptions{Reader: c, UncachedResourcePolicy: client.UncachedResourcePolicyFail}})
+			Expect(err).NotTo(HaveOccurred())
+			Expect(cl).NotTo(BeNil())
+			Expect(errors.As(cl.Get(ctx, client.ObjectKey{Name: "test"}, &appsv1.Deployment{}), &errNotCached)).To(BeTrue())
+			Expect(errors.As(cl.List(ctx, &appsv1.DeploymentList{}), &errNotCached)).To(BeTrue())
+			Expect(c.Called).To(Equal(2))
+		})
+
+		It("should use the provided reader cache if provided, but live lookup when configured to on cache misses for get and list", func() {
+			cache := &fakeUncachedReader{}
+			cl, err := client.New(cfg, client.Options{Cache: &client.CacheOptions{Reader: cache, UncachedResourcePolicy: client.UncachedResourcePolicyLiveLookup}})
+			Expect(err).NotTo(HaveOccurred())
+			Expect(cl).NotTo(BeNil())
+
+			By("creating the object")
+			err = cl.Create(context.TODO(), dep)
+			Expect(err).NotTo(HaveOccurred())
+
+			By("getting the object with a get")
+			var actual appsv1.Deployment
+			Expect(cl.Get(ctx, client.ObjectKey{Name: dep.Name, Namespace: dep.Namespace}, &actual)).To(Succeed())
+			Expect(dep).To(Equal(&actual))
+
+			By("getting the object with a list")
+			var list appsv1.DeploymentList
+			Expect(cl.List(ctx, &list)).To(Succeed())
+			Expect(list.Items).To(HaveLen(1))
+			Expect(dep).To(Equal(&list.Items[0]))
+
+			Expect(cache.Called).To(Equal(2))
+		})
+
+		It("should use the provided reader cache if provided, but forward cache errors even if configured to live lookup on cache misses for get and list", func() {
+			c := &fakeErrorReader{}
+			cl, err := client.New(cfg, client.Options{Cache: &client.CacheOptions{Reader: c, UncachedResourcePolicy: client.UncachedResourcePolicyLiveLookup}})
+			Expect(err).NotTo(HaveOccurred())
+			Expect(cl).NotTo(BeNil())
+			Expect(cl.Get(ctx, client.ObjectKey{Name: "test"}, &appsv1.Deployment{})).To(MatchError(ContainSubstring("sentinel")))
+			Expect(cl.List(ctx, &appsv1.DeploymentList{})).To(MatchError(ContainSubstring("sentinel")))
+			Expect(c.Called).To(Equal(2))
+		})
+
 		It("should not use the provided reader cache if provided, on get and list for uncached GVKs", func() {
 			cache := &fakeReader{}
 			cl, err := client.New(cfg, client.Options{Cache: &client.CacheOptions{Reader: cache, DisableFor: []client.Object{&corev1.Namespace{}}}})
@@ -3938,6 +3985,36 @@ func (f *fakeReader) List(ctx context.Context, list client.ObjectList, opts ...c
 	return nil
 }
 
+// fakeUncachedReader is a reader whose Get and List count the call and always return ErrResourceNotCached.
+type fakeUncachedReader struct {
+	Called int
+}
+
+func (f *fakeUncachedReader) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
+	f.Called++
+	return &cacheerrors.ErrResourceNotCached{}
+}
+
+func (f *fakeUncachedReader) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
+	f.Called++
+	return &cacheerrors.ErrResourceNotCached{}
+}
+
+// fakeErrorReader is a reader whose Get and List count the call and always return a "sentinel" error.
+type fakeErrorReader struct {
+	Called int
+}
+
+func (f *fakeErrorReader) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
+	f.Called++
+	return errors.New("sentinel")
+}
+
+func (f *fakeErrorReader) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
+	f.Called++
+	return errors.New("sentinel")
+}
+
 func ptr[T any](to T) *T {
 	return &to
 }
diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go
index 7e65ef0c3a..ec72ef30a7 100644
--- a/pkg/manager/manager.go
+++ b/pkg/manager/manager.go
@@ -19,6 +19,7 @@ package manager
 import (
 	"context"
 	"crypto/tls"
+	"errors"
 	"fmt"
 	"net"
 	"net/http"
@@ -394,6 +395,10 @@ func New(config *rest.Config, options Options) (Manager, error) {
 	// Set default values for options fields
 	options = setOptionsDefaults(options)
 
+	if err := validateOptions(options); err != nil {
+		return nil, err
+	}
+
 	cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
 		clusterOptions.Scheme = options.Scheme
clusterOptions.MapperProvider = options.MapperProvider
@@ -744,3 +749,22 @@ func setOptionsDefaults(options Options) Options {
 
 	return options
 }
+
+// validateOptions rejects option combinations that cannot take effect. It returns an error when
+// Client.Cache.UncachedResourcePolicy is set explicitly although nothing is excluded from the cache,
+// i.e. Client.Cache.DisableFor is empty and Cache.UnknownResourcePolicy is not
+// cache.UnknownResourcePolicyFail. It returns nil in every other case.
+func validateOptions(options Options) error {
+	clientCache := options.Client.Cache
+	if clientCache == nil || clientCache.UncachedResourcePolicy == "" {
+		// The client was not explicitly configured to handle uncached resources.
+		return nil
+	}
+
+	cacheExcludesResources := len(clientCache.DisableFor) > 0 ||
+		options.Cache.UnknownResourcePolicy == cache.UnknownResourcePolicyFail
+	if !cacheExcludesResources {
+		return errors.New("you've configured the client to handle uncached resource kinds, but have not configured the cache to exclude any resources")
+	}
+	return nil
+}