From 94002ac1fe50995eafc3bbea7e3b6778ceea5526 Mon Sep 17 00:00:00 2001 From: Alexander Zielenski <351783+alexzielenski@users.noreply.github.com> Date: Thu, 14 Apr 2022 16:13:48 -0700 Subject: [PATCH] fixup! add new cache.Options field to customize transform improve transformer test reliability --- pkg/cache/cache_test.go | 372 +++++++++++++++++----------------------- 1 file changed, 162 insertions(+), 210 deletions(-) diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index bd9448fdf0..c576cae986 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -29,10 +29,12 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" kscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -127,88 +129,102 @@ var _ = Describe("Cache with transformers", func() { informerCache cache.Cache informerCacheCtx context.Context informerCacheCancel context.CancelFunc - clien client.Client - transformerHits chan struct{} + knownPod1 client.Object + knownPod2 client.Object + knownPod3 client.Object + knownPod4 client.Object + knownPod5 client.Object + knownPod6 client.Object ) - pod := &corev1.Pod{ - TypeMeta: metav1.TypeMeta{}, - ObjectMeta: metav1.ObjectMeta{ - Name: "test.pod", - Namespace: testNamespaceOne, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "my-cool-container", - Image: "nginx", - }, - }, - }, - Status: corev1.PodStatus{}, - } - pod.SetGroupVersionKind(schema.GroupVersionKind{ - Kind: "Pod", - Version: "v1", - }) - - node := &corev1.Node{ - TypeMeta: metav1.TypeMeta{}, - ObjectMeta: metav1.ObjectMeta{ - Name: "test.node", - Namespace: testNamespaceOne, - }, - Spec: corev1.NodeSpec{}, - Status: corev1.NodeStatus{}, + getTransformValue := func(obj client.Object) string { + accessor, err := meta.Accessor(obj) + if err == nil { + annotations := accessor.GetAnnotations() + if val, exists := annotations["transformed"]; exists { + return val + } + } + return "" } - node.SetGroupVersionKind(schema.GroupVersionKind{ - Kind: "Node", - Version: "v1", - }) BeforeEach(func() { - hitsChan := make(chan struct{}, 10) - - var err error - var transform = func(i interface{}) (interface{}, error) { - select { - case hitsChan <- struct{}{}: - // Notified the hit - default: - // don't block on sending - } - return i, nil - } - transformerHits = hitsChan + informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background()) + Expect(cfg).NotTo(BeNil()) - clien, err = client.New(cfg, client.Options{}) + By("creating three pods") + cl, err := client.New(cfg, client.Options{}) Expect(err).NotTo(HaveOccurred()) - err = ensureNamespace(testNamespaceOne, clien) + err = ensureNode(testNodeOne, cl) + Expect(err).NotTo(HaveOccurred()) + err = ensureNamespace(testNamespaceOne, cl) Expect(err).NotTo(HaveOccurred()) + err = ensureNamespace(testNamespaceTwo, cl) + Expect(err).NotTo(HaveOccurred()) + err = ensureNamespace(testNamespaceThree, cl) + Expect(err).NotTo(HaveOccurred()) + // Includes restart policy since these objects are indexed on this field. + knownPod1 = createPod("test-pod-1", testNamespaceOne, corev1.RestartPolicyNever) + knownPod2 = createPod("test-pod-2", testNamespaceTwo, corev1.RestartPolicyAlways) + knownPod3 = createPodWithLabels("test-pod-3", testNamespaceTwo, corev1.RestartPolicyOnFailure, map[string]string{"common-label": "common"}) + knownPod4 = createPodWithLabels("test-pod-4", testNamespaceThree, corev1.RestartPolicyNever, map[string]string{"common-label": "common"}) + knownPod5 = createPod("test-pod-5", testNamespaceOne, corev1.RestartPolicyNever) + knownPod6 = createPod("test-pod-6", testNamespaceTwo, corev1.RestartPolicyAlways) + + podGVK := schema.GroupVersionKind{ + Kind: "Pod", + Version: "v1", + } + + knownPod1.GetObjectKind().SetGroupVersionKind(podGVK) + knownPod2.GetObjectKind().SetGroupVersionKind(podGVK) + knownPod3.GetObjectKind().SetGroupVersionKind(podGVK) + knownPod4.GetObjectKind().SetGroupVersionKind(podGVK) + knownPod5.GetObjectKind().SetGroupVersionKind(podGVK) + knownPod6.GetObjectKind().SetGroupVersionKind(podGVK) By("creating the informer cache") informerCache, err = cache.New(cfg, cache.Options{ + DefaultTransform: func(i interface{}) (interface{}, error) { + if obj := i.(runtime.Object); obj != nil { + accessor, err := meta.Accessor(obj) + if err == nil { + annotations := accessor.GetAnnotations() + if _, exists := annotations["transformed"]; !exists { + if annotations == nil { + annotations = make(map[string]string) + } + annotations["transformed"] = "default" + accessor.SetAnnotations(annotations) + } + } + + } + return i, nil + }, TransformFuncByObject: cache.TransformFuncByObject{ - pod: transform, + &corev1.Pod{}: func(i interface{}) (interface{}, error) { + if obj := i.(runtime.Object); obj != nil { + accessor, err := meta.Accessor(obj) + if err == nil { + annotations := accessor.GetAnnotations() + if _, exists := annotations["transformed"]; !exists { + if annotations == nil { + annotations = make(map[string]string) + } + annotations["transformed"] = "explicit" + accessor.SetAnnotations(annotations) + } + } + + } + return i, nil + }, }, - DefaultTransform: transform, }) Expect(err).NotTo(HaveOccurred()) - - By("inserting a v1.Pod") - informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background()) - Expect(cfg).NotTo(BeNil()) - - podCopy := pod.DeepCopy() - err = clien.Create(informerCacheCtx, podCopy) - Expect(err).ToNot(HaveOccurred()) - - By("inserting a v1.Node") - nodeCopy := node.DeepCopy() - err = clien.Create(informerCacheCtx, nodeCopy) - Expect(err).ToNot(HaveOccurred()) - - By("running the cache") + By("running the cache and waiting for it to sync") + // pass as an arg so that we don't race between close and re-assign go func(ctx context.Context) { defer GinkgoRecover() Expect(informerCache.Start(ctx)).To(Succeed()) @@ -217,167 +233,103 @@ var _ = Describe("Cache with transformers", func() { }) AfterEach(func() { - err := clien.Delete(informerCacheCtx, pod.DeepCopy()) - Expect(err).To(BeNil()) - err = clien.Delete(informerCacheCtx, node.DeepCopy()) - Expect(err).To(BeNil()) - informerCacheCancel() + By("cleaning up created pods") + deletePod(knownPod1) + deletePod(knownPod2) + deletePod(knownPod3) + deletePod(knownPod4) + deletePod(knownPod5) + deletePod(knownPod6) - // Reset channel for future tests - transformerHits = nil + informerCacheCancel() }) - It("should transform unstructured objects", func() { - By("making sure transform is applied to initial cache listing") - podsList := &unstructured.UnstructuredList{ - Object: map[string]interface{}{ - "apiVersion": "v1", - "kind": "PodList", - }, - } - - err := informerCache.List(informerCacheCtx, podsList) - Expect(err).ToNot(HaveOccurred()) + Context("with structured objects", func() { + It("should apply transformers to explicitly specified GVKS", func() { + By("listing pods") + out := corev1.PodList{} + Expect(informerCache.List(context.Background(), &out)).To(Succeed()) - // Make sure transformer is applied on create and not again until there - // is an event - Eventually(transformerHits).Should(Receive()) - Consistently(transformerHits).Should(BeEmpty()) - - By("making sure transform is applied to updated cache listings") - unstructuredPod := &podsList.Items[0] - unstructuredPod.SetAnnotations(map[string]string{ - "test": "hello", + By("verifying that the returned pods were transformed") + for _, pod := range out.Items { + Expect(getTransformValue(&pod)).To(BeIdenticalTo("explicit")) + } }) - err = clien.Update(informerCacheCtx, unstructuredPod) - Expect(err).ToNot(HaveOccurred()) - - podsList = &unstructured.UnstructuredList{ - Object: map[string]interface{}{ - "apiVersion": "v1", - "kind": "PodList", - }, - } - - err = informerCache.List(informerCacheCtx, podsList) - Expect(err).ToNot(HaveOccurred()) - - Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue()) - - // The transformer should get called after requesting a relisting - Eventually(transformerHits).Should(Receive()) + It("should apply default transformer to objects when none is specified", func() { + By("getting the Kubernetes service") + svc := &corev1.Service{} + svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} + Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed()) - // Make sure transformer does not get hit again when no events are - // expected to fire - Consistently(transformerHits).Should(BeEmpty()) - }) - It("should transform structured objects", func() { - By("making sure transform is applied to initial cache listing") - podsList := &corev1.PodList{} - err := informerCache.List(informerCacheCtx, podsList) - Expect(err).ToNot(HaveOccurred()) - - // Make sure transformer is applied on create - Eventually(transformerHits).Should(Receive()) - Consistently(transformerHits).Should(BeEmpty()) - - By("making sure transform is applied to updated cache listings") - pod := &podsList.Items[0] - pod.SetAnnotations(map[string]string{ - "test": "hello", + By("verifying that the returned service was transformed") + Expect(getTransformValue(svc)).To(BeIdenticalTo("default")) }) - - err = clien.Update(informerCacheCtx, pod) - Expect(err).ToNot(HaveOccurred()) - - podsList = &corev1.PodList{} - err = informerCache.List(informerCacheCtx, podsList) - Expect(err).ToNot(HaveOccurred()) - - // The transformer should get called after requesting a relisting - Eventually(transformerHits).Should(Receive()) - - // Make sure transformer does not get hit again when no events are - // expected to fire - Consistently(transformerHits).Should(BeEmpty()) }) - It("should transform metadata objects", func() { - By("making sure transform is applied to initial cache listing") - podsList := &metav1.PartialObjectMetadataList{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "PodList", - }, - } - err := informerCache.List(informerCacheCtx, podsList) - Expect(err).ToNot(HaveOccurred()) - - // Make sure transformer is applied on create - Eventually(transformerHits).Should(Receive()) - Consistently(transformerHits).Should(BeEmpty()) + Context("with unstructured objects", func() { + It("should apply transformers to explicitly specified GVKS", func() { + By("listing pods") + out := unstructured.UnstructuredList{} + out.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + Expect(informerCache.List(context.Background(), &out)).To(Succeed()) - By("making sure transform is applied to updated cache listings") - pod := pod.DeepCopy() - pod.SetAnnotations(map[string]string{ - "test": "hello", + By("verifying that the returned pods were transformed") + for _, pod := range out.Items { + Expect(getTransformValue(&pod)).To(BeIdenticalTo("explicit")) + } }) - err = clien.Update(informerCacheCtx, pod) - Expect(err).ToNot(HaveOccurred()) - - podsList = &metav1.PartialObjectMetadataList{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "PodList", - }, - } - - err = informerCache.List(informerCacheCtx, podsList) - Expect(err).ToNot(HaveOccurred()) - - // The transformer should get called after requesting a relisting - Eventually(transformerHits).Should(Receive()) + It("should apply default transformer to objects when none is specified", func() { + By("getting the Kubernetes service") + svc := &unstructured.Unstructured{} + svc.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Service", + }) + svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} + Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed()) - // Make sure transformer does not get hit again when no events are - // expected to fire - Eventually(transformerHits).Should(BeEmpty()) - }) - It("should use default transform if none was available", func() { - // Grab a node - // Make sure default transform is used - By("making sure transform is applied to initial cache listing") - nodeList := &corev1.NodeList{} - err := informerCache.List(informerCacheCtx, nodeList) - Expect(err).ToNot(HaveOccurred()) - - Expect(nodeList.Items).To(HaveLen(1)) - - // Make sure transformer is applied on create - Eventually(transformerHits).Should(HaveLen(1)) - Eventually(transformerHits).Should(Receive()) - Consistently(transformerHits).Should(BeEmpty()) - - By("making sure transform is applied to updated cache listings") - pod := &nodeList.Items[0] - pod.SetAnnotations(map[string]string{ - "test": "hello", + By("verifying that the returned service was transformed") + Expect(getTransformValue(svc)).To(BeIdenticalTo("default")) }) + }) - err = clien.Update(informerCacheCtx, pod) - Expect(err).ToNot(HaveOccurred()) - - nodeList = &corev1.NodeList{} - err = informerCache.List(informerCacheCtx, nodeList) - Expect(err).ToNot(HaveOccurred()) + Context("with metadata-only objects", func() { + It("should apply transformers to explicitly specified GVKS", func() { + By("listing pods") + out := metav1.PartialObjectMetadataList{} + out.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + Expect(informerCache.List(context.Background(), &out)).To(Succeed()) - // The transformer should get called after requesting a relisting - Eventually(transformerHits).Should(Receive()) + By("verifying that the returned pods were transformed") + for _, pod := range out.Items { + Expect(getTransformValue(&pod)).To(BeIdenticalTo("explicit")) + } + }) + It("should apply default transformer to objects when none is specified", func() { + By("getting the Kubernetes service") + svc := &metav1.PartialObjectMetadata{} + svc.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Service", + }) + svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} + Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed()) - // Make sure transformer does not get hit again when no events are - // expected to fire - Consistently(transformerHits).Should(BeEmpty()) + By("verifying that the returned service was transformed") + Expect(getTransformValue(svc)).To(BeIdenticalTo("default")) + }) }) })