Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add TransformFuncByObject Option for Informer Cache #1805

Merged
merged 3 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,18 @@ type Options struct {
// Be very careful with this, when enabled you must DeepCopy any object before mutating it,
// otherwise you will mutate the object in the cache.
UnsafeDisableDeepCopyByObject DisableDeepCopyByObject

// TransformFuncByObject is a map from GVKs to transformer functions which
// get applied when objects of the transformation are about to be committed
// to cache.
//
// This function is called both for new objects to enter the cache,
// and for updated objects.
TransformFuncByObject TransformFuncByObject
alexzielenski marked this conversation as resolved.
Show resolved Hide resolved

// DefaultTransform is the transform used for all GVKs which do
// not have an explicit transform func set in TransformFuncByObject
DefaultTransform toolscache.TransformFunc
alexzielenski marked this conversation as resolved.
Show resolved Hide resolved
}

var defaultResyncTime = 10 * time.Hour
Expand All @@ -146,7 +158,12 @@ func New(config *rest.Config, opts Options) (Cache, error) {
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK)
transformByGVK, err := convertToTransformByKindAndGVK(opts.TransformFuncByObject, opts.DefaultTransform, opts.Scheme)
if err != nil {
return nil, err
}

im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK, transformByGVK)
return &informerCache{InformersMap: im}, nil
}

Expand Down Expand Up @@ -241,3 +258,18 @@ func convertToDisableDeepCopyByGVK(disableDeepCopyByObject DisableDeepCopyByObje
}
return disableDeepCopyByGVK, nil
}

// TransformFuncByObject associate a client.Object's GVK to a transformer function
// to be applied when storing the object into the cache.
type TransformFuncByObject map[client.Object]toolscache.TransformFunc

func convertToTransformByKindAndGVK(t TransformFuncByObject, defaultTransform toolscache.TransformFunc, scheme *runtime.Scheme) (internal.TransformFuncByObject, error) {
result := internal.NewTransformFuncByObject()
for obj, transformation := range t {
if err := result.Set(obj, scheme, transformation); err != nil {
return nil, err
}
}
result.SetDefault(defaultTransform)
return result, nil
}
212 changes: 212 additions & 0 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
kscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -121,6 +123,216 @@ var _ = Describe("Multi-Namespace Informer Cache", func() {
var _ = Describe("Informer Cache without DeepCopy", func() {
CacheTest(cache.New, cache.Options{UnsafeDisableDeepCopyByObject: cache.DisableDeepCopyByObject{cache.ObjectAll{}: true}})
})

var _ = Describe("Cache with transformers", func() {
var (
informerCache cache.Cache
informerCacheCtx context.Context
informerCacheCancel context.CancelFunc
knownPod1 client.Object
knownPod2 client.Object
knownPod3 client.Object
knownPod4 client.Object
knownPod5 client.Object
knownPod6 client.Object
)

getTransformValue := func(obj client.Object) string {
accessor, err := meta.Accessor(obj)
if err == nil {
annotations := accessor.GetAnnotations()
if val, exists := annotations["transformed"]; exists {
return val
}
}
return ""
}

BeforeEach(func() {
informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background())
Expect(cfg).NotTo(BeNil())

By("creating three pods")
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
err = ensureNode(testNodeOne, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceOne, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceTwo, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceThree, cl)
Expect(err).NotTo(HaveOccurred())
// Includes restart policy since these objects are indexed on this field.
knownPod1 = createPod("test-pod-1", testNamespaceOne, corev1.RestartPolicyNever)
knownPod2 = createPod("test-pod-2", testNamespaceTwo, corev1.RestartPolicyAlways)
knownPod3 = createPodWithLabels("test-pod-3", testNamespaceTwo, corev1.RestartPolicyOnFailure, map[string]string{"common-label": "common"})
knownPod4 = createPodWithLabels("test-pod-4", testNamespaceThree, corev1.RestartPolicyNever, map[string]string{"common-label": "common"})
knownPod5 = createPod("test-pod-5", testNamespaceOne, corev1.RestartPolicyNever)
knownPod6 = createPod("test-pod-6", testNamespaceTwo, corev1.RestartPolicyAlways)

podGVK := schema.GroupVersionKind{
Kind: "Pod",
Version: "v1",
}

knownPod1.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod2.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod3.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod4.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod5.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod6.GetObjectKind().SetGroupVersionKind(podGVK)

By("creating the informer cache")
informerCache, err = cache.New(cfg, cache.Options{
DefaultTransform: func(i interface{}) (interface{}, error) {
if obj := i.(runtime.Object); obj != nil {
alexzielenski marked this conversation as resolved.
Show resolved Hide resolved
accessor, err := meta.Accessor(obj)
if err == nil {
annotations := accessor.GetAnnotations()
if _, exists := annotations["transformed"]; !exists {
if annotations == nil {
annotations = make(map[string]string)
}
annotations["transformed"] = "default"
accessor.SetAnnotations(annotations)
}
}

}
return i, nil
},
TransformFuncByObject: cache.TransformFuncByObject{
&corev1.Pod{}: func(i interface{}) (interface{}, error) {
if obj := i.(runtime.Object); obj != nil {
accessor, err := meta.Accessor(obj)
if err == nil {
annotations := accessor.GetAnnotations()
if _, exists := annotations["transformed"]; !exists {
if annotations == nil {
annotations = make(map[string]string)
}
annotations["transformed"] = "explicit"
accessor.SetAnnotations(annotations)
}
}

}
return i, nil
},
},
})
Expect(err).NotTo(HaveOccurred())
By("running the cache and waiting for it to sync")
// pass as an arg so that we don't race between close and re-assign
go func(ctx context.Context) {
defer GinkgoRecover()
Expect(informerCache.Start(ctx)).To(Succeed())
}(informerCacheCtx)
Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue())
})

AfterEach(func() {
By("cleaning up created pods")
deletePod(knownPod1)
deletePod(knownPod2)
deletePod(knownPod3)
deletePod(knownPod4)
deletePod(knownPod5)
deletePod(knownPod6)

informerCacheCancel()
})

Context("with structured objects", func() {
It("should apply transformers to explicitly specified GVKS", func() {
By("listing pods")
out := corev1.PodList{}
Expect(informerCache.List(context.Background(), &out)).To(Succeed())

By("verifying that the returned pods were transformed")
for i := 0; i < len(out.Items); i++ {
Expect(getTransformValue(&out.Items[i])).To(BeIdenticalTo("explicit"))
}
})

It("should apply default transformer to objects when none is specified", func() {
By("getting the Kubernetes service")
svc := &corev1.Service{}
svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"}
Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed())

By("verifying that the returned service was transformed")
Expect(getTransformValue(svc)).To(BeIdenticalTo("default"))
})
})

Context("with unstructured objects", func() {
It("should apply transformers to explicitly specified GVKS", func() {
By("listing pods")
out := unstructured.UnstructuredList{}
out.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "PodList",
})
Expect(informerCache.List(context.Background(), &out)).To(Succeed())

By("verifying that the returned pods were transformed")
for i := 0; i < len(out.Items); i++ {
Expect(getTransformValue(&out.Items[i])).To(BeIdenticalTo("explicit"))
}
})

It("should apply default transformer to objects when none is specified", func() {
By("getting the Kubernetes service")
svc := &unstructured.Unstructured{}
svc.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "Service",
})
svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"}
Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed())

By("verifying that the returned service was transformed")
Expect(getTransformValue(svc)).To(BeIdenticalTo("default"))
})
})

Context("with metadata-only objects", func() {
It("should apply transformers to explicitly specified GVKS", func() {
By("listing pods")
out := metav1.PartialObjectMetadataList{}
out.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "PodList",
})
Expect(informerCache.List(context.Background(), &out)).To(Succeed())

By("verifying that the returned pods were transformed")
for i := 0; i < len(out.Items); i++ {
Expect(getTransformValue(&out.Items[i])).To(BeIdenticalTo("explicit"))
}
})
It("should apply default transformer to objects when none is specified", func() {
By("getting the Kubernetes service")
svc := &metav1.PartialObjectMetadata{}
svc.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "Service",
})
svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"}
Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed())

By("verifying that the returned service was transformed")
Expect(getTransformValue(svc)).To(BeIdenticalTo("default"))
})
})
})

var _ = Describe("Cache with selectors", func() {
defer GinkgoRecover()
var (
Expand Down
19 changes: 10 additions & 9 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ func NewInformersMap(config *rest.Config,
namespace string,
selectors SelectorsByGVK,
disableDeepCopy DisableDeepCopyByGVK,
transformers TransformFuncByObject,
) *InformersMap {
return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers),

Scheme: scheme,
}
Expand Down Expand Up @@ -108,18 +109,18 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj

// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createStructuredListWatch)
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createStructuredListWatch)
}

// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createUnstructuredListWatch)
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createUnstructuredListWatch)
}

// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createMetadataListWatch)
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createMetadataListWatch)
}
14 changes: 13 additions & 1 deletion pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func newSpecificInformersMap(config *rest.Config,
namespace string,
selectors SelectorsByGVK,
disableDeepCopy DisableDeepCopyByGVK,
createListWatcher createListWatcherFunc) *specificInformersMap {
transformers TransformFuncByObject,
createListWatcher createListWatcherFunc,
) *specificInformersMap {
ip := &specificInformersMap{
config: config,
Scheme: scheme,
Expand All @@ -68,6 +70,7 @@ func newSpecificInformersMap(config *rest.Config,
namespace: namespace,
selectors: selectors.forGVK,
disableDeepCopy: disableDeepCopy,
transformers: transformers,
}
return ip
}
Expand Down Expand Up @@ -135,6 +138,9 @@ type specificInformersMap struct {

// disableDeepCopy indicates not to deep copy objects during get or list objects.
disableDeepCopy DisableDeepCopyByGVK

// transform funcs are applied to objects before they are committed to the cache
transformers TransformFuncByObject
}

// Start calls Run on each of the informers and sets started to true. Blocks on the context.
Expand Down Expand Up @@ -227,6 +233,12 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})

// Check to see if there is a transformer for this gvk
if err := ni.SetTransform(ip.transformers.Get(gvk)); err != nil {
alexzielenski marked this conversation as resolved.
Show resolved Hide resolved
return nil, false, err
}

rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, false, err
Expand Down
Loading