Skip to content

Commit

Permalink
Option to restrict informer cache to a namespace
Browse files Browse the repository at this point in the history
A namespace option can be passed from the manager to
the cache which restricts the ListWatch for all informers
to the desired namespace. This way a manager can be run
with a Role instead of a ClusterRole.
  • Loading branch information
hasbro17 committed Sep 21, 2018
1 parent df7c11e commit b976451
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 12 deletions.
6 changes: 5 additions & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ type Options struct {

// Resync is the resync period. Defaults to defaultResyncTime.
Resync *time.Duration

// Namespace restricts the cache's ListWatch to the desired namespace
// Default watches all namespaces
Namespace string
}

var defaultResyncTime = 10 * time.Hour
Expand All @@ -89,7 +93,7 @@ func New(config *rest.Config, opts Options) (Cache, error) {
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync)
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
return &informerCache{InformersMap: im}, nil
}

Expand Down
49 changes: 49 additions & 0 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,28 @@ var _ = Describe("Informer Cache", func() {
Expect(actual.Namespace).To(Equal(testNamespaceOne))
})

It("should be able to restrict cache to a namespace", func() {
By("creating a namespaced cache")
namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne})
Expect(err).NotTo(HaveOccurred())

By("running the cache and waiting for it to sync")
go func() {
defer GinkgoRecover()
Expect(namespacedCache.Start(stop)).To(Succeed())
}()
Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse())

By("listing pods in all namespaces")
out := &kcorev1.PodList{}
Expect(namespacedCache.List(context.Background(), nil, out)).To(Succeed())

By("verifying the returned pod is from the watched namespace")
Expect(out.Items).NotTo(BeEmpty())
Expect(out.Items).Should(HaveLen(1))
Expect(out.Items[0].Namespace).To(Equal(testNamespaceOne))
})

It("should deep copy the object unless told otherwise", func() {
By("retrieving a specific pod from the cache")
out := &kcorev1.Pod{}
Expand Down Expand Up @@ -333,6 +355,33 @@ var _ = Describe("Informer Cache", func() {
Expect(actual.GetNamespace()).To(Equal(testNamespaceOne))
})

It("should be able to restrict cache to a namespace", func() {
By("creating a namespaced cache")
namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne})
Expect(err).NotTo(HaveOccurred())

By("running the cache and waiting for it to sync")
go func() {
defer GinkgoRecover()
Expect(namespacedCache.Start(stop)).To(Succeed())
}()
Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse())

By("listing pods in all namespaces")
out := &unstructured.UnstructuredList{}
out.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "PodList",
})
Expect(namespacedCache.List(context.Background(), nil, out)).To(Succeed())

By("verifying the returned pod is from the watched namespace")
Expect(out.Items).NotTo(BeEmpty())
Expect(out.Items).Should(HaveLen(1))
Expect(out.Items[0].GetNamespace()).To(Equal(testNamespaceOne))
})

It("should deep copy the object unless told otherwise", func() {
By("retrieving a specific pod from the cache")
out := &unstructured.Unstructured{}
Expand Down
15 changes: 8 additions & 7 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ type InformersMap struct {
func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration) *InformersMap {
resync time.Duration,
namespace string) *InformersMap {

return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync),
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),

Scheme: scheme,
}
Expand Down Expand Up @@ -85,11 +86,11 @@ func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*Ma
}

// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, createStructuredListWatch)
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
}

// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, createUnstructuredListWatch)
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
}
19 changes: 16 additions & 3 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInforme
func newSpecificInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration, createListWatcher createListWatcherFunc) *specificInformersMap {
resync time.Duration,
namespace string,
createListWatcher createListWatcherFunc) *specificInformersMap {
ip := &specificInformersMap{
config: config,
Scheme: scheme,
Expand All @@ -52,6 +54,7 @@ func newSpecificInformersMap(config *rest.Config,
paramCodec: runtime.NewParameterCodec(scheme),
resync: resync,
createListWatcher: createListWatcher,
namespace: namespace,
}
return ip
}
Expand Down Expand Up @@ -102,6 +105,10 @@ type specificInformersMap struct {
// and allows for abstracting over the particulars of structured vs
// unstructured objects.
createListWatcher createListWatcherFunc

// namespace is the namespace that all ListWatches are restricted to
// default or empty string means all namespaces
namespace string
}

// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
Expand Down Expand Up @@ -225,14 +232,14 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
res := listObj.DeepCopyObject()
err := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
err := client.Get().NamespaceIfScoped(ip.namespace, ip.namespace != "").Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
return client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
return client.Get().NamespaceIfScoped(ip.namespace, ip.namespace != "").Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
},
}, nil
}
Expand All @@ -252,12 +259,18 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
if ip.namespace != "" {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(opts)
}
return dynamicClient.Resource(mapping.Resource).List(opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
if ip.namespace != "" {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).Watch(opts)
}
return dynamicClient.Resource(mapping.Resource).Watch(opts)
},
}, nil
Expand Down
6 changes: 5 additions & 1 deletion pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ type Options struct {
// will use for holding the leader lock.
LeaderElectionID string

// Namespace if specified restricts the manager's cache to watch the desired namespace
// Defaults to all namespaces
Namespace string

// Dependency injection for testing
newCache func(config *rest.Config, opts cache.Options) (cache.Cache, error)
newClient func(config *rest.Config, options client.Options) (client.Client, error)
Expand Down Expand Up @@ -153,7 +157,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
}

// Create the cache for the cached read client and registering informers
cache, err := options.newCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod})
cache, err := options.newCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
if err != nil {
return nil, err
}
Expand Down

0 comments on commit b976451

Please sign in to comment.