Skip to content

Commit

Permalink
pkg/{cache,client}: add options for cache miss policy
Browse files Browse the repository at this point in the history
This commit allows users to opt out of the "start informers in the
background" behavior that the current cache implementation uses.
Additionally, when opting out of this behavior, the client can be
configured to do a live lookup on a cache miss. The default behaviors
are:

  pkg/cache: backfill data on a miss (today's default, unchanged)
  pkg/client: live lookup when cache is configured to miss

Signed-off-by: Steve Kuznetsov <skuznets@redhat.com>
  • Loading branch information
stevekuznetsov committed Jul 14, 2023
1 parent 56a973c commit 9cf53b5
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 9 deletions.
29 changes: 29 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,31 @@ type Options struct {
//
// This is a global setting for all objects, and can be overridden by the ByObject setting.
UnsafeDisableDeepCopy *bool

// MissPolicy determines how the cache should behave when no informers are started for the
// resource type a client is asking for in a Get or a List. See the CacheMissPolicies for
// documentation for each policy. The default policy is MissPolicyBackfill.
MissPolicy MissPolicy
}

// MissPolicy determines how the cache should behave when no informers are started for the
// resource type a client is asking for in a Get or a List.
type MissPolicy string

const (
// MissPolicyBackfill configures the cache to spin up an informer for a resource when
// the first request for that resource comes in. This means that a Get or a List may take
// longer than normal to succeed on the first invocation as the cache waits until it's fully
// back-filled.
// This is the default policy.
MissPolicyBackfill MissPolicy = "backfill"

// MissPolicyFail configures the cache to return a ErrResourceNotCached error when a user
// requests a resource the cache is not configured to hold. This error is distinct from an
// errors.NotFound.
MissPolicyFail MissPolicy = "fail"
)

// ByObject offers more fine-grained control over the cache's ListWatch by object.
type ByObject struct {
// Label represents a label selector for the object.
Expand Down Expand Up @@ -232,6 +255,7 @@ func New(config *rest.Config, opts Options) (Cache, error) {
Namespace: opts.Namespaces[0],
ByGVK: byGVK,
}),
missPolicy: opts.MissPolicy,
}, nil
}

Expand Down Expand Up @@ -267,6 +291,11 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
if opts.SyncPeriod == nil {
opts.SyncPeriod = &defaultSyncPeriod
}

// Backfill by default
if opts.MissPolicy == "" {
opts.MissPolicy = MissPolicyBackfill
}
return opts, nil
}

Expand Down
57 changes: 57 additions & 0 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cache_test

import (
"context"
"errors"
"fmt"
"reflect"
"sort"
Expand All @@ -39,6 +40,7 @@ import (
"k8s.io/client-go/rest"
kcache "k8s.io/client-go/tools/cache"
"k8s.io/utils/pointer"
cacheerrors "sigs.k8s.io/controller-runtime/pkg/cache/errors"

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -117,6 +119,11 @@ func deletePod(pod client.Object) {
var _ = Describe("Informer Cache", func() {
CacheTest(cache.New, cache.Options{})
})

var _ = Describe("Informer Cache with miss policy = fail", func() {
CacheTestMissPolicyFail(cache.New, cache.Options{MissPolicy: cache.MissPolicyFail})
})

var _ = Describe("Multi-Namespace Informer Cache", func() {
CacheTest(cache.MultiNamespacedCacheBuilder([]string{testNamespaceOne, testNamespaceTwo, "default"}), cache.Options{})
})
Expand Down Expand Up @@ -414,6 +421,56 @@ var _ = Describe("Cache with selectors", func() {
})
})

func CacheTestMissPolicyFail(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error), opts cache.Options) {
Describe("Cache test with MissPolicy = fail", func() {
var (
informerCache cache.Cache
informerCacheCtx context.Context
informerCacheCancel context.CancelFunc
errNotCached *cacheerrors.ErrResourceNotCached
)

BeforeEach(func() {
informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background())
Expect(cfg).NotTo(BeNil())

By("creating the informer cache")
var err error
informerCache, err = createCacheFunc(cfg, opts)
Expect(err).NotTo(HaveOccurred())
By("running the cache and waiting for it to sync")
// pass as an arg so that we don't race between close and re-assign
go func(ctx context.Context) {
defer GinkgoRecover()
Expect(informerCache.Start(ctx)).To(Succeed())
}(informerCacheCtx)
Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue())
})

AfterEach(func() {
informerCacheCancel()
})

Describe("as a Reader", func() {
Context("with structured objects", func() {
It("should not be able to list objects that haven't been watched previously", func() {
By("listing all services in the cluster")
listObj := &corev1.ServiceList{}
fmt.Printf("%#v\n", informerCache.List(context.Background(), listObj))
Expect(errors.As(informerCache.List(context.Background(), listObj), &errNotCached)).To(BeTrue())
})

It("should not be able to get objects that haven't been watched previously", func() {
By("getting the Kubernetes service")
svc := &corev1.Service{}
svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"}
Expect(errors.As(informerCache.Get(context.Background(), svcKey, svc), &errNotCached)).To(BeTrue())
})
})
})
})
}

func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error), opts cache.Options) {
Describe("Cache test", func() {
var (
Expand Down
20 changes: 20 additions & 0 deletions pkg/cache/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package errors

import (
"fmt"

"k8s.io/apimachinery/pkg/runtime/schema"
)

// ErrResourceNotCached indicates that the resource type the client asked the cache for is not cached, and
// the cache is configured to fail on a cache miss.
type ErrResourceNotCached struct {
GVK schema.GroupVersionKind
}

// Error returns the error
func (r ErrResourceNotCached) Error() string {
return fmt.Sprintf("%s is not cached and the cache is configured to fail on miss", r.GVK.String())
}

var _ error = (*ErrResourceNotCached)(nil)
30 changes: 25 additions & 5 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/errors"
"sigs.k8s.io/controller-runtime/pkg/cache/internal"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand All @@ -50,6 +51,7 @@ func (*ErrCacheNotStarted) Error() string {
type informerCache struct {
scheme *runtime.Scheme
*internal.Informers
missPolicy MissPolicy
}

// Get implements Reader.
Expand All @@ -59,7 +61,7 @@ func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie
return err
}

started, cache, err := ic.Informers.Get(ctx, gvk, out)
started, cache, err := ic.getInformerForKind(ctx, gvk, out)
if err != nil {
return err
}
Expand All @@ -77,7 +79,7 @@ func (ic *informerCache) List(ctx context.Context, out client.ObjectList, opts .
return err
}

started, cache, err := ic.Informers.Get(ctx, *gvk, cacheTypeObj)
started, cache, err := ic.getInformerForKind(ctx, *gvk, cacheTypeObj)
if err != nil {
return err
}
Expand Down Expand Up @@ -123,22 +125,21 @@ func (ic *informerCache) objectTypeForListObject(list client.ObjectList) (*schem
return &gvk, cacheTypeObj, nil
}

// GetInformerForKind returns the informer for the GroupVersionKind.
// GetInformerForKind returns the informer for the GroupVersionKind. If no informer exists, one will be started.
func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
// Map the gvk to an object
obj, err := ic.scheme.New(gvk)
if err != nil {
return nil, err
}

_, i, err := ic.Informers.Get(ctx, gvk, obj)
if err != nil {
return nil, err
}
return i.Informer, err
}

// GetInformer returns the informer for the obj.
// GetInformer returns the informer for the obj. If no informer exists, one will be started.
func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) {
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
if err != nil {
Expand All @@ -152,6 +153,25 @@ func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (In
return i.Informer, err
}

func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *internal.Cache, error) {
var cache *internal.Cache
var started bool
var err error
switch ic.missPolicy {
case MissPolicyFail:
var ok bool
cache, started, ok = ic.Informers.Peek(gvk, obj)
if !ok {
err = &errors.ErrResourceNotCached{GVK: gvk}
}
case MissPolicyBackfill:
fallthrough
default:
started, cache, err = ic.Informers.Get(ctx, gvk, obj)
}
return started, cache, err
}

// NeedLeaderElection implements the LeaderElectionRunnable interface
// to indicate that this can be started without requiring the leader lock.
func (ic *informerCache) NeedLeaderElection() bool {
Expand Down
5 changes: 3 additions & 2 deletions pkg/cache/internal/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ func (ip *Informers) WaitForCacheSync(ctx context.Context) bool {
return cache.WaitForCacheSync(ctx.Done(), ip.getHasSyncedFuncs()...)
}

func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res *Cache, started bool, ok bool) {
// Peek attempts to get the informer for the GVK, but does not start one if one does not exist.
func (ip *Informers) Peek(gvk schema.GroupVersionKind, obj runtime.Object) (res *Cache, started bool, ok bool) {
ip.mu.RLock()
defer ip.mu.RUnlock()
i, ok := ip.informersByType(obj)[gvk]
Expand All @@ -282,7 +283,7 @@ func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res *
// the Informer from the map.
func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *Cache, error) {
// Return the informer if it is found
i, started, ok := ip.get(gvk, obj)
i, started, ok := ip.Peek(gvk, obj)
if !ok {
var err error
if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
Expand Down
44 changes: 42 additions & 2 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/metadata"
"k8s.io/client-go/rest"
cacheerrors "sigs.k8s.io/controller-runtime/pkg/cache/errors"

"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -82,8 +83,27 @@ type CacheOptions struct {
// Unstructured is a flag that indicates whether the cache-backed client should
// read unstructured objects or lists from the cache.
Unstructured bool
// MissPolicy determines how the client should behave when the underlying cache is
// configured to fail on a miss. See the CacheMissPolicies for documentation for each
// policy. The default policy is CacheMissPolicyFail.
MissPolicy CacheMissPolicy
}

// CacheMissPolicy determines how the client should behave when the underlying cache is
// configured to fail on a miss.
type CacheMissPolicy string

const (
// CacheMissPolicyFail configures the client to return an errors.NotFound error when a user
// requests a resource the cache is not configured to hold.
// This is the default policy.
CacheMissPolicyFail CacheMissPolicy = "fail"

// CacheMissPolicyLiveLookup configures the client to issue a live client lookup to get data
// for a resource that is not cached.
CacheMissPolicyLiveLookup CacheMissPolicy = "live-lookup"
)

// NewClientFunc allows a user to define how to create a client.
type NewClientFunc func(config *rest.Config, options Options) (Client, error)

Expand Down Expand Up @@ -185,9 +205,15 @@ func newClient(config *rest.Config, options Options) (*client, error) {
return c, nil
}

// Default to failing when missing in the cache
if options.Cache.MissPolicy == "" {
options.Cache.MissPolicy = CacheMissPolicyFail
}

// We want a cache if we're here.
// Set the cache.
c.cache = options.Cache.Reader
c.cacheMissPolicy = options.Cache.MissPolicy

// Load uncached GVKs.
c.cacheUnstructured = options.Cache.Unstructured
Expand All @@ -214,6 +240,7 @@ type client struct {
mapper meta.RESTMapper

cache Reader
cacheMissPolicy CacheMissPolicy
uncachedGVKs map[schema.GroupVersionKind]struct{}
cacheUnstructured bool
}
Expand Down Expand Up @@ -338,7 +365,10 @@ func (c *client) Get(ctx context.Context, key ObjectKey, obj Object, opts ...Get
if isUncached, err := c.shouldBypassCache(obj); err != nil {
return err
} else if !isUncached {
return c.cache.Get(ctx, key, obj, opts...)
err := c.cache.Get(ctx, key, obj, opts...)
if c.shouldReturnCacheErr(err) {
return err
}
}

switch obj.(type) {
Expand All @@ -353,12 +383,22 @@ func (c *client) Get(ctx context.Context, key ObjectKey, obj Object, opts ...Get
}
}

// shouldReturnCacheErr determines if we should return the error we got from the cache, which we do in every case
// except for when we're configured to do live lookups and the cache returns cache.ErrResourceNotCached
func (c *client) shouldReturnCacheErr(err error) bool {
var errNotCached *cacheerrors.ErrResourceNotCached
return c.cacheMissPolicy != CacheMissPolicyLiveLookup || !errors.As(err, &errNotCached)
}

// List implements client.Client.
func (c *client) List(ctx context.Context, obj ObjectList, opts ...ListOption) error {
if isUncached, err := c.shouldBypassCache(obj); err != nil {
return err
} else if !isUncached {
return c.cache.List(ctx, obj, opts...)
err := c.cache.List(ctx, obj, opts...)
if c.shouldReturnCacheErr(err) {
return err
}
}

switch x := obj.(type) {
Expand Down
Loading

0 comments on commit 9cf53b5

Please sign in to comment.