diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 5435d27151..cb87d8f385 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -79,6 +79,10 @@ type Options struct { // Resync is the resync period. Defaults to defaultResyncTime. Resync *time.Duration + + // Namespace restricts the cache's ListWatch to the desired namespace + // Default watches all namespaces + Namespace string } var defaultResyncTime = 10 * time.Hour @@ -89,7 +93,7 @@ func New(config *rest.Config, opts Options) (Cache, error) { if err != nil { return nil, err } - im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync) + im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace) return &informerCache{InformersMap: im}, nil } diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 82441777d8..3369912afd 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -188,6 +188,28 @@ var _ = Describe("Informer Cache", func() { Expect(actual.Namespace).To(Equal(testNamespaceOne)) }) + It("should be able to restrict cache to a namespace", func() { + By("creating a namespaced cache") + namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne}) + Expect(err).NotTo(HaveOccurred()) + + By("running the cache and waiting for it to sync") + go func() { + defer GinkgoRecover() + Expect(namespacedCache.Start(stop)).To(Succeed()) + }() + Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse()) + + By("listing pods in all namespaces") + out := kcorev1.PodList{} + Expect(namespacedCache.List(context.Background(), nil, &out)).To(Succeed()) + + By("verifying the returned pod is from the watched namespace") + Expect(out.Items).NotTo(BeEmpty()) + Expect(out.Items).Should(HaveLen(1)) + Expect(out.Items[0].Namespace).To(Equal(testNamespaceOne)) + }) + It("should deep copy the object unless told otherwise", func() { By("retrieving a specific pod from the cache") out := &kcorev1.Pod{} diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 4564fc95e1..8b3a6cf441 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -36,7 +36,8 @@ import ( func NewInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, - resync time.Duration) *InformersMap { + resync time.Duration, + namespace string) *InformersMap { ip := &InformersMap{ config: config, Scheme: scheme, @@ -45,6 +46,7 @@ func NewInformersMap(config *rest.Config, codecs: serializer.NewCodecFactory(scheme), paramCodec: runtime.NewParameterCodec(scheme), resync: resync, + namespace: namespace, } return ip } @@ -90,6 +92,10 @@ type InformersMap struct { // start is true if the informers have been started started bool + + // namespace is the namespace that all ListWatches are restricted to + // default means all namespaces + namespace string } // Start calls Run on each of the informers and sets started to true. Blocks on the stop channel. @@ -217,14 +223,14 @@ func (ip *InformersMap) newListWatch(gvk schema.GroupVersionKind) (*cache.ListWa return &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { res := listObj.DeepCopyObject() - err := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res) + err := client.Get().Namespace(ip.namespace).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res) return res, err }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { // Watch needs to be set to true separately opts.Watch = true - return client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch() + return client.Get().Namespace(ip.namespace).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch() }, }, nil } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index cc34ce419a..d9e002acc8 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -83,6 +83,10 @@ type Options struct { // value only if you know what you are doing. Defaults to 10 hours if unset. SyncPeriod *time.Duration + // Namespace if specified restricts the manager's cache to watch the desired namespace + // Defaults to all namespaces + Namespace string + // Dependency injection for testing newCache func(config *rest.Config, opts cache.Options) (cache.Cache, error) newClient func(config *rest.Config, options client.Options) (client.Client, error) @@ -128,7 +132,7 @@ func New(config *rest.Config, options Options) (Manager, error) { } // Create the cache for the cached read client and registering informers - cache, err := options.newCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod}) + cache, err := options.newCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace}) if err != nil { return nil, err }