diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index c6f3e6971f..2dd002ef56 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -48,11 +48,11 @@ type Cache interface { type Informers interface { // GetInformer fetches or constructs an informer for the given object that corresponds to a single // API kind and resource. - GetInformer(obj runtime.Object) (toolscache.SharedIndexInformer, error) + GetInformer(obj runtime.Object) (Informer, error) // GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead // of the underlying object. - GetInformerForKind(gvk schema.GroupVersionKind) (toolscache.SharedIndexInformer, error) + GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error) // Start runs all the informers known to this cache until the given channel is closed. // It blocks. @@ -69,6 +69,23 @@ type Informers interface { IndexField(obj runtime.Object, field string, extractValue client.IndexerFunc) error } +// Informer - informer allows you interact with the underlying informer +type Informer interface { + // AddEventHandler adds an event handler to the shared informer using the shared informer's resync + // period. Events to a single handler are delivered sequentially, but there is no coordination + // between different handlers. + AddEventHandler(handler toolscache.ResourceEventHandler) + // AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the + // specified resync period. Events to a single handler are delivered sequentially, but there is + // no coordination between different handlers. + AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) + // AddIndexers adds more indexers to this store. If you call this after you already have data + // in the store, the results are undefined. + AddIndexers(indexers toolscache.Indexers) error + //HasSynced return true if the informers underlying store has synced + HasSynced() bool +} + // Options are the optional arguments for creating a new InformersMap object type Options struct { // Scheme is the scheme to use for mapping objects to GroupVersionKinds diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 08d98860cb..287651abc7 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" kscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" kcache "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -37,6 +38,7 @@ import ( const testNamespaceOne = "test-namespace-1" const testNamespaceTwo = "test-namespace-2" +const testNamespaceThree = "test-namespace-3" // TODO(community): Pull these helper functions into testenv. // Restart policy is included to allow indexing on that field. @@ -71,582 +73,585 @@ func deletePod(pod runtime.Object) { } var _ = Describe("Informer Cache", func() { + CacheTest(cache.New) +}) +var _ = Describe("Multi-Namesapce Informer Cache", func() { + CacheTest(cache.MultiNamespacedCacheBuilder([]string{testNamespaceOne, testNamespaceTwo, "default"})) +}) - var ( - informerCache cache.Cache - stop chan struct{} - knownPod1 runtime.Object - knownPod2 runtime.Object - knownPod3 runtime.Object - ) - - BeforeEach(func() { - stop = make(chan struct{}) - Expect(cfg).NotTo(BeNil()) - - By("creating three pods") - // Includes restart policy since these objects are indexed on this field. - knownPod1 = createPod("test-pod-1", testNamespaceOne, kcorev1.RestartPolicyNever) - knownPod2 = createPod("test-pod-2", testNamespaceTwo, kcorev1.RestartPolicyAlways) - knownPod3 = createPod("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure) - podGVK := schema.GroupVersionKind{ - Kind: "Pod", - Version: "v1", - } - knownPod1.GetObjectKind().SetGroupVersionKind(podGVK) - knownPod2.GetObjectKind().SetGroupVersionKind(podGVK) - knownPod3.GetObjectKind().SetGroupVersionKind(podGVK) - - By("creating the informer cache") - var err error - informerCache, err = cache.New(cfg, cache.Options{}) - Expect(err).NotTo(HaveOccurred()) - By("running the cache and waiting for it to sync") - go func() { - defer GinkgoRecover() - Expect(informerCache.Start(stop)).To(Succeed()) - }() - Expect(informerCache.WaitForCacheSync(stop)).To(BeTrue()) - }) +// nolint: gocyclo +func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error)) { + Describe("Cache test", func() { + var ( + informerCache cache.Cache + stop chan struct{} + knownPod1 runtime.Object + knownPod2 runtime.Object + knownPod3 runtime.Object + knownPod4 runtime.Object + ) + + BeforeEach(func() { + stop = make(chan struct{}) + Expect(cfg).NotTo(BeNil()) + + By("creating three pods") + // Includes restart policy since these objects are indexed on this field. + knownPod1 = createPod("test-pod-1", testNamespaceOne, kcorev1.RestartPolicyNever) + knownPod2 = createPod("test-pod-2", testNamespaceTwo, kcorev1.RestartPolicyAlways) + knownPod3 = createPod("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure) + knownPod4 = createPod("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever) + podGVK := schema.GroupVersionKind{ + Kind: "Pod", + Version: "v1", + } + knownPod1.GetObjectKind().SetGroupVersionKind(podGVK) + knownPod2.GetObjectKind().SetGroupVersionKind(podGVK) + knownPod3.GetObjectKind().SetGroupVersionKind(podGVK) + knownPod4.GetObjectKind().SetGroupVersionKind(podGVK) + + By("creating the informer cache") + var err error + informerCache, err = createCacheFunc(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + By("running the cache and waiting for it to sync") + go func() { + defer GinkgoRecover() + Expect(informerCache.Start(stop)).To(Succeed()) + }() + Expect(informerCache.WaitForCacheSync(stop)).To(BeTrue()) + }) - AfterEach(func() { - By("cleaning up created pods") - deletePod(knownPod1) - deletePod(knownPod2) - deletePod(knownPod3) + AfterEach(func() { + By("cleaning up created pods") + deletePod(knownPod1) + deletePod(knownPod2) + deletePod(knownPod3) + deletePod(knownPod4) - close(stop) - }) + close(stop) + }) - Describe("as a Reader", func() { - Context("with structured objects", func() { - - It("should be able to list objects that haven't been watched previously", func() { - By("listing all services in the cluster") - listObj := &kcorev1.ServiceList{} - Expect(informerCache.List(context.Background(), listObj)).To(Succeed()) - - By("verifying that the returned list contains the Kubernetes service") - // NB: kubernetes default service is automatically created in testenv. - Expect(listObj.Items).NotTo(BeEmpty()) - hasKubeService := false - for _, svc := range listObj.Items { - if svc.Namespace == "default" && svc.Name == "kubernetes" { - hasKubeService = true - break + Describe("as a Reader", func() { + Context("with structured objects", func() { + + It("should be able to list objects that haven't been watched previously", func() { + By("listing all services in the cluster") + listObj := &kcorev1.ServiceList{} + Expect(informerCache.List(context.Background(), listObj)).To(Succeed()) + + By("verifying that the returned list contains the Kubernetes service") + // NB: kubernetes default service is automatically created in testenv. + Expect(listObj.Items).NotTo(BeEmpty()) + hasKubeService := false + for _, svc := range listObj.Items { + if svc.Namespace == "default" && svc.Name == "kubernetes" { + hasKubeService = true + break + } } - } - Expect(hasKubeService).To(BeTrue()) - }) - - It("should be able to get objects that haven't been watched previously", func() { - By("getting the Kubernetes service") - svc := &kcorev1.Service{} - svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} - Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed()) - - By("verifying that the returned service looks reasonable") - Expect(svc.Name).To(Equal("kubernetes")) - Expect(svc.Namespace).To(Equal("default")) - }) - - It("should support filtering by labels in a single namespace", func() { - By("listing pods with a particular label") - // NB: each pod has a "test-label": - out := kcorev1.PodList{} - Expect(informerCache.List(context.Background(), &out, - client.InNamespace(testNamespaceTwo), - client.MatchingLabels(map[string]string{"test-label": "test-pod-2"}))).To(Succeed()) - - By("verifying the returned pods have the correct label") - Expect(out.Items).NotTo(BeEmpty()) - Expect(out.Items).Should(HaveLen(1)) - actual := out.Items[0] - Expect(actual.Labels["test-label"]).To(Equal("test-pod-2")) - }) + Expect(hasKubeService).To(BeTrue()) + }) - It("should support filtering by labels from multiple namespaces", func() { - By("creating another pod with the same label but different namespace") - anotherPod := createPod("test-pod-2", testNamespaceOne, kcorev1.RestartPolicyAlways) + It("should be able to get objects that haven't been watched previously", func() { + By("getting the Kubernetes service") + svc := &kcorev1.Service{} + svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} + Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed()) - By("listing pods with a particular label") - // NB: each pod has a "test-label": - out := kcorev1.PodList{} - labels := map[string]string{"test-label": "test-pod-2"} - Expect(informerCache.List(context.Background(), &out, client.MatchingLabels(labels))).To(Succeed()) + By("verifying that the returned service looks reasonable") + Expect(svc.Name).To(Equal("kubernetes")) + Expect(svc.Namespace).To(Equal("default")) + }) - By("verifying multiple pods with the same label in different namespaces are returned") - Expect(out.Items).NotTo(BeEmpty()) - Expect(out.Items).Should(HaveLen(2)) - for _, actual := range out.Items { + It("should support filtering by labels in a single namespace", func() { + By("listing pods with a particular label") + // NB: each pod has a "test-label": + out := kcorev1.PodList{} + Expect(informerCache.List(context.Background(), &out, + client.InNamespace(testNamespaceTwo), + client.MatchingLabels(map[string]string{"test-label": "test-pod-2"}))).To(Succeed()) + + By("verifying the returned pods have the correct label") + Expect(out.Items).NotTo(BeEmpty()) + Expect(out.Items).Should(HaveLen(1)) + actual := out.Items[0] Expect(actual.Labels["test-label"]).To(Equal("test-pod-2")) - } - - deletePod(anotherPod) - }) - - It("should be able to list objects by namespace", func() { - By("listing pods in test-namespace-1") - listObj := &kcorev1.PodList{} - Expect(informerCache.List(context.Background(), listObj, - client.InNamespace(testNamespaceOne))).To(Succeed()) - - By("verifying that the returned pods are in test-namespace-1") - Expect(listObj.Items).NotTo(BeEmpty()) - Expect(listObj.Items).Should(HaveLen(1)) - actual := listObj.Items[0] - Expect(actual.Namespace).To(Equal(testNamespaceOne)) - }) - - It("should be able to restrict cache to a namespace", func() { - By("creating a namespaced cache") - namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne}) - Expect(err).NotTo(HaveOccurred()) - - By("running the cache and waiting for it to sync") - go func() { - defer GinkgoRecover() - Expect(namespacedCache.Start(stop)).To(Succeed()) - }() - Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse()) - - By("listing pods in all namespaces") - out := &kcorev1.PodList{} - Expect(namespacedCache.List(context.Background(), out)).To(Succeed()) - - By("verifying the returned pod is from the watched namespace") - Expect(out.Items).NotTo(BeEmpty()) - Expect(out.Items).Should(HaveLen(1)) - Expect(out.Items[0].Namespace).To(Equal(testNamespaceOne)) - - By("listing all namespaces - should still be able to get a cluster-scoped resource") - namespaceList := &kcorev1.NamespaceList{} - Expect(namespacedCache.List(context.Background(), namespaceList)).To(Succeed()) - - By("verifying the namespace list is not empty") - Expect(namespaceList.Items).NotTo(BeEmpty()) - }) + }) - It("should deep copy the object unless told otherwise", func() { - By("retrieving a specific pod from the cache") - out := &kcorev1.Pod{} - podKey := client.ObjectKey{Name: "test-pod-2", Namespace: testNamespaceTwo} - Expect(informerCache.Get(context.Background(), podKey, out)).To(Succeed()) + It("should support filtering by labels from multiple namespaces", func() { + By("creating another pod with the same label but different namespace") + anotherPod := createPod("test-pod-2", testNamespaceOne, kcorev1.RestartPolicyAlways) + + By("listing pods with a particular label") + // NB: each pod has a "test-label": + out := kcorev1.PodList{} + labels := map[string]string{"test-label": "test-pod-2"} + Expect(informerCache.List(context.Background(), &out, client.MatchingLabels(labels))).To(Succeed()) + + By("verifying multiple pods with the same label in different namespaces are returned") + Expect(out.Items).NotTo(BeEmpty()) + Expect(out.Items).Should(HaveLen(2)) + for _, actual := range out.Items { + Expect(actual.Labels["test-label"]).To(Equal("test-pod-2")) + } - By("verifying the retrieved pod is equal to a known pod") - Expect(out).To(Equal(knownPod2)) + deletePod(anotherPod) + }) - By("altering a field in the retrieved pod") - *out.Spec.ActiveDeadlineSeconds = 4 + It("should be able to list objects by namespace", func() { + By("listing pods in test-namespace-1") + listObj := &kcorev1.PodList{} + Expect(informerCache.List(context.Background(), listObj, + client.InNamespace(testNamespaceOne))).To(Succeed()) + + By("verifying that the returned pods are in test-namespace-1") + Expect(listObj.Items).NotTo(BeEmpty()) + Expect(listObj.Items).Should(HaveLen(1)) + actual := listObj.Items[0] + Expect(actual.Namespace).To(Equal(testNamespaceOne)) + }) - By("verifying the pods are no longer equal") - Expect(out).NotTo(Equal(knownPod2)) - }) + It("should deep copy the object unless told otherwise", func() { + By("retrieving a specific pod from the cache") + out := &kcorev1.Pod{} + podKey := client.ObjectKey{Name: "test-pod-2", Namespace: testNamespaceTwo} + Expect(informerCache.Get(context.Background(), podKey, out)).To(Succeed()) - It("should return an error if the object is not found", func() { - By("getting a service that does not exists") - svc := &kcorev1.Service{} - svcKey := client.ObjectKey{Namespace: "unknown", Name: "unknown"} + By("verifying the retrieved pod is equal to a known pod") + Expect(out).To(Equal(knownPod2)) - By("verifying that an error is returned") - err := informerCache.Get(context.Background(), svcKey, svc) - Expect(err).To(HaveOccurred()) - Expect(errors.IsNotFound(err)).To(BeTrue()) - }) - }) - Context("with unstructured objects", func() { - It("should be able to list objects that haven't been watched previously", func() { - By("listing all services in the cluster") - listObj := &unstructured.UnstructuredList{} - listObj.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "ServiceList", - }) - err := informerCache.List(context.Background(), listObj) - Expect(err).To(Succeed()) - - By("verifying that the returned list contains the Kubernetes service") - // NB: kubernetes default service is automatically created in testenv. - Expect(listObj.Items).NotTo(BeEmpty()) - hasKubeService := false - for _, svc := range listObj.Items { - if svc.GetNamespace() == "default" && svc.GetName() == "kubernetes" { - hasKubeService = true - break - } - } - Expect(hasKubeService).To(BeTrue()) - }) + By("altering a field in the retrieved pod") + *out.Spec.ActiveDeadlineSeconds = 4 - It("should be able to get objects that haven't been watched previously", func() { - By("getting the Kubernetes service") - svc := &unstructured.Unstructured{} - svc.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "Service", + By("verifying the pods are no longer equal") + Expect(out).NotTo(Equal(knownPod2)) }) - svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} - Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed()) - By("verifying that the returned service looks reasonable") - Expect(svc.GetName()).To(Equal("kubernetes")) - Expect(svc.GetNamespace()).To(Equal("default")) - }) + It("should return an error if the object is not found", func() { + By("getting a service that does not exists") + svc := &kcorev1.Service{} + svcKey := client.ObjectKey{Namespace: testNamespaceOne, Name: "unknown"} - It("should support filtering by labels in a single namespace", func() { - By("listing pods with a particular label") - // NB: each pod has a "test-label": - out := unstructured.UnstructuredList{} - out.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "PodList", + By("verifying that an error is returned") + err := informerCache.Get(context.Background(), svcKey, svc) + Expect(err).To(HaveOccurred()) + Expect(errors.IsNotFound(err)).To(BeTrue()) }) - err := informerCache.List(context.Background(), &out, - client.InNamespace(testNamespaceTwo), - client.MatchingLabels(map[string]string{"test-label": "test-pod-2"})) - Expect(err).To(Succeed()) - - By("verifying the returned pods have the correct label") - Expect(out.Items).NotTo(BeEmpty()) - Expect(out.Items).Should(HaveLen(1)) - actual := out.Items[0] - Expect(actual.GetLabels()["test-label"]).To(Equal("test-pod-2")) - }) - It("should support filtering by labels from multiple namespaces", func() { - By("creating another pod with the same label but different namespace") - anotherPod := createPod("test-pod-2", testNamespaceOne, kcorev1.RestartPolicyAlways) - - By("listing pods with a particular label") - // NB: each pod has a "test-label": - out := unstructured.UnstructuredList{} - out.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "PodList", - }) - labels := map[string]string{"test-label": "test-pod-2"} - err := informerCache.List(context.Background(), &out, client.MatchingLabels(labels)) - Expect(err).To(Succeed()) - - By("verifying multiple pods with the same label in different namespaces are returned") - Expect(out.Items).NotTo(BeEmpty()) - Expect(out.Items).Should(HaveLen(2)) - for _, actual := range out.Items { - Expect(actual.GetLabels()["test-label"]).To(Equal("test-pod-2")) - } - - deletePod(anotherPod) - }) + It("should return an error if getting object in unwatched namespace", func() { + By("getting a service that does not exists") + svc := &kcorev1.Service{} + svcKey := client.ObjectKey{Namespace: "unknown", Name: "unknown"} - It("should be able to list objects by namespace", func() { - By("listing pods in test-namespace-1") - listObj := &unstructured.UnstructuredList{} - listObj.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "PodList", + By("verifying that an error is returned") + err := informerCache.Get(context.Background(), svcKey, svc) + Expect(err).To(HaveOccurred()) }) - err := informerCache.List(context.Background(), listObj, client.InNamespace(testNamespaceOne)) - Expect(err).To(Succeed()) - - By("verifying that the returned pods are in test-namespace-1") - Expect(listObj.Items).NotTo(BeEmpty()) - Expect(listObj.Items).Should(HaveLen(1)) - actual := listObj.Items[0] - Expect(actual.GetNamespace()).To(Equal(testNamespaceOne)) }) - - It("should be able to restrict cache to a namespace", func() { - By("creating a namespaced cache") - namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne}) - Expect(err).NotTo(HaveOccurred()) - - By("running the cache and waiting for it to sync") - go func() { - defer GinkgoRecover() - Expect(namespacedCache.Start(stop)).To(Succeed()) - }() - Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse()) - - By("listing pods in all namespaces") - out := &unstructured.UnstructuredList{} - out.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "PodList", + Context("with unstructured objects", func() { + It("should be able to list objects that haven't been watched previously", func() { + By("listing all services in the cluster") + listObj := &unstructured.UnstructuredList{} + listObj.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "ServiceList", + }) + err := informerCache.List(context.Background(), listObj) + Expect(err).To(Succeed()) + + By("verifying that the returned list contains the Kubernetes service") + // NB: kubernetes default service is automatically created in testenv. + Expect(listObj.Items).NotTo(BeEmpty()) + hasKubeService := false + for _, svc := range listObj.Items { + if svc.GetNamespace() == "default" && svc.GetName() == "kubernetes" { + hasKubeService = true + break + } + } + Expect(hasKubeService).To(BeTrue()) }) - Expect(namespacedCache.List(context.Background(), out)).To(Succeed()) - - By("verifying the returned pod is from the watched namespace") - Expect(out.Items).NotTo(BeEmpty()) - Expect(out.Items).Should(HaveLen(1)) - Expect(out.Items[0].GetNamespace()).To(Equal(testNamespaceOne)) - - By("listing all namespaces - should still be able to get a cluster-scoped resource") - namespaceList := &unstructured.UnstructuredList{} - namespaceList.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "NamespaceList", + It("should be able to get objects that haven't been watched previously", func() { + By("getting the Kubernetes service") + svc := &unstructured.Unstructured{} + svc.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Service", + }) + svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"} + Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed()) + + By("verifying that the returned service looks reasonable") + Expect(svc.GetName()).To(Equal("kubernetes")) + Expect(svc.GetNamespace()).To(Equal("default")) }) - Expect(namespacedCache.List(context.Background(), namespaceList)).To(Succeed()) - - By("verifying the namespace list is not empty") - Expect(namespaceList.Items).NotTo(BeEmpty()) - }) - It("should deep copy the object unless told otherwise", func() { - By("retrieving a specific pod from the cache") - out := &unstructured.Unstructured{} - out.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "Pod", + It("should support filtering by labels in a single namespace", func() { + By("listing pods with a particular label") + // NB: each pod has a "test-label": + out := unstructured.UnstructuredList{} + out.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + err := informerCache.List(context.Background(), &out, + client.InNamespace(testNamespaceTwo), + client.MatchingLabels(map[string]string{"test-label": "test-pod-2"})) + Expect(err).To(Succeed()) + + By("verifying the returned pods have the correct label") + Expect(out.Items).NotTo(BeEmpty()) + Expect(out.Items).Should(HaveLen(1)) + actual := out.Items[0] + Expect(actual.GetLabels()["test-label"]).To(Equal("test-pod-2")) }) - uKnownPod2 := &unstructured.Unstructured{} - kscheme.Scheme.Convert(knownPod2, uKnownPod2, nil) - podKey := client.ObjectKey{Name: "test-pod-2", Namespace: testNamespaceTwo} - Expect(informerCache.Get(context.Background(), podKey, out)).To(Succeed()) + It("should support filtering by labels from multiple namespaces", func() { + By("creating another pod with the same label but different namespace") + anotherPod := createPod("test-pod-2", testNamespaceOne, kcorev1.RestartPolicyAlways) + + By("listing pods with a particular label") + // NB: each pod has a "test-label": + out := unstructured.UnstructuredList{} + out.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + labels := map[string]string{"test-label": "test-pod-2"} + err := informerCache.List(context.Background(), &out, client.MatchingLabels(labels)) + Expect(err).To(Succeed()) + + By("verifying multiple pods with the same label in different namespaces are returned") + Expect(out.Items).NotTo(BeEmpty()) + Expect(out.Items).Should(HaveLen(2)) + for _, actual := range out.Items { + Expect(actual.GetLabels()["test-label"]).To(Equal("test-pod-2")) + } - By("verifying the retrieved pod is equal to a known pod") - Expect(out).To(Equal(uKnownPod2)) + deletePod(anotherPod) + }) - By("altering a field in the retrieved pod") - m, _ := out.Object["spec"].(map[string]interface{}) - m["activeDeadlineSeconds"] = 4 + It("should be able to list objects by namespace", func() { + By("listing pods in test-namespace-1") + listObj := &unstructured.UnstructuredList{} + listObj.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + err := informerCache.List(context.Background(), listObj, client.InNamespace(testNamespaceOne)) + Expect(err).To(Succeed()) + + By("verifying that the returned pods are in test-namespace-1") + Expect(listObj.Items).NotTo(BeEmpty()) + Expect(listObj.Items).Should(HaveLen(1)) + actual := listObj.Items[0] + Expect(actual.GetNamespace()).To(Equal(testNamespaceOne)) + }) - By("verifying the pods are no longer equal") - Expect(out).NotTo(Equal(knownPod2)) - }) + It("should be able to restrict cache to a namespace", func() { + By("creating a namespaced cache") + namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne}) + Expect(err).NotTo(HaveOccurred()) + + By("running the cache and waiting for it to sync") + go func() { + defer GinkgoRecover() + Expect(namespacedCache.Start(stop)).To(Succeed()) + }() + Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse()) + + By("listing pods in all namespaces") + out := &unstructured.UnstructuredList{} + out.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + Expect(namespacedCache.List(context.Background(), out)).To(Succeed()) + + By("verifying the returned pod is from the watched namespace") + Expect(out.Items).NotTo(BeEmpty()) + Expect(out.Items).Should(HaveLen(1)) + Expect(out.Items[0].GetNamespace()).To(Equal(testNamespaceOne)) + + By("listing all namespaces - should still be able to get a cluster-scoped resource") + namespaceList := &unstructured.UnstructuredList{} + namespaceList.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "NamespaceList", + }) + Expect(namespacedCache.List(context.Background(), namespaceList)).To(Succeed()) + + By("verifying the namespace list is not empty") + Expect(namespaceList.Items).NotTo(BeEmpty()) + }) - It("should return an error if the object is not found", func() { - By("getting a service that does not exists") - svc := &unstructured.Unstructured{} - svc.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "Service", + It("should deep copy the object unless told otherwise", func() { + By("retrieving a specific pod from the cache") + out := &unstructured.Unstructured{} + out.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Pod", + }) + uKnownPod2 := &unstructured.Unstructured{} + kscheme.Scheme.Convert(knownPod2, uKnownPod2, nil) + + podKey := client.ObjectKey{Name: "test-pod-2", Namespace: testNamespaceTwo} + Expect(informerCache.Get(context.Background(), podKey, out)).To(Succeed()) + + By("verifying the retrieved pod is equal to a known pod") + Expect(out).To(Equal(uKnownPod2)) + + By("altering a field in the retrieved pod") + m, _ := out.Object["spec"].(map[string]interface{}) + m["activeDeadlineSeconds"] = 4 + + By("verifying the pods are no longer equal") + Expect(out).NotTo(Equal(knownPod2)) }) - svcKey := client.ObjectKey{Namespace: "unknown", Name: "unknown"} - By("verifying that an error is returned") - err := informerCache.Get(context.Background(), svcKey, svc) - Expect(err).To(HaveOccurred()) - Expect(errors.IsNotFound(err)).To(BeTrue()) + It("should return an error if the object is not found", func() { + By("getting a service that does not exists") + svc := &unstructured.Unstructured{} + svc.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Service", + }) + svcKey := client.ObjectKey{Namespace: testNamespaceOne, Name: "unknown"} + + By("verifying that an error is returned") + err := informerCache.Get(context.Background(), svcKey, svc) + Expect(err).To(HaveOccurred()) + Expect(errors.IsNotFound(err)).To(BeTrue()) + }) + It("should return an error if getting object in unwatched namespace", func() { + By("getting a service that does not exists") + svc := &kcorev1.Service{} + svcKey := client.ObjectKey{Namespace: "unknown", Name: "unknown"} + + By("verifying that an error is returned") + err := informerCache.Get(context.Background(), svcKey, svc) + Expect(err).To(HaveOccurred()) + }) }) }) - }) - - Describe("as an Informer", func() { - Context("with structured objects", func() { - It("should be able to get informer for the object", func(done Done) { - By("getting a shared index informer for a pod") - pod := &kcorev1.Pod{ - ObjectMeta: kmetav1.ObjectMeta{ - Name: "informer-obj", - Namespace: "default", - }, - Spec: kcorev1.PodSpec{ - Containers: []kcorev1.Container{ - { - Name: "nginx", - Image: "nginx", + Describe("as an Informer", func() { + Context("with structured objects", func() { + It("should be able to get informer for the object", func(done Done) { + By("getting a shared index informer for a pod") + pod := &kcorev1.Pod{ + ObjectMeta: kmetav1.ObjectMeta{ + Name: "informer-obj", + Namespace: "default", + }, + Spec: kcorev1.PodSpec{ + Containers: []kcorev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, }, }, - }, - } - sii, err := informerCache.GetInformer(pod) - Expect(err).NotTo(HaveOccurred()) - Expect(sii).NotTo(BeNil()) - Expect(sii.HasSynced()).To(BeTrue()) - - By("adding an event handler listening for object creation which sends the object to a channel") - out := make(chan interface{}) - addFunc := func(obj interface{}) { - out <- obj - } - sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) - - By("adding an object") - cl, err := client.New(cfg, client.Options{}) - Expect(err).NotTo(HaveOccurred()) - Expect(cl.Create(context.Background(), pod)).To(Succeed()) - - By("verifying the object is received on the channel") - Eventually(out).Should(Receive(Equal(pod))) - close(done) - }) + } + sii, err := informerCache.GetInformer(pod) + Expect(err).NotTo(HaveOccurred()) + Expect(sii).NotTo(BeNil()) + Expect(sii.HasSynced()).To(BeTrue()) + + By("adding an event handler listening for object creation which sends the object to a channel") + out := make(chan interface{}) + addFunc := func(obj interface{}) { + out <- obj + } + sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) + + By("adding an object") + cl, err := client.New(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + Expect(cl.Create(context.Background(), pod)).To(Succeed()) + defer deletePod(pod) - // TODO: Add a test for when GVK is not in Scheme. Does code support informer for unstructured object? - It("should be able to get an informer by group/version/kind", func(done Done) { - By("getting an shared index informer for gvk = core/v1/pod") - gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"} - sii, err := informerCache.GetInformerForKind(gvk) - Expect(err).NotTo(HaveOccurred()) - Expect(sii).NotTo(BeNil()) - Expect(sii.HasSynced()).To(BeTrue()) - - By("adding an event handler listening for object creation which sends the object to a channel") - out := make(chan interface{}) - addFunc := func(obj interface{}) { - out <- obj - } - sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) - - By("adding an object") - cl, err := client.New(cfg, client.Options{}) - Expect(err).NotTo(HaveOccurred()) - pod := &kcorev1.Pod{ - ObjectMeta: kmetav1.ObjectMeta{ - Name: "informer-gvk", - Namespace: "default", - }, - Spec: kcorev1.PodSpec{ - Containers: []kcorev1.Container{ - { - Name: "nginx", - Image: "nginx", + By("verifying the object is received on the channel") + Eventually(out).Should(Receive(Equal(pod))) + close(done) + }) + // TODO: Add a test for when GVK is not in Scheme. Does code support informer for unstructured object? + It("should be able to get an informer by group/version/kind", func(done Done) { + By("getting an shared index informer for gvk = core/v1/pod") + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"} + sii, err := informerCache.GetInformerForKind(gvk) + Expect(err).NotTo(HaveOccurred()) + Expect(sii).NotTo(BeNil()) + Expect(sii.HasSynced()).To(BeTrue()) + + By("adding an event handler listening for object creation which sends the object to a channel") + out := make(chan interface{}) + addFunc := func(obj interface{}) { + out <- obj + } + sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) + + By("adding an object") + cl, err := client.New(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + pod := &kcorev1.Pod{ + ObjectMeta: kmetav1.ObjectMeta{ + Name: "informer-gvk", + Namespace: "default", + }, + Spec: kcorev1.PodSpec{ + Containers: []kcorev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, }, }, - }, - } - Expect(cl.Create(context.Background(), pod)).To(Succeed()) + } + Expect(cl.Create(context.Background(), pod)).To(Succeed()) + defer deletePod(pod) - By("verifying the object is received on the channel") - Eventually(out).Should(Receive(Equal(pod))) - close(done) - }) + By("verifying the object is received on the channel") + Eventually(out).Should(Receive(Equal(pod))) + close(done) + }) - It("should be able to index an object field then retrieve objects by that field", func() { - By("creating the cache") - informer, err := cache.New(cfg, cache.Options{}) - Expect(err).NotTo(HaveOccurred()) - - By("indexing the restartPolicy field of the Pod object before starting") - pod := &kcorev1.Pod{} - indexFunc := func(obj runtime.Object) []string { - return []string{string(obj.(*kcorev1.Pod).Spec.RestartPolicy)} - } - Expect(informer.IndexField(pod, "spec.restartPolicy", indexFunc)).To(Succeed()) - - By("running the cache and waiting for it to sync") - go func() { - defer GinkgoRecover() - Expect(informer.Start(stop)).To(Succeed()) - }() - Expect(informer.WaitForCacheSync(stop)).NotTo(BeFalse()) - - By("listing Pods with restartPolicyOnFailure") - listObj := &kcorev1.PodList{} - Expect(informer.List(context.Background(), listObj, - client.MatchingField("spec.restartPolicy", "OnFailure"))).To(Succeed()) - - By("verifying that the returned pods have correct restart policy") - Expect(listObj.Items).NotTo(BeEmpty()) - Expect(listObj.Items).Should(HaveLen(1)) - actual := listObj.Items[0] - Expect(actual.Name).To(Equal("test-pod-3")) + It("should be able to index an object field then retrieve objects by that field", func() { + By("creating the cache") + informer, err := cache.New(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + + By("indexing the restartPolicy field of the Pod object before starting") + pod := &kcorev1.Pod{} + indexFunc := func(obj runtime.Object) []string { + return []string{string(obj.(*kcorev1.Pod).Spec.RestartPolicy)} + } + Expect(informer.IndexField(pod, "spec.restartPolicy", indexFunc)).To(Succeed()) + + By("running the cache and waiting for it to sync") + go func() { + defer GinkgoRecover() + Expect(informer.Start(stop)).To(Succeed()) + }() + Expect(informer.WaitForCacheSync(stop)).NotTo(BeFalse()) + + By("listing Pods with restartPolicyOnFailure") + listObj := &kcorev1.PodList{} + Expect(informer.List(context.Background(), listObj, + client.MatchingField("spec.restartPolicy", "OnFailure"))).To(Succeed()) + + By("verifying that the returned pods have correct restart policy") + Expect(listObj.Items).NotTo(BeEmpty()) + Expect(listObj.Items).Should(HaveLen(1)) + actual := listObj.Items[0] + Expect(actual.Name).To(Equal("test-pod-3")) + }) }) - }) - Context("with unstructured objects", func() { - It("should be able to get informer for the object", func(done Done) { - By("getting a shared index informer for a pod") - - pod := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "spec": map[string]interface{}{ - "containers": []map[string]interface{}{ - map[string]interface{}{ - "name": "nginx", - "image": "nginx", + Context("with unstructured objects", func() { + It("should be able to get informer for the object", func(done Done) { + By("getting a shared index informer for a pod") + + pod := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "spec": map[string]interface{}{ + "containers": []map[string]interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx", + }, }, }, }, - }, - } - pod.SetName("informer-obj2") - pod.SetNamespace("default") - pod.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "Pod", - }) - sii, err := informerCache.GetInformer(pod) - Expect(err).NotTo(HaveOccurred()) - Expect(sii).NotTo(BeNil()) - Expect(sii.HasSynced()).To(BeTrue()) - - By("adding an event handler listening for object creation which sends the object to a channel") - out := make(chan interface{}) - addFunc := func(obj interface{}) { - out <- obj - } - sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) - - By("adding an object") - cl, err := client.New(cfg, client.Options{}) - Expect(err).NotTo(HaveOccurred()) - Expect(cl.Create(context.Background(), pod)).To(Succeed()) - - By("verifying the object is received on the channel") - Eventually(out).Should(Receive(Equal(pod))) - close(done) - }, 3) - - It("should be able to index an object field then retrieve objects by that field", func() { - By("creating the cache") - informer, err := cache.New(cfg, cache.Options{}) - Expect(err).NotTo(HaveOccurred()) - - By("indexing the restartPolicy field of the Pod object before starting") - pod := &unstructured.Unstructured{} - pod.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "Pod", - }) - indexFunc := func(obj runtime.Object) []string { - s, ok := obj.(*unstructured.Unstructured).Object["spec"] - if !ok { - return []string{} } - m, ok := s.(map[string]interface{}) - if !ok { - return []string{} + pod.SetName("informer-obj2") + pod.SetNamespace("default") + pod.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Pod", + }) + sii, err := informerCache.GetInformer(pod) + Expect(err).NotTo(HaveOccurred()) + Expect(sii).NotTo(BeNil()) + Expect(sii.HasSynced()).To(BeTrue()) + + By("adding an event handler listening for object creation which sends the object to a channel") + out := make(chan interface{}) + addFunc := func(obj interface{}) { + out <- obj } - return []string{fmt.Sprintf("%v", m["restartPolicy"])} - } - Expect(informer.IndexField(pod, "spec.restartPolicy", indexFunc)).To(Succeed()) - - By("running the cache and waiting for it to sync") - go func() { - defer GinkgoRecover() - Expect(informer.Start(stop)).To(Succeed()) - }() - Expect(informer.WaitForCacheSync(stop)).NotTo(BeFalse()) - - By("listing Pods with restartPolicyOnFailure") - listObj := &unstructured.UnstructuredList{} - listObj.SetGroupVersionKind(schema.GroupVersionKind{ - Group: "", - Version: "v1", - Kind: "PodList", - }) - err = informer.List(context.Background(), listObj, - client.MatchingField("spec.restartPolicy", "OnFailure")) - Expect(err).To(Succeed()) - - By("verifying that the returned pods have correct restart policy") - Expect(listObj.Items).NotTo(BeEmpty()) - Expect(listObj.Items).Should(HaveLen(1)) - actual := listObj.Items[0] - Expect(actual.GetName()).To(Equal("test-pod-3")) - }, 3) + sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) + + By("adding an object") + cl, err := client.New(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + Expect(cl.Create(context.Background(), pod)).To(Succeed()) + defer deletePod(pod) + + By("verifying the object is received on the channel") + Eventually(out).Should(Receive(Equal(pod))) + close(done) + }, 3) + + It("should be able to index an object field then retrieve objects by that field", func() { + By("creating the cache") + informer, err := cache.New(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + + By("indexing the restartPolicy field of the Pod object before starting") + pod := &unstructured.Unstructured{} + pod.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Pod", + }) + indexFunc := func(obj runtime.Object) []string { + s, ok := obj.(*unstructured.Unstructured).Object["spec"] + if !ok { + return []string{} + } + m, ok := s.(map[string]interface{}) + if !ok { + return []string{} + } + return []string{fmt.Sprintf("%v", m["restartPolicy"])} + } + Expect(informer.IndexField(pod, "spec.restartPolicy", indexFunc)).To(Succeed()) + + By("running the cache and waiting for it to sync") + go func() { + defer GinkgoRecover() + Expect(informer.Start(stop)).To(Succeed()) + }() + Expect(informer.WaitForCacheSync(stop)).NotTo(BeFalse()) + + By("listing Pods with restartPolicyOnFailure") + listObj := &unstructured.UnstructuredList{} + listObj.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + err = informer.List(context.Background(), listObj, + client.MatchingField("spec.restartPolicy", "OnFailure")) + Expect(err).To(Succeed()) + + By("verifying that the returned pods have correct restart policy") + Expect(listObj.Items).NotTo(BeEmpty()) + Expect(listObj.Items).Should(HaveLen(1)) + actual := listObj.Items[0] + Expect(actual.GetName()).To(Equal("test-pod-3")) + }, 3) + }) }) }) -}) +} diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index c33914ee19..6348937466 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -99,7 +99,7 @@ func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...c } // GetInformerForKind returns the informer for the GroupVersionKind -func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (cache.SharedIndexInformer, error) { +func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error) { // Map the gvk to an object obj, err := ip.Scheme.New(gvk) if err != nil { @@ -113,7 +113,7 @@ func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (cache. } // GetInformer returns the informer for the obj -func (ip *informerCache) GetInformer(obj runtime.Object) (cache.SharedIndexInformer, error) { +func (ip *informerCache) GetInformer(obj runtime.Object) (Informer, error) { gvk, err := apiutil.GVKForObject(obj, ip.Scheme) if err != nil { return nil, err @@ -135,10 +135,10 @@ func (ip *informerCache) IndexField(obj runtime.Object, field string, extractVal if err != nil { return err } - return indexByField(informer.GetIndexer(), field, extractValue) + return indexByField(informer, field, extractValue) } -func indexByField(indexer cache.Indexer, field string, extractor client.IndexerFunc) error { +func indexByField(indexer Informer, field string, extractor client.IndexerFunc) error { indexFunc := func(objRaw interface{}) ([]string, error) { // TODO(directxman12): check if this is the correct type? obj, isObj := objRaw.(runtime.Object) diff --git a/pkg/cache/informertest/fake_cache.go b/pkg/cache/informertest/fake_cache.go index 2f311cf729..93a8fbcf4c 100644 --- a/pkg/cache/informertest/fake_cache.go +++ b/pkg/cache/informertest/fake_cache.go @@ -39,7 +39,7 @@ type FakeInformers struct { } // GetInformerForKind implements Informers -func (c *FakeInformers) GetInformerForKind(gvk schema.GroupVersionKind) (toolscache.SharedIndexInformer, error) { +func (c *FakeInformers) GetInformerForKind(gvk schema.GroupVersionKind) (cache.Informer, error) { if c.Scheme == nil { c.Scheme = scheme.Scheme } @@ -67,7 +67,7 @@ func (c *FakeInformers) FakeInformerForKind(gvk schema.GroupVersionKind) (*contr } // GetInformer implements Informers -func (c *FakeInformers) GetInformer(obj runtime.Object) (toolscache.SharedIndexInformer, error) { +func (c *FakeInformers) GetInformer(obj runtime.Object) (cache.Informer, error) { if c.Scheme == nil { c.Scheme = scheme.Scheme } diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go new file mode 100644 index 0000000000..4f59299487 --- /dev/null +++ b/pkg/cache/multi_namespace_cache.go @@ -0,0 +1,212 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "context" + "encoding/json" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" + toolscache "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" +) + +// NewCacheFunc - Function for creating a new cache from the options and a rest config +type NewCacheFunc func(config *rest.Config, opts Options) (Cache, error) + +// MultiNamespacedCacheBuilder - Builder function to create a new multi-namespaced cache. +// This will scope the cache to a list of namespaces. Listing for all Namespaces +// will list for all the namespaces that this knows about. +func MultiNamespacedCacheBuilder(namespaces []string) NewCacheFunc { + return func(config *rest.Config, opts Options) (Cache, error) { + opts, err := defaultOpts(config, opts) + if err != nil { + return nil, err + } + caches := map[string]Cache{} + for _, ns := range namespaces { + opts.Namespace = ns + c, err := New(config, opts) + if err != nil { + return nil, err + } + caches[ns] = c + } + return &multiNamespaceCache{namespaceToCache: caches, Scheme: opts.Scheme}, nil + } +} + +// multiNamespaceCache knows how to handle multiple namespaced caches +// Use this feature when scoping permissions for your +// operator to a list of namespaces instead of watching every namespace +// in the cluster. +type multiNamespaceCache struct { + namespaceToCache map[string]Cache + Scheme *runtime.Scheme +} + +var _ Cache = &multiNamespaceCache{} + +// Methods for multiNamespaceCache to conform to the Informers interface +func (c *multiNamespaceCache) GetInformer(obj runtime.Object) (Informer, error) { + informers := map[string]Informer{} + for ns, cache := range c.namespaceToCache { + informer, err := cache.GetInformer(obj) + if err != nil { + return nil, err + } + informers[ns] = informer + } + return &multiNamespaceInformer{namespaceToInformer: informers}, nil +} + +func (c *multiNamespaceCache) GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error) { + informers := map[string]Informer{} + for ns, cache := range c.namespaceToCache { + informer, err := cache.GetInformerForKind(gvk) + if err != nil { + return nil, err + } + informers[ns] = informer + } + return &multiNamespaceInformer{namespaceToInformer: informers}, nil +} + +func (c *multiNamespaceCache) Start(stopCh <-chan struct{}) error { + for ns, cache := range c.namespaceToCache { + go func(ns string, cache Cache) { + err := cache.Start(stopCh) + if err != nil { + log.Error(err, "multinamespace cache failed to start namespaced informer", "namespace", ns) + } + }(ns, cache) + } + <-stopCh + return nil +} + +func (c *multiNamespaceCache) WaitForCacheSync(stop <-chan struct{}) bool { + synced := true + for _, cache := range c.namespaceToCache { + if s := cache.WaitForCacheSync(stop); !s { + synced = s + } + } + return synced +} + +func (c *multiNamespaceCache) IndexField(obj runtime.Object, field string, extractValue client.IndexerFunc) error { + for _, cache := range c.namespaceToCache { + if err := cache.IndexField(obj, field, extractValue); err != nil { + return err + } + } + return nil +} + +func (c *multiNamespaceCache) Get(ctx context.Context, key client.ObjectKey, obj runtime.Object) error { + cache, ok := c.namespaceToCache[key.Namespace] + if !ok { + return fmt.Errorf("unable to get: %v because of unknown namespace for the cache", key) + } + return cache.Get(ctx, key, obj) +} + +// List multi namespace cache will get all the objects in the namespaces that the cache is watching if asked for all namespaces. +func (c *multiNamespaceCache) List(ctx context.Context, list runtime.Object, opts ...client.ListOptionFunc) error { + listOpts := client.ListOptions{} + listOpts.ApplyOptions(opts) + if listOpts.Namespace != corev1.NamespaceAll { + cache, ok := c.namespaceToCache[listOpts.Namespace] + if !ok { + return fmt.Errorf("unable to get: %v because of unknown namespace for the cache", listOpts.Namespace) + } + return cache.List(ctx, list, opts...) + } + + // Get all the objects in the namespaces we are watching. + gvk, err := apiutil.GVKForObject(list, c.Scheme) + if err != nil { + return err + } + allItems := &unstructured.UnstructuredList{} + for _, cache := range c.namespaceToCache { + items := &unstructured.UnstructuredList{} + items.SetGroupVersionKind(gvk) + err := cache.List(ctx, items, opts...) + if err != nil { + return err + } + allItems.Items = append(allItems.Items, items.Items...) + // The last list call should have the most correct resource version. + allItems.Object = items.Object + } + data, err := allItems.MarshalJSON() + if err != nil { + return err + } + return json.Unmarshal(data, list) +} + +// multiNamespaceInformer knows how to handle interacting with the underlying informer across multiple namespaces +type multiNamespaceInformer struct { + namespaceToInformer map[string]Informer +} + +var _ Informer = &multiNamespaceInformer{} + +// AddEventHandler adds the handler to each namespaced informer +func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) { + for _, informer := range i.namespaceToInformer { + informer.AddEventHandler(handler) + } +} + +// AddEventHandlerWithResyncPeriod adds the handler with a resync period to each namespaced informer +func (i *multiNamespaceInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) { + for _, informer := range i.namespaceToInformer { + informer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod) + } +} + +// AddIndexers adds the indexer for each namespaced informer +func (i *multiNamespaceInformer) AddIndexers(indexers toolscache.Indexers) error { + for _, informer := range i.namespaceToInformer { + err := informer.AddIndexers(indexers) + if err != nil { + return err + } + } + return nil +} + +// HasSynced checks if each namespaced informer has synced +func (i *multiNamespaceInformer) HasSynced() bool { + for _, informer := range i.namespaceToInformer { + if ok := informer.HasSynced(); !ok { + return ok + } + } + return true +} diff --git a/pkg/manager/example_test.go b/pkg/manager/example_test.go index f718b38f8e..21ee4e7852 100644 --- a/pkg/manager/example_test.go +++ b/pkg/manager/example_test.go @@ -19,9 +19,10 @@ package manager_test import ( "os" + "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client/config" - "sigs.k8s.io/controller-runtime/pkg/manager" logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/runtime/signals" ) @@ -47,8 +48,26 @@ func ExampleNew() { log.Info("created manager", "manager", mgr) } +// This example creates a new Manager that has a cache scoped to a list of namespaces. +func ExampleNew_multinamespaceCache() { + cfg, err := config.GetConfig() + if err != nil { + log.Error(err, "unable to get kubeconfig") + os.Exit(1) + } + + mgr, err := manager.New(cfg, manager.Options{ + NewCache: cache.MultiNamespacedCacheBuilder([]string{"namespace1", "namespace2"}), + }) + if err != nil { + log.Error(err, "unable to set up manager") + os.Exit(1) + } + log.Info("created manager", "manager", mgr) +} + // This example adds a Runnable for the Manager to Start. -func ExampleManager_Add() { +func ExampleManager_add() { err := mgr.Add(manager.RunnableFunc(func(<-chan struct{}) error { // Do something return nil @@ -60,7 +79,7 @@ func ExampleManager_Add() { } // This example starts a Manager that has had Runnables added. -func ExampleManager_Start() { +func ExampleManager_start() { err := mgr.Start(signals.SetupSignalHandler()) if err != nil { log.Error(err, "unable start the manager") diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 8d346df6c1..8a1ccfbd68 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -122,7 +122,7 @@ type Options struct { // NewCache is the function that will create the cache to be used // by the manager. If not set this will use the default new cache function. - NewCache NewCacheFunc + NewCache cache.NewCacheFunc // NewClient will create the client to be used by the manager. // If not set this will create the default DelegatingClient that will @@ -136,9 +136,6 @@ type Options struct { newMetricsListener func(addr string) (net.Listener, error) } -// NewCacheFunc allows a user to define how to create a cache -type NewCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error) - // NewClientFunc allows a user to define how to create a client type NewClientFunc func(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error)