From 5580820a5b127076b1fa2b2bb74704806cec82eb Mon Sep 17 00:00:00 2001 From: Alexander Zielenski Date: Fri, 1 Apr 2022 10:05:10 -0700 Subject: [PATCH] add new cache.Options field to customize transform --- pkg/cache/cache.go | 34 ++++- pkg/cache/cache_test.go | 212 ++++++++++++++++++++++++++++ pkg/cache/internal/deleg_map.go | 19 +-- pkg/cache/internal/informers_map.go | 14 +- pkg/cache/internal/transformers.go | 50 +++++++ 5 files changed, 318 insertions(+), 11 deletions(-) create mode 100644 pkg/cache/internal/transformers.go diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index f381098fe0..249b485baa 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -128,6 +128,18 @@ type Options struct { // Be very careful with this, when enabled you must DeepCopy any object before mutating it, // otherwise you will mutate the object in the cache. UnsafeDisableDeepCopyByObject DisableDeepCopyByObject + + // TransformFuncByObject is a map from GVKs to transformer functions which + // get applied when objects of the transformation are about to be committed + // to cache. + // + // This function is called both for new objects to enter the cache, + // and for updated objects. + TransformFuncByObject TransformFuncByObject + + // DefaultTransform is the transform used for all GVKs which do + // not have an explicit transform func set in TransformFuncByObject + DefaultTransform toolscache.TransformFunc } var defaultResyncTime = 10 * time.Hour @@ -146,7 +158,12 @@ func New(config *rest.Config, opts Options) (Cache, error) { if err != nil { return nil, err } - im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK) + transformByGVK, err := convertToTransformByKindAndGVK(opts.TransformFuncByObject, opts.DefaultTransform, opts.Scheme) + if err != nil { + return nil, err + } + + im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK, transformByGVK) return &informerCache{InformersMap: im}, nil } @@ -241,3 +258,18 @@ func convertToDisableDeepCopyByGVK(disableDeepCopyByObject DisableDeepCopyByObje } return disableDeepCopyByGVK, nil } + +// TransformFuncByObject associate a client.Object's GVK to a transformer function +// to be applied when storing the object into the cache. +type TransformFuncByObject map[client.Object]toolscache.TransformFunc + +func convertToTransformByKindAndGVK(t TransformFuncByObject, defaultTransform toolscache.TransformFunc, scheme *runtime.Scheme) (internal.TransformFuncByObject, error) { + result := internal.NewTransformFuncByObject() + for obj, transformation := range t { + if err := result.Set(obj, scheme, transformation); err != nil { + return nil, err + } + } + result.SetDefault(defaultTransform) + return result, nil +} diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 07c96b1c24..80a94fecf5 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -29,10 +29,12 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" kscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -121,6 +123,216 @@ var _ = Describe("Multi-Namespace Informer Cache", func() { var _ = Describe("Informer Cache without DeepCopy", func() { CacheTest(cache.New, cache.Options{UnsafeDisableDeepCopyByObject: cache.DisableDeepCopyByObject{cache.ObjectAll{}: true}}) }) + +var _ = Describe("Cache with transformers", func() { + var ( + informerCache cache.Cache + informerCacheCtx context.Context + informerCacheCancel context.CancelFunc + knownPod1 client.Object + knownPod2 client.Object + knownPod3 client.Object + knownPod4 client.Object + knownPod5 client.Object + knownPod6 client.Object + ) + + getTransformValue := func(obj client.Object) string { + accessor, err := meta.Accessor(obj) + if err == nil { + annotations := accessor.GetAnnotations() + if val, exists := annotations["transformed"]; exists { + return val + } + } + return "" + } + + BeforeEach(func() { + informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background()) + Expect(cfg).NotTo(BeNil()) + + By("creating three pods") + cl, err := client.New(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + err = ensureNode(testNodeOne, cl) + Expect(err).NotTo(HaveOccurred()) + err = ensureNamespace(testNamespaceOne, cl) + Expect(err).NotTo(HaveOccurred()) + err = ensureNamespace(testNamespaceTwo, cl) + Expect(err).NotTo(HaveOccurred()) + err = ensureNamespace(testNamespaceThree, cl) + Expect(err).NotTo(HaveOccurred()) + // Includes restart policy since these objects are indexed on this field. + knownPod1 = createPod("test-pod-1", testNamespaceOne, corev1.RestartPolicyNever) + knownPod2 = createPod("test-pod-2", testNamespaceTwo, corev1.RestartPolicyAlways) + knownPod3 = createPodWithLabels("test-pod-3", testNamespaceTwo, corev1.RestartPolicyOnFailure, map[string]string{"common-label": "common"}) + knownPod4 = createPodWithLabels("test-pod-4", testNamespaceThree, corev1.RestartPolicyNever, map[string]string{"common-label": "common"}) + knownPod5 = createPod("test-pod-5", testNamespaceOne, corev1.RestartPolicyNever) + knownPod6 = createPod("test-pod-6", testNamespaceTwo, corev1.RestartPolicyAlways) + + podGVK := schema.GroupVersionKind{ + Kind: "Pod", + Version: "v1", + } + + knownPod1.GetObjectKind().SetGroupVersionKind(podGVK) + knownPod2.GetObjectKind().SetGroupVersionKind(podGVK) + knownPod3.GetObjectKind().SetGroupVersionKind(podGVK) + knownPod4.GetObjectKind().SetGroupVersionKind(podGVK) + knownPod5.GetObjectKind().SetGroupVersionKind(podGVK) + knownPod6.GetObjectKind().SetGroupVersionKind(podGVK) + + By("creating the informer cache") + informerCache, err = cache.New(cfg, cache.Options{ + DefaultTransform: func(i interface{}) (interface{}, error) { + if obj := i.(runtime.Object); obj != nil { + accessor, err := meta.Accessor(obj) + if err == nil { + annotations := accessor.GetAnnotations() + if _, exists := annotations["transformed"]; !exists { + if annotations == nil { + annotations = make(map[string]string) + } + annotations["transformed"] = "default" + accessor.SetAnnotations(annotations) + } + } + + } + return i, nil + }, + TransformFuncByObject: cache.TransformFuncByObject{ + &corev1.Pod{}: func(i interface{}) (interface{}, error) { + if obj := i.(runtime.Object); obj != nil { + accessor, err := meta.Accessor(obj) + if err == nil { + annotations := accessor.GetAnnotations() + if _, exists := annotations["transformed"]; !exists { + if annotations == nil { + annotations = make(map[string]string) + } + annotations["transformed"] = "explicit" + accessor.SetAnnotations(annotations) + } + } + + } + return i, nil + }, + }, + }) + Expect(err).NotTo(HaveOccurred()) + By("running the cache and waiting for it to sync") + // pass as an arg so that we don't race between close and re-assign + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(informerCache.Start(ctx)).To(Succeed()) + }(informerCacheCtx) + Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue()) + }) + + AfterEach(func() { + By("cleaning up created pods") + deletePod(knownPod1) + deletePod(knownPod2) + deletePod(knownPod3) + deletePod(knownPod4) + deletePod(knownPod5) + deletePod(knownPod6) + + informerCacheCancel() + }) + + Context("with structured objects", func() { + It("should apply transformers to explicitly specified GVKS", func() { + By("listing pods") + out := corev1.PodList{} + Expect(informerCache.List(context.Background(), &out)).To(Succeed()) + + By("verifying that the returned pods were transformed") + for i := 0; i < len(out.Items); i++ { + Expect(getTransformValue(&out.Items[i])).To(BeIdenticalTo("explicit")) + } + }) + + It("should apply default transformer to objects when none is specified", func() { + By("getting the Kubernetes service") + svc := &corev1.Service{} + svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} + Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed()) + + By("verifying that the returned service was transformed") + Expect(getTransformValue(svc)).To(BeIdenticalTo("default")) + }) + }) + + Context("with unstructured objects", func() { + It("should apply transformers to explicitly specified GVKS", func() { + By("listing pods") + out := unstructured.UnstructuredList{} + out.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + Expect(informerCache.List(context.Background(), &out)).To(Succeed()) + + By("verifying that the returned pods were transformed") + for i := 0; i < len(out.Items); i++ { + Expect(getTransformValue(&out.Items[i])).To(BeIdenticalTo("explicit")) + } + }) + + It("should apply default transformer to objects when none is specified", func() { + By("getting the Kubernetes service") + svc := &unstructured.Unstructured{} + svc.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Service", + }) + svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} + Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed()) + + By("verifying that the returned service was transformed") + Expect(getTransformValue(svc)).To(BeIdenticalTo("default")) + }) + }) + + Context("with metadata-only objects", func() { + It("should apply transformers to explicitly specified GVKS", func() { + By("listing pods") + out := metav1.PartialObjectMetadataList{} + out.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + Expect(informerCache.List(context.Background(), &out)).To(Succeed()) + + By("verifying that the returned pods were transformed") + for i := 0; i < len(out.Items); i++ { + Expect(getTransformValue(&out.Items[i])).To(BeIdenticalTo("explicit")) + } + }) + It("should apply default transformer to objects when none is specified", func() { + By("getting the Kubernetes service") + svc := &metav1.PartialObjectMetadata{} + svc.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Service", + }) + svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} + Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed()) + + By("verifying that the returned service was transformed") + Expect(getTransformValue(svc)).To(BeIdenticalTo("default")) + }) + }) +}) + var _ = Describe("Cache with selectors", func() { defer GinkgoRecover() var ( diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index 9bfc8463fd..27f46e3278 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -52,11 +52,12 @@ func NewInformersMap(config *rest.Config, namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, + transformers TransformFuncByObject, ) *InformersMap { return &InformersMap{ - structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy), - unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy), - metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy), + structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers), + unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers), + metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers), Scheme: scheme, } @@ -108,18 +109,18 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj // newStructuredInformersMap creates a new InformersMap for structured objects. func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, - namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createStructuredListWatch) + namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap { + return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createStructuredListWatch) } // newUnstructuredInformersMap creates a new InformersMap for unstructured objects. func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, - namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createUnstructuredListWatch) + namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap { + return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createUnstructuredListWatch) } // newMetadataInformersMap creates a new InformersMap for metadata-only objects. func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, - namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createMetadataListWatch) + namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap { + return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createMetadataListWatch) } diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 2eb68e840a..1524d2316f 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -54,7 +54,9 @@ func newSpecificInformersMap(config *rest.Config, namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, - createListWatcher createListWatcherFunc) *specificInformersMap { + transformers TransformFuncByObject, + createListWatcher createListWatcherFunc, +) *specificInformersMap { ip := &specificInformersMap{ config: config, Scheme: scheme, @@ -68,6 +70,7 @@ func newSpecificInformersMap(config *rest.Config, namespace: namespace, selectors: selectors.forGVK, disableDeepCopy: disableDeepCopy, + transformers: transformers, } return ip } @@ -135,6 +138,9 @@ type specificInformersMap struct { // disableDeepCopy indicates not to deep copy objects during get or list objects. disableDeepCopy DisableDeepCopyByGVK + + // transform funcs are applied to objects before they are committed to the cache + transformers TransformFuncByObject } // Start calls Run on each of the informers and sets started to true. Blocks on the context. @@ -227,6 +233,12 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{ cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) + + // Check to see if there is a transformer for this gvk + if err := ni.SetTransform(ip.transformers.Get(gvk)); err != nil { + return nil, false, err + } + rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { return nil, false, err diff --git a/pkg/cache/internal/transformers.go b/pkg/cache/internal/transformers.go new file mode 100644 index 0000000000..8cf642c4bd --- /dev/null +++ b/pkg/cache/internal/transformers.go @@ -0,0 +1,50 @@ +package internal + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" +) + +// TransformFuncByObject provides access to the correct transform function for +// any given GVK. +type TransformFuncByObject interface { + Set(runtime.Object, *runtime.Scheme, cache.TransformFunc) error + Get(schema.GroupVersionKind) cache.TransformFunc + SetDefault(transformer cache.TransformFunc) +} + +type transformFuncByGVK struct { + defaultTransform cache.TransformFunc + transformers map[schema.GroupVersionKind]cache.TransformFunc +} + +// NewTransformFuncByObject creates a new TransformFuncByObject instance. +func NewTransformFuncByObject() TransformFuncByObject { + return &transformFuncByGVK{ + transformers: make(map[schema.GroupVersionKind]cache.TransformFunc), + defaultTransform: nil, + } +} + +func (t *transformFuncByGVK) SetDefault(transformer cache.TransformFunc) { + t.defaultTransform = transformer +} + +func (t *transformFuncByGVK) Set(obj runtime.Object, scheme *runtime.Scheme, transformer cache.TransformFunc) error { + gvk, err := apiutil.GVKForObject(obj, scheme) + if err != nil { + return err + } + + t.transformers[gvk] = transformer + return nil +} + +func (t transformFuncByGVK) Get(gvk schema.GroupVersionKind) cache.TransformFunc { + if val, ok := t.transformers[gvk]; ok { + return val + } + return t.defaultTransform +}