Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Option to restrict informer cache to a namespace #136

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ type Options struct {

// Resync is the resync period. Defaults to defaultResyncTime.
Resync *time.Duration

// Namespace restricts the cache's ListWatch to the desired namespace
// Default watches all namespaces
Namespace string
}

var defaultResyncTime = 10 * time.Hour
Expand All @@ -89,7 +93,7 @@ func New(config *rest.Config, opts Options) (Cache, error) {
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync)
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
return &informerCache{InformersMap: im}, nil
}

Expand Down
68 changes: 68 additions & 0 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,35 @@ var _ = Describe("Informer Cache", func() {
Expect(actual.Namespace).To(Equal(testNamespaceOne))
})

It("should be able to restrict cache to a namespace", func() {
By("creating a namespaced cache")
namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne})
Expect(err).NotTo(HaveOccurred())

By("running the cache and waiting for it to sync")
go func() {
defer GinkgoRecover()
Expect(namespacedCache.Start(stop)).To(Succeed())
}()
Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse())

By("listing pods in all namespaces")
out := &kcorev1.PodList{}
Expect(namespacedCache.List(context.Background(), nil, out)).To(Succeed())

By("verifying the returned pod is from the watched namespace")
Expect(out.Items).NotTo(BeEmpty())
Expect(out.Items).Should(HaveLen(1))
Expect(out.Items[0].Namespace).To(Equal(testNamespaceOne))

By("listing all namespaces - should still be able to get a cluster-scoped resource")
namespaceList := &kcorev1.NamespaceList{}
Expect(namespacedCache.List(context.Background(), nil, namespaceList)).To(Succeed())

By("verifying the namespace list is not empty")
Expect(namespaceList.Items).NotTo(BeEmpty())
})

It("should deep copy the object unless told otherwise", func() {
By("retrieving a specific pod from the cache")
out := &kcorev1.Pod{}
Expand Down Expand Up @@ -333,6 +362,45 @@ var _ = Describe("Informer Cache", func() {
Expect(actual.GetNamespace()).To(Equal(testNamespaceOne))
})

It("should be able to restrict cache to a namespace", func() {
By("creating a namespaced cache")
namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne})
Expect(err).NotTo(HaveOccurred())

By("running the cache and waiting for it to sync")
go func() {
defer GinkgoRecover()
Expect(namespacedCache.Start(stop)).To(Succeed())
}()
Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse())

By("listing pods in all namespaces")
out := &unstructured.UnstructuredList{}
out.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "PodList",
})
Expect(namespacedCache.List(context.Background(), nil, out)).To(Succeed())

By("verifying the returned pod is from the watched namespace")
Expect(out.Items).NotTo(BeEmpty())
Expect(out.Items).Should(HaveLen(1))
Expect(out.Items[0].GetNamespace()).To(Equal(testNamespaceOne))

By("listing all namespaces - should still be able to get a cluster-scoped resource")
namespaceList := &unstructured.UnstructuredList{}
namespaceList.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "NamespaceList",
})
Expect(namespacedCache.List(context.Background(), nil, namespaceList)).To(Succeed())

By("verifying the namespace list is not empty")
Expect(namespaceList.Items).NotTo(BeEmpty())
})

It("should deep copy the object unless told otherwise", func() {
By("retrieving a specific pod from the cache")
out := &unstructured.Unstructured{}
Expand Down
15 changes: 8 additions & 7 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ type InformersMap struct {
func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration) *InformersMap {
resync time.Duration,
namespace string) *InformersMap {

return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync),
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),

Scheme: scheme,
}
Expand Down Expand Up @@ -85,11 +86,11 @@ func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*Ma
}

// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, createStructuredListWatch)
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
}

// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, createUnstructuredListWatch)
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
}
21 changes: 18 additions & 3 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInforme
func newSpecificInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration, createListWatcher createListWatcherFunc) *specificInformersMap {
resync time.Duration,
namespace string,
createListWatcher createListWatcherFunc) *specificInformersMap {
ip := &specificInformersMap{
config: config,
Scheme: scheme,
Expand All @@ -52,6 +54,7 @@ func newSpecificInformersMap(config *rest.Config,
paramCodec: runtime.NewParameterCodec(scheme),
resync: resync,
createListWatcher: createListWatcher,
namespace: namespace,
}
return ip
}
Expand Down Expand Up @@ -102,6 +105,10 @@ type specificInformersMap struct {
// and allows for abstracting over the particulars of structured vs
// unstructured objects.
createListWatcher createListWatcherFunc

// namespace is the namespace that all ListWatches are restricted to
// default or empty string means all namespaces
namespace string
}

// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
Expand Down Expand Up @@ -225,14 +232,16 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
res := listObj.DeepCopyObject()
err := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
return client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
},
}, nil
}
Expand All @@ -252,12 +261,18 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(opts)
}
return dynamicClient.Resource(mapping.Resource).List(opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).Watch(opts)
}
return dynamicClient.Resource(mapping.Resource).Watch(opts)
},
}, nil
Expand Down
8 changes: 7 additions & 1 deletion pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ type Options struct {
// will use for holding the leader lock.
LeaderElectionID string

// Namespace if specified restricts the manager's cache to watch objects in the desired namespace
// Defaults to all namespaces
// Note: If a namespace is specified then controllers can still Watch for a cluster-scoped resource e.g Node
// For namespaced resources the cache will only hold objects from the desired namespace.
Namespace string

// Dependency injection for testing
newCache func(config *rest.Config, opts cache.Options) (cache.Cache, error)
newClient func(config *rest.Config, options client.Options) (client.Client, error)
Expand Down Expand Up @@ -153,7 +159,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
}

// Create the cache for the cached read client and registering informers
cache, err := options.newCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod})
cache, err := options.newCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I need to watch all namespace events, set nil to options.Namespace or set other value?
Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To watch all namespaces you set options.Namespaces = "" or metav1.NamespaceAll which is the same thing.
But since the default string value is empty you can just not set the Namespace option in cache.Options and it should watch all namespaces by default.

if err != nil {
return nil, err
}
Expand Down