Skip to content

Commit

Permalink
Option to restrict informer cache to a namespace
Browse files Browse the repository at this point in the history
A namespace option can be passed from the manager to
the cache which restricts the ListWatch for all informers
to the desired namespace. This way a manager can be run
with a Role instead of a ClusterRole.
  • Loading branch information
hasbro17 committed Sep 11, 2018
1 parent 419794c commit 9ddd976
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 5 deletions.
6 changes: 5 additions & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ type Options struct {

// Resync is the resync period. Defaults to defaultResyncTime.
Resync *time.Duration

// Namespace restricts the cache's ListWatch to the desired namespace
// Default watches all namespaces
Namespace string
}

var defaultResyncTime = 10 * time.Hour
Expand All @@ -89,7 +93,7 @@ func New(config *rest.Config, opts Options) (Cache, error) {
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync)
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
return &informerCache{InformersMap: im}, nil
}

Expand Down
22 changes: 22 additions & 0 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,28 @@ var _ = Describe("Informer Cache", func() {
Expect(actual.Namespace).To(Equal(testNamespaceOne))
})

It("should be able to restrict cache to a namespace", func() {
By("creating a namespaced cache")
namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne})
Expect(err).NotTo(HaveOccurred())

By("running the cache and waiting for it to sync")
go func() {
defer GinkgoRecover()
Expect(namespacedCache.Start(stop)).To(Succeed())
}()
Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse())

By("listing pods in all namespaces")
out := kcorev1.PodList{}
Expect(namespacedCache.List(context.Background(), nil, &out)).To(Succeed())

By("verifying the returned pod is from the watched namespace")
Expect(out.Items).NotTo(BeEmpty())
Expect(out.Items).Should(HaveLen(1))
Expect(out.Items[0].Namespace).To(Equal(testNamespaceOne))
})

It("should deep copy the object unless told otherwise", func() {
By("retrieving a specific pod from the cache")
out := &kcorev1.Pod{}
Expand Down
12 changes: 9 additions & 3 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ import (
func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration) *InformersMap {
resync time.Duration,
namespace string) *InformersMap {
ip := &InformersMap{
config: config,
Scheme: scheme,
Expand All @@ -45,6 +46,7 @@ func NewInformersMap(config *rest.Config,
codecs: serializer.NewCodecFactory(scheme),
paramCodec: runtime.NewParameterCodec(scheme),
resync: resync,
namespace: namespace,
}
return ip
}
Expand Down Expand Up @@ -90,6 +92,10 @@ type InformersMap struct {

// start is true if the informers have been started
started bool

// namespace is the namespace that all ListWatches are restricted to
// default means all namespaces
namespace string
}

// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
Expand Down Expand Up @@ -217,14 +223,14 @@ func (ip *InformersMap) newListWatch(gvk schema.GroupVersionKind) (*cache.ListWa
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
res := listObj.DeepCopyObject()
err := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
err := client.Get().NamespaceIfScoped(ip.namespace, ip.namespace != "").Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
return client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
return client.Get().NamespaceIfScoped(ip.namespace, ip.namespace != "").Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
},
}, nil
}
6 changes: 5 additions & 1 deletion pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ type Options struct {
// will use for holding the leader lock.
LeaderElectionID string

// Namespace if specified restricts the manager's cache to watch the desired namespace
// Defaults to all namespaces
Namespace string

// Dependency injection for testing
newCache func(config *rest.Config, opts cache.Options) (cache.Cache, error)
newClient func(config *rest.Config, options client.Options) (client.Client, error)
Expand Down Expand Up @@ -143,7 +147,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
}

// Create the cache for the cached read client and registering informers
cache, err := options.newCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod})
cache, err := options.newCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 9ddd976

Please sign in to comment.