diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 5435d27151..cb87d8f385 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -79,6 +79,10 @@ type Options struct { // Resync is the resync period. Defaults to defaultResyncTime. Resync *time.Duration + + // Namespace restricts the cache's ListWatch to the desired namespace + // Default watches all namespaces + Namespace string } var defaultResyncTime = 10 * time.Hour @@ -89,7 +93,7 @@ func New(config *rest.Config, opts Options) (Cache, error) { if err != nil { return nil, err } - im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync) + im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace) return &informerCache{InformersMap: im}, nil } diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 832cecde8d..ff4f874369 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -195,6 +195,35 @@ var _ = Describe("Informer Cache", func() { Expect(actual.Namespace).To(Equal(testNamespaceOne)) }) + It("should be able to restrict cache to a namespace", func() { + By("creating a namespaced cache") + namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne}) + Expect(err).NotTo(HaveOccurred()) + + By("running the cache and waiting for it to sync") + go func() { + defer GinkgoRecover() + Expect(namespacedCache.Start(stop)).To(Succeed()) + }() + Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse()) + + By("listing pods in all namespaces") + out := &kcorev1.PodList{} + Expect(namespacedCache.List(context.Background(), nil, out)).To(Succeed()) + + By("verifying the returned pod is from the watched namespace") + Expect(out.Items).NotTo(BeEmpty()) + Expect(out.Items).Should(HaveLen(1)) + Expect(out.Items[0].Namespace).To(Equal(testNamespaceOne)) + + By("listing all namespaces - should still be able to get a cluster-scoped resource") + namespaceList := &kcorev1.NamespaceList{} + Expect(namespacedCache.List(context.Background(), nil, namespaceList)).To(Succeed()) + + By("verifying the namespace list is not empty") + Expect(namespaceList.Items).NotTo(BeEmpty()) + }) + It("should deep copy the object unless told otherwise", func() { By("retrieving a specific pod from the cache") out := &kcorev1.Pod{} @@ -333,6 +362,45 @@ var _ = Describe("Informer Cache", func() { Expect(actual.GetNamespace()).To(Equal(testNamespaceOne)) }) + It("should be able to restrict cache to a namespace", func() { + By("creating a namespaced cache") + namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne}) + Expect(err).NotTo(HaveOccurred()) + + By("running the cache and waiting for it to sync") + go func() { + defer GinkgoRecover() + Expect(namespacedCache.Start(stop)).To(Succeed()) + }() + Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse()) + + By("listing pods in all namespaces") + out := &unstructured.UnstructuredList{} + out.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + Expect(namespacedCache.List(context.Background(), nil, out)).To(Succeed()) + + By("verifying the returned pod is from the watched namespace") + Expect(out.Items).NotTo(BeEmpty()) + Expect(out.Items).Should(HaveLen(1)) + Expect(out.Items[0].GetNamespace()).To(Equal(testNamespaceOne)) + + By("listing all namespaces - should still be able to get a cluster-scoped resource") + namespaceList := &unstructured.UnstructuredList{} + namespaceList.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "NamespaceList", + }) + Expect(namespacedCache.List(context.Background(), nil, namespaceList)).To(Succeed()) + + By("verifying the namespace list is not empty") + Expect(namespaceList.Items).NotTo(BeEmpty()) + }) + It("should deep copy the object unless told otherwise", func() { By("retrieving a specific pod from the cache") out := &unstructured.Unstructured{} diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index feb1c6625f..978002d9cf 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -44,11 +44,12 @@ type InformersMap struct { func NewInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, - resync time.Duration) *InformersMap { + resync time.Duration, + namespace string) *InformersMap { return &InformersMap{ - structured: newStructuredInformersMap(config, scheme, mapper, resync), - unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync), + structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace), + unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace), Scheme: scheme, } @@ -85,11 +86,11 @@ func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*Ma } // newStructuredInformersMap creates a new InformersMap for structured objects. -func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, createStructuredListWatch) +func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap { + return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch) } // newUnstructuredInformersMap creates a new InformersMap for unstructured objects. -func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, createUnstructuredListWatch) +func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap { + return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch) } diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 75593c3458..9dea36966b 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -42,7 +42,9 @@ type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInforme func newSpecificInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, - resync time.Duration, createListWatcher createListWatcherFunc) *specificInformersMap { + resync time.Duration, + namespace string, + createListWatcher createListWatcherFunc) *specificInformersMap { ip := &specificInformersMap{ config: config, Scheme: scheme, @@ -52,6 +54,7 @@ func newSpecificInformersMap(config *rest.Config, paramCodec: runtime.NewParameterCodec(scheme), resync: resync, createListWatcher: createListWatcher, + namespace: namespace, } return ip } @@ -102,6 +105,10 @@ type specificInformersMap struct { // and allows for abstracting over the particulars of structured vs // unstructured objects. createListWatcher createListWatcherFunc + + // namespace is the namespace that all ListWatches are restricted to + // default or empty string means all namespaces + namespace string } // Start calls Run on each of the informers and sets started to true. Blocks on the stop channel. @@ -227,14 +234,16 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer return &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { res := listObj.DeepCopyObject() - err := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res) + isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot + err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res) return res, err }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { // Watch needs to be set to true separately opts.Watch = true - return client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch() + isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot + return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch() }, }, nil } @@ -254,12 +263,18 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform // Create a new ListWatch for the obj return &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { + return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(opts) + } return dynamicClient.Resource(mapping.Resource).List(opts) }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { // Watch needs to be set to true separately opts.Watch = true + if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { + return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).Watch(opts) + } return dynamicClient.Resource(mapping.Resource).Watch(opts) }, }, nil diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index e4cdbb22c4..be83eb973c 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -106,6 +106,12 @@ type Options struct { // will use for holding the leader lock. LeaderElectionID string + // Namespace if specified restricts the manager's cache to watch objects in the desired namespace + // Defaults to all namespaces + // Note: If a namespace is specified then controllers can still Watch for a cluster-scoped resource e.g Node + // For namespaced resources the cache will only hold objects from the desired namespace. + Namespace string + // Dependency injection for testing newCache func(config *rest.Config, opts cache.Options) (cache.Cache, error) newClient func(config *rest.Config, options client.Options) (client.Client, error) @@ -153,7 +159,7 @@ func New(config *rest.Config, options Options) (Manager, error) { } // Create the cache for the cached read client and registering informers - cache, err := options.newCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod}) + cache, err := options.newCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace}) if err != nil { return nil, err }