diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 0c1d265069..2dd002ef56 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -82,6 +82,8 @@ type Informer interface { // AddIndexers adds more indexers to this store. If you call this after you already have data // in the store, the results are undefined. AddIndexers(indexers toolscache.Indexers) error + //HasSynced return true if the informers underlying store has synced + HasSynced() bool } // Options are the optional arguments for creating a new InformersMap object diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 08d98860cb..eed63a1f99 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" kscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" kcache "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -71,582 +72,578 @@ func deletePod(pod runtime.Object) { } var _ = Describe("Informer Cache", func() { + CacheTest(cache.New) +}) - var ( - informerCache cache.Cache - stop chan struct{} - knownPod1 runtime.Object - knownPod2 runtime.Object - knownPod3 runtime.Object - ) - - BeforeEach(func() { - stop = make(chan struct{}) - Expect(cfg).NotTo(BeNil()) - - By("creating three pods") - // Includes restart policy since these objects are indexed on this field. - knownPod1 = createPod("test-pod-1", testNamespaceOne, kcorev1.RestartPolicyNever) - knownPod2 = createPod("test-pod-2", testNamespaceTwo, kcorev1.RestartPolicyAlways) - knownPod3 = createPod("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure) - podGVK := schema.GroupVersionKind{ - Kind: "Pod", - Version: "v1", - } - knownPod1.GetObjectKind().SetGroupVersionKind(podGVK) - knownPod2.GetObjectKind().SetGroupVersionKind(podGVK) - knownPod3.GetObjectKind().SetGroupVersionKind(podGVK) - - By("creating the informer cache") - var err error - informerCache, err = cache.New(cfg, cache.Options{}) - Expect(err).NotTo(HaveOccurred()) - By("running the cache and waiting for it to sync") - go func() { - defer GinkgoRecover() - Expect(informerCache.Start(stop)).To(Succeed()) - }() - Expect(informerCache.WaitForCacheSync(stop)).To(BeTrue()) - }) +// nolint: gocyclo +func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error)) { + Describe("Cache test", func() { + var ( + informerCache cache.Cache + stop chan struct{} + knownPod1 runtime.Object + knownPod2 runtime.Object + knownPod3 runtime.Object + ) + + BeforeEach(func() { + stop = make(chan struct{}) + Expect(cfg).NotTo(BeNil()) + + By("creating three pods") + // Includes restart policy since these objects are indexed on this field. + knownPod1 = createPod("test-pod-1", testNamespaceOne, kcorev1.RestartPolicyNever) + knownPod2 = createPod("test-pod-2", testNamespaceTwo, kcorev1.RestartPolicyAlways) + knownPod3 = createPod("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure) + podGVK := schema.GroupVersionKind{ + Kind: "Pod", + Version: "v1", + } + knownPod1.GetObjectKind().SetGroupVersionKind(podGVK) + knownPod2.GetObjectKind().SetGroupVersionKind(podGVK) + knownPod3.GetObjectKind().SetGroupVersionKind(podGVK) + + By("creating the informer cache") + var err error + informerCache, err = createCacheFunc(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + By("running the cache and waiting for it to sync") + go func() { + defer GinkgoRecover() + Expect(informerCache.Start(stop)).To(Succeed()) + }() + Expect(informerCache.WaitForCacheSync(stop)).To(BeTrue()) + }) - AfterEach(func() { - By("cleaning up created pods") - deletePod(knownPod1) - deletePod(knownPod2) - deletePod(knownPod3) + AfterEach(func() { + By("cleaning up created pods") + deletePod(knownPod1) + deletePod(knownPod2) + deletePod(knownPod3) - close(stop) - }) + close(stop) + }) - Describe("as a Reader", func() { - Context("with structured objects", func() { - - It("should be able to list objects that haven't been watched previously", func() { - By("listing all services in the cluster") - listObj := &kcorev1.ServiceList{} - Expect(informerCache.List(context.Background(), listObj)).To(Succeed()) - - By("verifying that the returned list contains the Kubernetes service") - // NB: kubernetes default service is automatically created in testenv. - Expect(listObj.Items).NotTo(BeEmpty()) - hasKubeService := false - for _, svc := range listObj.Items { - if svc.Namespace == "default" && svc.Name == "kubernetes" { - hasKubeService = true - break + Describe("as a Reader", func() { + Context("with structured objects", func() { + + It("should be able to list objects that haven't been watched previously", func() { + By("listing all services in the cluster") + listObj := &kcorev1.ServiceList{} + Expect(informerCache.List(context.Background(), listObj)).To(Succeed()) + + By("verifying that the returned list contains the Kubernetes service") + // NB: kubernetes default service is automatically created in testenv. + Expect(listObj.Items).NotTo(BeEmpty()) + hasKubeService := false + for _, svc := range listObj.Items { + if svc.Namespace == "default" && svc.Name == "kubernetes" { + hasKubeService = true + break + } } - } - Expect(hasKubeService).To(BeTrue()) - }) - - It("should be able to get objects that haven't been watched previously", func() { - By("getting the Kubernetes service") - svc := &kcorev1.Service{} - svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} - Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed()) - - By("verifying that the returned service looks reasonable") - Expect(svc.Name).To(Equal("kubernetes")) - Expect(svc.Namespace).To(Equal("default")) - }) - - It("should support filtering by labels in a single namespace", func() { - By("listing pods with a particular label") - // NB: each pod has a "test-label": - out := kcorev1.PodList{} - Expect(informerCache.List(context.Background(), &out, - client.InNamespace(testNamespaceTwo), - client.MatchingLabels(map[string]string{"test-label": "test-pod-2"}))).To(Succeed()) - - By("verifying the returned pods have the correct label") - Expect(out.Items).NotTo(BeEmpty()) - Expect(out.Items).Should(HaveLen(1)) - actual := out.Items[0] - Expect(actual.Labels["test-label"]).To(Equal("test-pod-2")) - }) + Expect(hasKubeService).To(BeTrue()) + }) - It("should support filtering by labels from multiple namespaces", func() { - By("creating another pod with the same label but different namespace") - anotherPod := createPod("test-pod-2", testNamespaceOne, kcorev1.RestartPolicyAlways) + It("should be able to get objects that haven't been watched previously", func() { + By("getting the Kubernetes service") + svc := &kcorev1.Service{} + svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} + Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed()) - By("listing pods with a particular label") - // NB: each pod has a "test-label": - out := kcorev1.PodList{} - labels := map[string]string{"test-label": "test-pod-2"} - Expect(informerCache.List(context.Background(), &out, client.MatchingLabels(labels))).To(Succeed()) + By("verifying that the returned service looks reasonable") + Expect(svc.Name).To(Equal("kubernetes")) + Expect(svc.Namespace).To(Equal("default")) + }) - By("verifying multiple pods with the same label in different namespaces are returned") - Expect(out.Items).NotTo(BeEmpty()) - Expect(out.Items).Should(HaveLen(2)) - for _, actual := range out.Items { + It("should support filtering by labels in a single namespace", func() { + By("listing pods with a particular label") + // NB: each pod has a "test-label": + out := kcorev1.PodList{} + Expect(informerCache.List(context.Background(), &out, + client.InNamespace(testNamespaceTwo), + client.MatchingLabels(map[string]string{"test-label": "test-pod-2"}))).To(Succeed()) + + By("verifying the returned pods have the correct label") + Expect(out.Items).NotTo(BeEmpty()) + Expect(out.Items).Should(HaveLen(1)) + actual := out.Items[0] Expect(actual.Labels["test-label"]).To(Equal("test-pod-2")) - } - - deletePod(anotherPod) - }) - - It("should be able to list objects by namespace", func() { - By("listing pods in test-namespace-1") - listObj := &kcorev1.PodList{} - Expect(informerCache.List(context.Background(), listObj, - client.InNamespace(testNamespaceOne))).To(Succeed()) - - By("verifying that the returned pods are in test-namespace-1") - Expect(listObj.Items).NotTo(BeEmpty()) - Expect(listObj.Items).Should(HaveLen(1)) - actual := listObj.Items[0] - Expect(actual.Namespace).To(Equal(testNamespaceOne)) - }) - - It("should be able to restrict cache to a namespace", func() { - By("creating a namespaced cache") - namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne}) - Expect(err).NotTo(HaveOccurred()) - - By("running the cache and waiting for it to sync") - go func() { - defer GinkgoRecover() - Expect(namespacedCache.Start(stop)).To(Succeed()) - }() - Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse()) - - By("listing pods in all namespaces") - out := &kcorev1.PodList{} - Expect(namespacedCache.List(context.Background(), out)).To(Succeed()) - - By("verifying the returned pod is from the watched namespace") - Expect(out.Items).NotTo(BeEmpty()) - Expect(out.Items).Should(HaveLen(1)) - Expect(out.Items[0].Namespace).To(Equal(testNamespaceOne)) - - By("listing all namespaces - should still be able to get a cluster-scoped resource") - namespaceList := &kcorev1.NamespaceList{} - Expect(namespacedCache.List(context.Background(), namespaceList)).To(Succeed()) - - By("verifying the namespace list is not empty") - Expect(namespaceList.Items).NotTo(BeEmpty()) - }) + }) - It("should deep copy the object unless told otherwise", func() { - By("retrieving a specific pod from the cache") - out := &kcorev1.Pod{} - podKey := client.ObjectKey{Name: "test-pod-2", Namespace: testNamespaceTwo} - Expect(informerCache.Get(context.Background(), podKey, out)).To(Succeed()) + It("should support filtering by labels from multiple namespaces", func() { + By("creating another pod with the same label but different namespace") + anotherPod := createPod("test-pod-2", testNamespaceOne, kcorev1.RestartPolicyAlways) + + By("listing pods with a particular label") + // NB: each pod has a "test-label": + out := kcorev1.PodList{} + labels := map[string]string{"test-label": "test-pod-2"} + Expect(informerCache.List(context.Background(), &out, client.MatchingLabels(labels))).To(Succeed()) + + By("verifying multiple pods with the same label in different namespaces are returned") + Expect(out.Items).NotTo(BeEmpty()) + Expect(out.Items).Should(HaveLen(2)) + for _, actual := range out.Items { + Expect(actual.Labels["test-label"]).To(Equal("test-pod-2")) + } - By("verifying the retrieved pod is equal to a known pod") - Expect(out).To(Equal(knownPod2)) + deletePod(anotherPod) + }) - By("altering a field in the retrieved pod") - *out.Spec.ActiveDeadlineSeconds = 4 + It("should be able to list objects by namespace", func() { + By("listing pods in test-namespace-1") + listObj := &kcorev1.PodList{} + Expect(informerCache.List(context.Background(), listObj, + client.InNamespace(testNamespaceOne))).To(Succeed()) + + By("verifying that the returned pods are in test-namespace-1") + Expect(listObj.Items).NotTo(BeEmpty()) + Expect(listObj.Items).Should(HaveLen(1)) + actual := listObj.Items[0] + Expect(actual.Namespace).To(Equal(testNamespaceOne)) + }) - By("verifying the pods are no longer equal") - Expect(out).NotTo(Equal(knownPod2)) - }) + It("should deep copy the object unless told otherwise", func() { + By("retrieving a specific pod from the cache") + out := &kcorev1.Pod{} + podKey := client.ObjectKey{Name: "test-pod-2", Namespace: testNamespaceTwo} + Expect(informerCache.Get(context.Background(), podKey, out)).To(Succeed()) - It("should return an error if the object is not found", func() { - By("getting a service that does not exists") - svc := &kcorev1.Service{} - svcKey := client.ObjectKey{Namespace: "unknown", Name: "unknown"} + By("verifying the retrieved pod is equal to a known pod") + Expect(out).To(Equal(knownPod2)) - By("verifying that an error is returned") - err := informerCache.Get(context.Background(), svcKey, svc) - Expect(err).To(HaveOccurred()) - Expect(errors.IsNotFound(err)).To(BeTrue()) - }) - }) - Context("with unstructured objects", func() { - It("should be able to list objects that haven't been watched previously", func() { - By("listing all services in the cluster") - listObj := &unstructured.UnstructuredList{} - listObj.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "ServiceList", - }) - err := informerCache.List(context.Background(), listObj) - Expect(err).To(Succeed()) - - By("verifying that the returned list contains the Kubernetes service") - // NB: kubernetes default service is automatically created in testenv. - Expect(listObj.Items).NotTo(BeEmpty()) - hasKubeService := false - for _, svc := range listObj.Items { - if svc.GetNamespace() == "default" && svc.GetName() == "kubernetes" { - hasKubeService = true - break - } - } - Expect(hasKubeService).To(BeTrue()) - }) + By("altering a field in the retrieved pod") + *out.Spec.ActiveDeadlineSeconds = 4 - It("should be able to get objects that haven't been watched previously", func() { - By("getting the Kubernetes service") - svc := &unstructured.Unstructured{} - svc.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "Service", + By("verifying the pods are no longer equal") + Expect(out).NotTo(Equal(knownPod2)) }) - svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} - Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed()) - By("verifying that the returned service looks reasonable") - Expect(svc.GetName()).To(Equal("kubernetes")) - Expect(svc.GetNamespace()).To(Equal("default")) - }) + It("should return an error if the object is not found", func() { + By("getting a service that does not exists") + svc := &kcorev1.Service{} + svcKey := client.ObjectKey{Namespace: testNamespaceOne, Name: "unknown"} - It("should support filtering by labels in a single namespace", func() { - By("listing pods with a particular label") - // NB: each pod has a "test-label": - out := unstructured.UnstructuredList{} - out.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "PodList", + By("verifying that an error is returned") + err := informerCache.Get(context.Background(), svcKey, svc) + Expect(err).To(HaveOccurred()) + Expect(errors.IsNotFound(err)).To(BeTrue()) }) - err := informerCache.List(context.Background(), &out, - client.InNamespace(testNamespaceTwo), - client.MatchingLabels(map[string]string{"test-label": "test-pod-2"})) - Expect(err).To(Succeed()) - - By("verifying the returned pods have the correct label") - Expect(out.Items).NotTo(BeEmpty()) - Expect(out.Items).Should(HaveLen(1)) - actual := out.Items[0] - Expect(actual.GetLabels()["test-label"]).To(Equal("test-pod-2")) - }) - It("should support filtering by labels from multiple namespaces", func() { - By("creating another pod with the same label but different namespace") - anotherPod := createPod("test-pod-2", testNamespaceOne, kcorev1.RestartPolicyAlways) - - By("listing pods with a particular label") - // NB: each pod has a "test-label": - out := unstructured.UnstructuredList{} - out.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "PodList", - }) - labels := map[string]string{"test-label": "test-pod-2"} - err := informerCache.List(context.Background(), &out, client.MatchingLabels(labels)) - Expect(err).To(Succeed()) - - By("verifying multiple pods with the same label in different namespaces are returned") - Expect(out.Items).NotTo(BeEmpty()) - Expect(out.Items).Should(HaveLen(2)) - for _, actual := range out.Items { - Expect(actual.GetLabels()["test-label"]).To(Equal("test-pod-2")) - } + It("should return an error if getting object in unwatched namespace", func() { + By("getting a service that does not exists") + svc := &kcorev1.Service{} + svcKey := client.ObjectKey{Namespace: "unknown", Name: "unknown"} - deletePod(anotherPod) - }) - - It("should be able to list objects by namespace", func() { - By("listing pods in test-namespace-1") - listObj := &unstructured.UnstructuredList{} - listObj.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "PodList", + By("verifying that an error is returned") + err := informerCache.Get(context.Background(), svcKey, svc) + Expect(err).To(HaveOccurred()) }) - err := informerCache.List(context.Background(), listObj, client.InNamespace(testNamespaceOne)) - Expect(err).To(Succeed()) - - By("verifying that the returned pods are in test-namespace-1") - Expect(listObj.Items).NotTo(BeEmpty()) - Expect(listObj.Items).Should(HaveLen(1)) - actual := listObj.Items[0] - Expect(actual.GetNamespace()).To(Equal(testNamespaceOne)) }) - - It("should be able to restrict cache to a namespace", func() { - By("creating a namespaced cache") - namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne}) - Expect(err).NotTo(HaveOccurred()) - - By("running the cache and waiting for it to sync") - go func() { - defer GinkgoRecover() - Expect(namespacedCache.Start(stop)).To(Succeed()) - }() - Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse()) - - By("listing pods in all namespaces") - out := &unstructured.UnstructuredList{} - out.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "PodList", + Context("with unstructured objects", func() { + It("should be able to list objects that haven't been watched previously", func() { + By("listing all services in the cluster") + listObj := &unstructured.UnstructuredList{} + listObj.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "ServiceList", + }) + err := informerCache.List(context.Background(), listObj) + Expect(err).To(Succeed()) + + By("verifying that the returned list contains the Kubernetes service") + // NB: kubernetes default service is automatically created in testenv. + Expect(listObj.Items).NotTo(BeEmpty()) + hasKubeService := false + for _, svc := range listObj.Items { + if svc.GetNamespace() == "default" && svc.GetName() == "kubernetes" { + hasKubeService = true + break + } + } + Expect(hasKubeService).To(BeTrue()) }) - Expect(namespacedCache.List(context.Background(), out)).To(Succeed()) - - By("verifying the returned pod is from the watched namespace") - Expect(out.Items).NotTo(BeEmpty()) - Expect(out.Items).Should(HaveLen(1)) - Expect(out.Items[0].GetNamespace()).To(Equal(testNamespaceOne)) - - By("listing all namespaces - should still be able to get a cluster-scoped resource") - namespaceList := &unstructured.UnstructuredList{} - namespaceList.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "NamespaceList", + It("should be able to get objects that haven't been watched previously", func() { + By("getting the Kubernetes service") + svc := &unstructured.Unstructured{} + svc.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Service", + }) + svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} + Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed()) + + By("verifying that the returned service looks reasonable") + Expect(svc.GetName()).To(Equal("kubernetes")) + Expect(svc.GetNamespace()).To(Equal("default")) }) - Expect(namespacedCache.List(context.Background(), namespaceList)).To(Succeed()) - By("verifying the namespace list is not empty") - Expect(namespaceList.Items).NotTo(BeEmpty()) - }) - - It("should deep copy the object unless told otherwise", func() { - By("retrieving a specific pod from the cache") - out := &unstructured.Unstructured{} - out.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "Pod", + It("should support filtering by labels in a single namespace", func() { + By("listing pods with a particular label") + // NB: each pod has a "test-label": + out := unstructured.UnstructuredList{} + out.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + err := informerCache.List(context.Background(), &out, + client.InNamespace(testNamespaceTwo), + client.MatchingLabels(map[string]string{"test-label": "test-pod-2"})) + Expect(err).To(Succeed()) + + By("verifying the returned pods have the correct label") + Expect(out.Items).NotTo(BeEmpty()) + Expect(out.Items).Should(HaveLen(1)) + actual := out.Items[0] + Expect(actual.GetLabels()["test-label"]).To(Equal("test-pod-2")) }) - uKnownPod2 := &unstructured.Unstructured{} - kscheme.Scheme.Convert(knownPod2, uKnownPod2, nil) - podKey := client.ObjectKey{Name: "test-pod-2", Namespace: testNamespaceTwo} - Expect(informerCache.Get(context.Background(), podKey, out)).To(Succeed()) + It("should support filtering by labels from multiple namespaces", func() { + By("creating another pod with the same label but different namespace") + anotherPod := createPod("test-pod-2", testNamespaceOne, kcorev1.RestartPolicyAlways) + + By("listing pods with a particular label") + // NB: each pod has a "test-label": + out := unstructured.UnstructuredList{} + out.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + labels := map[string]string{"test-label": "test-pod-2"} + err := informerCache.List(context.Background(), &out, client.MatchingLabels(labels)) + Expect(err).To(Succeed()) + + By("verifying multiple pods with the same label in different namespaces are returned") + Expect(out.Items).NotTo(BeEmpty()) + Expect(out.Items).Should(HaveLen(2)) + for _, actual := range out.Items { + Expect(actual.GetLabels()["test-label"]).To(Equal("test-pod-2")) + } - By("verifying the retrieved pod is equal to a known pod") - Expect(out).To(Equal(uKnownPod2)) + deletePod(anotherPod) + }) - By("altering a field in the retrieved pod") - m, _ := out.Object["spec"].(map[string]interface{}) - m["activeDeadlineSeconds"] = 4 + It("should be able to list objects by namespace", func() { + By("listing pods in test-namespace-1") + listObj := &unstructured.UnstructuredList{} + listObj.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + err := informerCache.List(context.Background(), listObj, client.InNamespace(testNamespaceOne)) + Expect(err).To(Succeed()) + + By("verifying that the returned pods are in test-namespace-1") + Expect(listObj.Items).NotTo(BeEmpty()) + Expect(listObj.Items).Should(HaveLen(1)) + actual := listObj.Items[0] + Expect(actual.GetNamespace()).To(Equal(testNamespaceOne)) + }) - By("verifying the pods are no longer equal") - Expect(out).NotTo(Equal(knownPod2)) - }) + It("should be able to restrict cache to a namespace", func() { + By("creating a namespaced cache") + namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne}) + Expect(err).NotTo(HaveOccurred()) + + By("running the cache and waiting for it to sync") + go func() { + defer GinkgoRecover() + Expect(namespacedCache.Start(stop)).To(Succeed()) + }() + Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse()) + + By("listing pods in all namespaces") + out := &unstructured.UnstructuredList{} + out.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + Expect(namespacedCache.List(context.Background(), out)).To(Succeed()) + + By("verifying the returned pod is from the watched namespace") + Expect(out.Items).NotTo(BeEmpty()) + Expect(out.Items).Should(HaveLen(1)) + Expect(out.Items[0].GetNamespace()).To(Equal(testNamespaceOne)) + + By("listing all namespaces - should still be able to get a cluster-scoped resource") + namespaceList := &unstructured.UnstructuredList{} + namespaceList.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "NamespaceList", + }) + Expect(namespacedCache.List(context.Background(), namespaceList)).To(Succeed()) + + By("verifying the namespace list is not empty") + Expect(namespaceList.Items).NotTo(BeEmpty()) + }) - It("should return an error if the object is not found", func() { - By("getting a service that does not exists") - svc := &unstructured.Unstructured{} - svc.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "Service", + It("should deep copy the object unless told otherwise", func() { + By("retrieving a specific pod from the cache") + out := &unstructured.Unstructured{} + out.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Pod", + }) + uKnownPod2 := &unstructured.Unstructured{} + kscheme.Scheme.Convert(knownPod2, uKnownPod2, nil) + + podKey := client.ObjectKey{Name: "test-pod-2", Namespace: testNamespaceTwo} + Expect(informerCache.Get(context.Background(), podKey, out)).To(Succeed()) + + By("verifying the retrieved pod is equal to a known pod") + Expect(out).To(Equal(uKnownPod2)) + + By("altering a field in the retrieved pod") + m, _ := out.Object["spec"].(map[string]interface{}) + m["activeDeadlineSeconds"] = 4 + + By("verifying the pods are no longer equal") + Expect(out).NotTo(Equal(knownPod2)) }) - svcKey := client.ObjectKey{Namespace: "unknown", Name: "unknown"} - By("verifying that an error is returned") - err := informerCache.Get(context.Background(), svcKey, svc) - Expect(err).To(HaveOccurred()) - Expect(errors.IsNotFound(err)).To(BeTrue()) + It("should return an error if the object is not found", func() { + By("getting a service that does not exists") + svc := &unstructured.Unstructured{} + svc.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Service", + }) + svcKey := client.ObjectKey{Namespace: testNamespaceOne, Name: "unknown"} + + By("verifying that an error is returned") + err := informerCache.Get(context.Background(), svcKey, svc) + Expect(err).To(HaveOccurred()) + Expect(errors.IsNotFound(err)).To(BeTrue()) + }) + It("should return an error if getting object in unwatched namespace", func() { + By("getting a service that does not exists") + svc := &kcorev1.Service{} + svcKey := client.ObjectKey{Namespace: "unknown", Name: "unknown"} + + By("verifying that an error is returned") + err := informerCache.Get(context.Background(), svcKey, svc) + Expect(err).To(HaveOccurred()) + }) }) }) - }) - - Describe("as an Informer", func() { - Context("with structured objects", func() { - It("should be able to get informer for the object", func(done Done) { - By("getting a shared index informer for a pod") - pod := &kcorev1.Pod{ - ObjectMeta: kmetav1.ObjectMeta{ - Name: "informer-obj", - Namespace: "default", - }, - Spec: kcorev1.PodSpec{ - Containers: []kcorev1.Container{ - { - Name: "nginx", - Image: "nginx", + Describe("as an Informer", func() { + Context("with structured objects", func() { + It("should be able to get informer for the object", func(done Done) { + By("getting a shared index informer for a pod") + pod := &kcorev1.Pod{ + ObjectMeta: kmetav1.ObjectMeta{ + Name: "informer-obj", + Namespace: "default", + }, + Spec: kcorev1.PodSpec{ + Containers: []kcorev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, }, }, - }, - } - sii, err := informerCache.GetInformer(pod) - Expect(err).NotTo(HaveOccurred()) - Expect(sii).NotTo(BeNil()) - Expect(sii.HasSynced()).To(BeTrue()) - - By("adding an event handler listening for object creation which sends the object to a channel") - out := make(chan interface{}) - addFunc := func(obj interface{}) { - out <- obj - } - sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) - - By("adding an object") - cl, err := client.New(cfg, client.Options{}) - Expect(err).NotTo(HaveOccurred()) - Expect(cl.Create(context.Background(), pod)).To(Succeed()) - - By("verifying the object is received on the channel") - Eventually(out).Should(Receive(Equal(pod))) - close(done) - }) + } + sii, err := informerCache.GetInformer(pod) + Expect(err).NotTo(HaveOccurred()) + Expect(sii).NotTo(BeNil()) + Expect(sii.HasSynced()).To(BeTrue()) + + By("adding an event handler listening for object creation which sends the object to a channel") + out := make(chan interface{}) + addFunc := func(obj interface{}) { + out <- obj + } + sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) + + By("adding an object") + cl, err := client.New(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + Expect(cl.Create(context.Background(), pod)).To(Succeed()) + defer deletePod(pod) - // TODO: Add a test for when GVK is not in Scheme. Does code support informer for unstructured object? - It("should be able to get an informer by group/version/kind", func(done Done) { - By("getting an shared index informer for gvk = core/v1/pod") - gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"} - sii, err := informerCache.GetInformerForKind(gvk) - Expect(err).NotTo(HaveOccurred()) - Expect(sii).NotTo(BeNil()) - Expect(sii.HasSynced()).To(BeTrue()) - - By("adding an event handler listening for object creation which sends the object to a channel") - out := make(chan interface{}) - addFunc := func(obj interface{}) { - out <- obj - } - sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) - - By("adding an object") - cl, err := client.New(cfg, client.Options{}) - Expect(err).NotTo(HaveOccurred()) - pod := &kcorev1.Pod{ - ObjectMeta: kmetav1.ObjectMeta{ - Name: "informer-gvk", - Namespace: "default", - }, - Spec: kcorev1.PodSpec{ - Containers: []kcorev1.Container{ - { - Name: "nginx", - Image: "nginx", + By("verifying the object is received on the channel") + Eventually(out).Should(Receive(Equal(pod))) + close(done) + }) + // TODO: Add a test for when GVK is not in Scheme. Does code support informer for unstructured object? + It("should be able to get an informer by group/version/kind", func(done Done) { + By("getting an shared index informer for gvk = core/v1/pod") + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"} + sii, err := informerCache.GetInformerForKind(gvk) + Expect(err).NotTo(HaveOccurred()) + Expect(sii).NotTo(BeNil()) + Expect(sii.HasSynced()).To(BeTrue()) + + By("adding an event handler listening for object creation which sends the object to a channel") + out := make(chan interface{}) + addFunc := func(obj interface{}) { + out <- obj + } + sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) + + By("adding an object") + cl, err := client.New(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + pod := &kcorev1.Pod{ + ObjectMeta: kmetav1.ObjectMeta{ + Name: "informer-gvk", + Namespace: "default", + }, + Spec: kcorev1.PodSpec{ + Containers: []kcorev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, }, }, - }, - } - Expect(cl.Create(context.Background(), pod)).To(Succeed()) + } + Expect(cl.Create(context.Background(), pod)).To(Succeed()) + defer deletePod(pod) - By("verifying the object is received on the channel") - Eventually(out).Should(Receive(Equal(pod))) - close(done) - }) + By("verifying the object is received on the channel") + Eventually(out).Should(Receive(Equal(pod))) + close(done) + }) - It("should be able to index an object field then retrieve objects by that field", func() { - By("creating the cache") - informer, err := cache.New(cfg, cache.Options{}) - Expect(err).NotTo(HaveOccurred()) - - By("indexing the restartPolicy field of the Pod object before starting") - pod := &kcorev1.Pod{} - indexFunc := func(obj runtime.Object) []string { - return []string{string(obj.(*kcorev1.Pod).Spec.RestartPolicy)} - } - Expect(informer.IndexField(pod, "spec.restartPolicy", indexFunc)).To(Succeed()) - - By("running the cache and waiting for it to sync") - go func() { - defer GinkgoRecover() - Expect(informer.Start(stop)).To(Succeed()) - }() - Expect(informer.WaitForCacheSync(stop)).NotTo(BeFalse()) - - By("listing Pods with restartPolicyOnFailure") - listObj := &kcorev1.PodList{} - Expect(informer.List(context.Background(), listObj, - client.MatchingField("spec.restartPolicy", "OnFailure"))).To(Succeed()) - - By("verifying that the returned pods have correct restart policy") - Expect(listObj.Items).NotTo(BeEmpty()) - Expect(listObj.Items).Should(HaveLen(1)) - actual := listObj.Items[0] - Expect(actual.Name).To(Equal("test-pod-3")) + It("should be able to index an object field then retrieve objects by that field", func() { + By("creating the cache") + informer, err := cache.New(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + + By("indexing the restartPolicy field of the Pod object before starting") + pod := &kcorev1.Pod{} + indexFunc := func(obj runtime.Object) []string { + return []string{string(obj.(*kcorev1.Pod).Spec.RestartPolicy)} + } + Expect(informer.IndexField(pod, "spec.restartPolicy", indexFunc)).To(Succeed()) + + By("running the cache and waiting for it to sync") + go func() { + defer GinkgoRecover() + Expect(informer.Start(stop)).To(Succeed()) + }() + Expect(informer.WaitForCacheSync(stop)).NotTo(BeFalse()) + + By("listing Pods with restartPolicyOnFailure") + listObj := &kcorev1.PodList{} + Expect(informer.List(context.Background(), listObj, + client.MatchingField("spec.restartPolicy", "OnFailure"))).To(Succeed()) + + By("verifying that the returned pods have correct restart policy") + Expect(listObj.Items).NotTo(BeEmpty()) + Expect(listObj.Items).Should(HaveLen(1)) + actual := listObj.Items[0] + Expect(actual.Name).To(Equal("test-pod-3")) + }) }) - }) - Context("with unstructured objects", func() { - It("should be able to get informer for the object", func(done Done) { - By("getting a shared index informer for a pod") - - pod := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "spec": map[string]interface{}{ - "containers": []map[string]interface{}{ - map[string]interface{}{ - "name": "nginx", - "image": "nginx", + Context("with unstructured objects", func() { + It("should be able to get informer for the object", func(done Done) { + By("getting a shared index informer for a pod") + + pod := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "spec": map[string]interface{}{ + "containers": []map[string]interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx", + }, }, }, }, - }, - } - pod.SetName("informer-obj2") - pod.SetNamespace("default") - pod.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "Pod", - }) - sii, err := informerCache.GetInformer(pod) - Expect(err).NotTo(HaveOccurred()) - Expect(sii).NotTo(BeNil()) - Expect(sii.HasSynced()).To(BeTrue()) - - By("adding an event handler listening for object creation which sends the object to a channel") - out := make(chan interface{}) - addFunc := func(obj interface{}) { - out <- obj - } - sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) - - By("adding an object") - cl, err := client.New(cfg, client.Options{}) - Expect(err).NotTo(HaveOccurred()) - Expect(cl.Create(context.Background(), pod)).To(Succeed()) - - By("verifying the object is received on the channel") - Eventually(out).Should(Receive(Equal(pod))) - close(done) - }, 3) - - It("should be able to index an object field then retrieve objects by that field", func() { - By("creating the cache") - informer, err := cache.New(cfg, cache.Options{}) - Expect(err).NotTo(HaveOccurred()) - - By("indexing the restartPolicy field of the Pod object before starting") - pod := &unstructured.Unstructured{} - pod.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "Pod", - }) - indexFunc := func(obj runtime.Object) []string { - s, ok := obj.(*unstructured.Unstructured).Object["spec"] - if !ok { - return []string{} } - m, ok := s.(map[string]interface{}) - if !ok { - return []string{} + pod.SetName("informer-obj2") + pod.SetNamespace("default") + pod.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Pod", + }) + sii, err := informerCache.GetInformer(pod) + Expect(err).NotTo(HaveOccurred()) + Expect(sii).NotTo(BeNil()) + Expect(sii.HasSynced()).To(BeTrue()) + + By("adding an event handler listening for object creation which sends the object to a channel") + out := make(chan interface{}) + addFunc := func(obj interface{}) { + out <- obj } - return []string{fmt.Sprintf("%v", m["restartPolicy"])} - } - Expect(informer.IndexField(pod, "spec.restartPolicy", indexFunc)).To(Succeed()) - - By("running the cache and waiting for it to sync") - go func() { - defer GinkgoRecover() - Expect(informer.Start(stop)).To(Succeed()) - }() - Expect(informer.WaitForCacheSync(stop)).NotTo(BeFalse()) - - By("listing Pods with restartPolicyOnFailure") - listObj := &unstructured.UnstructuredList{} - listObj.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "PodList", - }) - err = informer.List(context.Background(), listObj, - client.MatchingField("spec.restartPolicy", "OnFailure")) - Expect(err).To(Succeed()) - - By("verifying that the returned pods have correct restart policy") - Expect(listObj.Items).NotTo(BeEmpty()) - Expect(listObj.Items).Should(HaveLen(1)) - actual := listObj.Items[0] - Expect(actual.GetName()).To(Equal("test-pod-3")) - }, 3) + sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) + + By("adding an object") + cl, err := client.New(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + Expect(cl.Create(context.Background(), pod)).To(Succeed()) + defer deletePod(pod) + + By("verifying the object is received on the channel") + Eventually(out).Should(Receive(Equal(pod))) + close(done) + }, 3) + + It("should be able to index an object field then retrieve objects by that field", func() { + By("creating the cache") + informer, err := cache.New(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + + By("indexing the restartPolicy field of the Pod object before starting") + pod := &unstructured.Unstructured{} + pod.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Pod", + }) + indexFunc := func(obj runtime.Object) []string { + s, ok := obj.(*unstructured.Unstructured).Object["spec"] + if !ok { + return []string{} + } + m, ok := s.(map[string]interface{}) + if !ok { + return []string{} + } + return []string{fmt.Sprintf("%v", m["restartPolicy"])} + } + Expect(informer.IndexField(pod, "spec.restartPolicy", indexFunc)).To(Succeed()) + + By("running the cache and waiting for it to sync") + go func() { + defer GinkgoRecover() + Expect(informer.Start(stop)).To(Succeed()) + }() + Expect(informer.WaitForCacheSync(stop)).NotTo(BeFalse()) + + By("listing Pods with restartPolicyOnFailure") + listObj := &unstructured.UnstructuredList{} + listObj.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + err = informer.List(context.Background(), listObj, + client.MatchingField("spec.restartPolicy", "OnFailure")) + Expect(err).To(Succeed()) + + By("verifying that the returned pods have correct restart policy") + Expect(listObj.Items).NotTo(BeEmpty()) + Expect(listObj.Items).Should(HaveLen(1)) + actual := listObj.Items[0] + Expect(actual.GetName()).To(Equal("test-pod-3")) + }, 3) + }) }) }) -}) +} diff --git a/pkg/cache/example_test.go b/pkg/cache/example_test.go new file mode 100644 index 0000000000..3f4231dd7d --- /dev/null +++ b/pkg/cache/example_test.go @@ -0,0 +1,30 @@ +package cache_test + +import ( + "os" + + "sigs.k8s.io/controller-runtime/pkg/cache" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/runtime/signals" +) + +var ( + log = logf.Log.WithName("manager-examples") +) + +// This example shows how to create and set a new multi namespaced cache. +func ExampleNewMultiNamespacedCacheBuilder() { + mgr, err := manager.New(cfg, manager.Options{ + NewCache: cache.NewMultiNamespacedCacheBuilder([]string{"namespace1", "namespace2"}), + }) + if err != nil { + log.Error(err, "unable to create manager") + os.Exit(1) + } + err = mgr.Start(signals.SetupSignalHandler()) + if err != nil { + log.Error(err, "unable start the manager") + os.Exit(1) + } +} diff --git a/pkg/cache/informertest/fake_cache.go b/pkg/cache/informertest/fake_cache.go index 2f311cf729..93a8fbcf4c 100644 --- a/pkg/cache/informertest/fake_cache.go +++ b/pkg/cache/informertest/fake_cache.go @@ -39,7 +39,7 @@ type FakeInformers struct { } // GetInformerForKind implements Informers -func (c *FakeInformers) GetInformerForKind(gvk schema.GroupVersionKind) (toolscache.SharedIndexInformer, error) { +func (c *FakeInformers) GetInformerForKind(gvk schema.GroupVersionKind) (cache.Informer, error) { if c.Scheme == nil { c.Scheme = scheme.Scheme } @@ -67,7 +67,7 @@ func (c *FakeInformers) FakeInformerForKind(gvk schema.GroupVersionKind) (*contr } // GetInformer implements Informers -func (c *FakeInformers) GetInformer(obj runtime.Object) (toolscache.SharedIndexInformer, error) { +func (c *FakeInformers) GetInformer(obj runtime.Object) (cache.Informer, error) { if c.Scheme == nil { c.Scheme = scheme.Scheme } diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index 42ec5543ff..32f552d79b 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -1,25 +1,53 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package cache import ( "context" + "encoding/json" "fmt" "time" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/rest" toolscache "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) +// multiNamespaceCache knows how to handle multiple namespaced caches +// You may want to use this feature when scoping permissions for your +// operator to a list of namespaces instead of watching every namespace +// in the cluster. type multiNamespaceCache struct { namespaceToCache map[string]Cache + Scheme *runtime.Scheme } -// NewMultiNamespacedCache - retrieve the new function for a multinamespaced cache. -func NewMultiNamespacedCache(namespaces []string) func(config *rest.Config, opts Options) (Cache, error) { +// NewMultiNamespacedCacheBuilder - Builder function to create a new multi-namespaced cache. +func NewMultiNamespacedCacheBuilder(namespaces []string) func(config *rest.Config, opts Options) (Cache, error) { return func(config *rest.Config, opts Options) (Cache, error) { + opts, err := defaultOpts(config, opts) + if err != nil { + return nil, err + } caches := map[string]Cache{} for _, ns := range namespaces { opts.Namespace = ns @@ -29,7 +57,7 @@ func NewMultiNamespacedCache(namespaces []string) func(config *rest.Config, opts } caches[ns] = c } - return &multiNamespaceCache{namespaceToCache: caches}, nil + return &multiNamespaceCache{namespaceToCache: caches, Scheme: opts.Scheme}, nil } } @@ -61,8 +89,13 @@ func (c *multiNamespaceCache) GetInformerForKind(gvk schema.GroupVersionKind) (I } func (c *multiNamespaceCache) Start(stopCh <-chan struct{}) error { - for _, cache := range c.namespaceToCache { - go cache.Start(stopCh) + for ns, cache := range c.namespaceToCache { + go func(ns string, cache Cache) { + err := cache.Start(stopCh) + if err != nil { + log.Error(err, "multinamespace cache failed to start namespaced informer", "namespace", ns) + } + }(ns, cache) } <-stopCh return nil @@ -95,36 +128,63 @@ func (c *multiNamespaceCache) Get(ctx context.Context, key client.ObjectKey, obj return cache.Get(ctx, key, obj) } -func (c *multiNamespaceCache) List(ctx context.Context, opts *client.ListOptions, list runtime.Object) error { - if opts.Namespace != corev1.NamespaceAll { - cache, ok := c.namespaceToCache[opts.Namespace] +// List multi namespace cache will get all the objects in the namespaces that the cache is watching if asked for all namespaces. +func (c *multiNamespaceCache) List(ctx context.Context, list runtime.Object, opts ...client.ListOptionFunc) error { + listOpts := client.ListOptions{} + listOpts.ApplyOptions(opts) + if listOpts.Namespace != corev1.NamespaceAll { + cache, ok := c.namespaceToCache[listOpts.Namespace] if !ok { - return fmt.Errorf("unable to get: %v because of unknown namespace for the cache", opts.Namespace) + return fmt.Errorf("unable to get: %v because of unknown namespace for the cache", listOpts.Namespace) } - return cache.List(ctx, opts, list) + return cache.List(ctx, list, opts...) + } + gvk, err := apiutil.GVKForObject(list, c.Scheme) + if err != nil { + return err } - //TODO(shawn-hurley): handle this use case by listing for each namespace in the cache. - return fmt.Errorf("unable to list for an all namespaces") + allItems := &unstructured.UnstructuredList{} + for _, cache := range c.namespaceToCache { + items := &unstructured.UnstructuredList{} + items.SetGroupVersionKind(gvk) + err := cache.List(ctx, items, opts...) + if err != nil { + return err + } + allItems.Items = append(allItems.Items, items.Items...) + // The last list call should have the most correct resource version. + allItems.Object = items.Object + } + data, err := allItems.MarshalJSON() + if err != nil { + return err + } + fmt.Printf("%s", data) + return json.Unmarshal(data, list) } +// multiNamespaceInformer knows how to handle interacting with the underlying informer across multiple namespaces type multiNamespaceInformer struct { namespaceToInformer map[string]Informer } var _ Informer = &multiNamespaceInformer{} +// AddEventHandler adds the handler to each namespaced informer func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) { for _, informer := range i.namespaceToInformer { informer.AddEventHandler(handler) } } +// AddEventHandlerWithResyncPeriod adds the handler with a resync period to each namespaced informer func (i *multiNamespaceInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) { for _, informer := range i.namespaceToInformer { informer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod) } } +// AddIndexers adds the indexer for each namespaced informer func (i *multiNamespaceInformer) AddIndexers(indexers toolscache.Indexers) error { for _, informer := range i.namespaceToInformer { err := informer.AddIndexers(indexers) @@ -134,3 +194,13 @@ func (i *multiNamespaceInformer) AddIndexers(indexers toolscache.Indexers) error } return nil } + +// HasSynced checks if each namespaced informer has synced +func (i *multiNamespaceInformer) HasSynced() bool { + for _, informer := range i.namespaceToInformer { + if ok := informer.HasSynced(); !ok { + return ok + } + } + return true +} diff --git a/pkg/cache/multi_namespace_cache_test.go b/pkg/cache/multi_namespace_cache_test.go new file mode 100644 index 0000000000..5d63a03888 --- /dev/null +++ b/pkg/cache/multi_namespace_cache_test.go @@ -0,0 +1,32 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache_test + +import ( + . "github.com/onsi/ginkgo" + kcorev1 "k8s.io/api/core/v1" + + "sigs.k8s.io/controller-runtime/pkg/cache" +) + +const testNamespaceThree = "test-namespace-3" + +var _ = Describe("Multinamesapce Informer Cache", func() { + knownPod1 := createPod("test-pod-1", testNamespaceThree, kcorev1.RestartPolicyNever) + defer deletePod(knownPod1) + CacheTest(cache.NewMultiNamespacedCacheBuilder([]string{testNamespaceOne, testNamespaceTwo, "default"})) +}) diff --git a/pkg/manager/example_test.go b/pkg/manager/example_test.go index f718b38f8e..26d9e611f9 100644 --- a/pkg/manager/example_test.go +++ b/pkg/manager/example_test.go @@ -20,8 +20,8 @@ import ( "os" "sigs.k8s.io/controller-runtime/pkg/client/config" - "sigs.k8s.io/controller-runtime/pkg/manager" logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/runtime/signals" )