Skip to content

Commit

Permalink
Merge pull request #2416 from sbueringer/pr-minor-cache
Browse files Browse the repository at this point in the history
⚠ Minor improvements to godoc, code style in cache pkg
  • Loading branch information
k8s-ci-robot committed Jul 24, 2023
2 parents b1d6919 + 806abef commit 0fd3354
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 163 deletions.
53 changes: 26 additions & 27 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ var (
// to receive events for Kubernetes objects (at a low-level),
// and add indices to fields on the objects stored in the cache.
type Cache interface {
// Cache acts as a client to objects stored in the cache.
// Reader acts as a client to objects stored in the cache.
client.Reader

// Cache loads informers and adds field indices.
// Informers loads informers and adds field indices.
Informers
}

Expand All @@ -70,39 +70,43 @@ type Informers interface {
// It blocks.
Start(ctx context.Context) error

// WaitForCacheSync waits for all the caches to sync. Returns false if it could not sync a cache.
// WaitForCacheSync waits for all the caches to sync. Returns false if it could not sync a cache.
WaitForCacheSync(ctx context.Context) bool

// Informers knows how to add indices to the caches (informers) that it manages.
// FieldIndexer adds indices to the managed informers.
client.FieldIndexer
}

// Informer - informer allows you interact with the underlying informer.
// Informer allows you to interact with the underlying informer.
type Informer interface {
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// period. Events to a single handler are delivered sequentially, but there is no coordination
// period. Events to a single handler are delivered sequentially, but there is no coordination
// between different handlers.
// It returns a registration handle for the handler that can be used to remove
// the handler again.
// the handler again and an error if the handler cannot be added.
AddEventHandler(handler toolscache.ResourceEventHandler) (toolscache.ResourceEventHandlerRegistration, error)

// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
// specified resync period. Events to a single handler are delivered sequentially, but there is
// specified resync period. Events to a single handler are delivered sequentially, but there is
// no coordination between different handlers.
// It returns a registration handle for the handler that can be used to remove
// the handler again and an error if the handler cannot be added.
AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error)
// RemoveEventHandler removes a formerly added event handler given by

// RemoveEventHandler removes a previously added event handler given by
// its registration handle.
// This function is guaranteed to be idempotent, and thread-safe.
// This function is guaranteed to be idempotent and thread-safe.
RemoveEventHandler(handle toolscache.ResourceEventHandlerRegistration) error
// AddIndexers adds more indexers to this store. If you call this after you already have data

// AddIndexers adds indexers to this store. If this is called after there is already data
// in the store, the results are undefined.
AddIndexers(indexers toolscache.Indexers) error

// HasSynced return true if the informers underlying store has synced.
HasSynced() bool
}

// Options are the optional arguments for creating a new InformersMap object.
// Options are the optional arguments for creating a new Cache object.
type Options struct {
// HTTPClient is the http client to use for the REST client
HTTPClient *http.Client
Expand Down Expand Up @@ -141,31 +145,31 @@ type Options struct {
SyncPeriod *time.Duration

// Namespaces restricts the cache's ListWatch to the desired namespaces
// Default watches all namespaces
// Per default ListWatch watches all namespaces
Namespaces []string

// DefaultLabelSelector will be used as a label selectors for all object types
// DefaultLabelSelector will be used as a label selector for all object types
// unless they have a more specific selector set in ByObject.
DefaultLabelSelector labels.Selector

// DefaultFieldSelector will be used as a field selectors for all object types
// DefaultFieldSelector will be used as a field selector for all object types
// unless they have a more specific selector set in ByObject.
DefaultFieldSelector fields.Selector

// DefaultTransform will be used as transform for all object types
// unless they have a more specific transform set in ByObject.
DefaultTransform toolscache.TransformFunc

// ByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object.
ByObject map[client.Object]ByObject

// UnsafeDisableDeepCopy indicates not to deep copy objects during get or
// list objects for EVERY object.
// Be very careful with this, when enabled you must DeepCopy any object before mutating it,
// otherwise you will mutate the object in the cache.
//
// This is a global setting for all objects, and can be overridden by the ByObject setting.
UnsafeDisableDeepCopy *bool

// ByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object.
ByObject map[client.Object]ByObject
}

// ByObject offers more fine-grained control over the cache's ListWatch by object.
Expand All @@ -176,9 +180,8 @@ type ByObject struct {
// Field represents a field selector for the object.
Field fields.Selector

// Transform is a map from objects to transformer functions which
// get applied when objects of the transformation are about to be committed
// to cache.
// Transform is a transformer function for the object which gets applied
// when objects of the transformation are about to be committed to the cache.
//
// This function is called both for new objects to enter the cache,
// and for updated objects.
Expand Down Expand Up @@ -236,15 +239,12 @@ func New(config *rest.Config, opts Options) (Cache, error) {
}

func defaultOpts(config *rest.Config, opts Options) (Options, error) {
logger := log.WithName("setup")

// Use the rest HTTP client for the provided config if unset
if opts.HTTPClient == nil {
var err error
opts.HTTPClient, err = rest.HTTPClientFor(config)
if err != nil {
logger.Error(err, "Failed to get HTTP client")
return opts, fmt.Errorf("could not create HTTP client from config: %w", err)
return Options{}, fmt.Errorf("could not create HTTP client from config: %w", err)
}
}

Expand All @@ -258,8 +258,7 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
var err error
opts.Mapper, err = apiutil.NewDiscoveryRESTMapper(config, opts.HTTPClient)
if err != nil {
logger.Error(err, "Failed to get API Group-Resources")
return opts, fmt.Errorf("could not create RESTMapper from config: %w", err)
return Options{}, fmt.Errorf("could not create RESTMapper from config: %w", err)
}
}

Expand Down
21 changes: 11 additions & 10 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"

"sigs.k8s.io/controller-runtime/pkg/cache/internal"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -67,7 +68,7 @@ func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie
if !started {
return &ErrCacheNotStarted{}
}
return cache.Reader.Get(ctx, key, out)
return cache.Reader.Get(ctx, key, out, opts...)
}

// List implements Reader.
Expand Down Expand Up @@ -135,7 +136,7 @@ func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou
if err != nil {
return nil, err
}
return i.Informer, err
return i.Informer, nil
}

// GetInformer returns the informer for the obj.
Expand All @@ -149,7 +150,7 @@ func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (In
if err != nil {
return nil, err
}
return i.Informer, err
return i.Informer, nil
}

// NeedLeaderElection implements the LeaderElectionRunnable interface
Expand All @@ -158,11 +159,11 @@ func (ic *informerCache) NeedLeaderElection() bool {
return false
}

// IndexField adds an indexer to the underlying cache, using extraction function to get
// value(s) from the given field. This index can then be used by passing a field selector
// IndexField adds an indexer to the underlying informer, using extractValue function to get
// value(s) from the given field. This index can then be used by passing a field selector
// to List. For one-to-one compatibility with "normal" field selectors, only return one value.
// The values may be anything. They will automatically be prefixed with the namespace of the
// given object, if present. The objects passed are guaranteed to be objects of the correct type.
// The values may be anything. They will automatically be prefixed with the namespace of the
// given object, if present. The objects passed are guaranteed to be objects of the correct type.
func (ic *informerCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
informer, err := ic.GetInformer(ctx, obj)
if err != nil {
Expand All @@ -171,7 +172,7 @@ func (ic *informerCache) IndexField(ctx context.Context, obj client.Object, fiel
return indexByField(informer, field, extractValue)
}

func indexByField(indexer Informer, field string, extractor client.IndexerFunc) error {
func indexByField(informer Informer, field string, extractValue client.IndexerFunc) error {
indexFunc := func(objRaw interface{}) ([]string, error) {
// TODO(directxman12): check if this is the correct type?
obj, isObj := objRaw.(client.Object)
Expand All @@ -184,7 +185,7 @@ func indexByField(indexer Informer, field string, extractor client.IndexerFunc)
}
ns := meta.GetNamespace()

rawVals := extractor(obj)
rawVals := extractValue(obj)
var vals []string
if ns == "" {
// if we're not doubling the keys for the namespaced case, just create a new slice with same length
Expand All @@ -207,5 +208,5 @@ func indexByField(indexer Informer, field string, extractor client.IndexerFunc)
return vals, nil
}

return indexer.AddIndexers(cache.Indexers{internal.FieldIndexName(field): indexFunc})
return informer.AddIndexers(cache.Indexers{internal.FieldIndexName(field): indexFunc})
}
22 changes: 4 additions & 18 deletions pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
toolscache "k8s.io/client-go/tools/cache"

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
Expand Down Expand Up @@ -52,14 +53,7 @@ func (c *FakeInformers) GetInformerForKind(ctx context.Context, gvk schema.Group

// FakeInformerForKind implements Informers.
func (c *FakeInformers) FakeInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (*controllertest.FakeInformer, error) {
if c.Scheme == nil {
c.Scheme = scheme.Scheme
}
obj, err := c.Scheme.New(gvk)
if err != nil {
return nil, err
}
i, err := c.informerFor(gvk, obj)
i, err := c.GetInformerForKind(ctx, gvk)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -88,16 +82,8 @@ func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
}

// FakeInformerFor implements Informers.
func (c *FakeInformers) FakeInformerFor(obj runtime.Object) (*controllertest.FakeInformer, error) {
if c.Scheme == nil {
c.Scheme = scheme.Scheme
}
gvks, _, err := c.Scheme.ObjectKinds(obj)
if err != nil {
return nil, err
}
gvk := gvks[0]
i, err := c.informerFor(gvk, obj)
func (c *FakeInformers) FakeInformerFor(ctx context.Context, obj client.Object) (*controllertest.FakeInformer, error) {
i, err := c.GetInformer(ctx, obj)
if err != nil {
return nil, err
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/cache/internal/cache_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type CacheReader struct {
}

// Get checks the indexer for the object and writes a copy of it if found.
func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) error {
func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object, _ ...client.GetOption) error {
if c.scopeName == apimeta.RESTScopeNameRoot {
key.Namespace = ""
}
Expand All @@ -67,9 +67,9 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Ob

// Not found, return an error
if !exists {
// Resource gets transformed into Kind in the error anyway, so this is fine
return apierrors.NewNotFound(schema.GroupResource{
Group: c.groupVersionKind.Group,
Group: c.groupVersionKind.Group,
// Resource gets set as Kind in the error so this is fine
Resource: c.groupVersionKind.Kind,
}, key.Name)
}
Expand Down Expand Up @@ -119,8 +119,8 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
if !requiresExact {
return fmt.Errorf("non-exact field matches are not supported by the cache")
}
// list all objects by the field selector. If this is namespaced and we have one, ask for the
// namespaced index key. Otherwise, ask for the non-namespaced variant by using the fake "all namespaces"
// list all objects by the field selector. If this is namespaced and we have one, ask for the
// namespaced index key. Otherwise, ask for the non-namespaced variant by using the fake "all namespaces"
// namespace.
objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToNamespacedKey(listOpts.Namespace, val))
case listOpts.Namespace != "":
Expand Down Expand Up @@ -175,7 +175,7 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
}

// objectKeyToStorageKey converts an object key to store key.
// It's akin to MetaNamespaceKeyFunc. It's separate from
// It's akin to MetaNamespaceKeyFunc. It's separate from
// String to allow keeping the key format easily in sync with
// MetaNamespaceKeyFunc.
func objectKeyToStoreKey(k client.ObjectKey) string {
Expand All @@ -191,7 +191,7 @@ func FieldIndexName(field string) string {
return "field:" + field
}

// noNamespaceNamespace is used as the "namespace" when we want to list across all namespaces.
// allNamespacesNamespace is used as the "namespace" when we want to list across all namespaces.
const allNamespacesNamespace = "__all_namespaces"

// KeyToNamespacedKey prefixes the given index key with a namespace
Expand Down
10 changes: 6 additions & 4 deletions pkg/cache/internal/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/client-go/metadata"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

Expand All @@ -48,7 +49,7 @@ type InformersOpts struct {
ByGVK map[schema.GroupVersionKind]InformersOptsByGVK
}

// InformersOptsByGVK configured additional by group version kind (or object)
// InformersOptsByGVK configures additionally by group version kind (or object)
// in an InformerMap.
type InformersOptsByGVK struct {
Selector Selector
Expand Down Expand Up @@ -186,7 +187,7 @@ func (ip *Informers) getDisableDeepCopy(gvk schema.GroupVersionKind) bool {
return false
}

// Start calls Run on each of the informers and sets started to true. Blocks on the context.
// Start calls Run on each of the informers and sets started to true. Blocks on the context.
// It doesn't return start because it can't return an error, and it's not a runnable directly.
func (ip *Informers) Start(ctx context.Context) error {
func() {
Expand Down Expand Up @@ -278,7 +279,7 @@ func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res *
return i, ip.started, ok
}

// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *Cache, error) {
// Return the informer if it is found
Expand Down Expand Up @@ -311,11 +312,12 @@ func (ip *Informers) informersByType(obj runtime.Object) map[schema.GroupVersion
}
}

// addInformerToMap either returns an existing informer or creates a new informer, adds it to the map and returns it.
func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*Cache, bool, error) {
ip.mu.Lock()
defer ip.mu.Unlock()

// Check the cache to see if we already have an Informer. If we do, return the Informer.
// Check the cache to see if we already have an Informer. If we do, return the Informer.
// This is for the case where 2 routines tried to get the informer when it wasn't in the map
// so neither returned early, but the first one created it.
if i, ok := ip.informersByType(obj)[gvk]; ok {
Expand Down
Loading

0 comments on commit 0fd3354

Please sign in to comment.