Skip to content

Commit

Permalink
Merge pull request #136 from hasbro17/make-list-watchers-namespaced
Browse files Browse the repository at this point in the history
Option to restrict informer cache to a namespace
  • Loading branch information
k8s-ci-robot committed Sep 25, 2018
2 parents b2824f6 + ca13e86 commit 53fc44b
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 12 deletions.
6 changes: 5 additions & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ type Options struct {

// Resync is the resync period. Defaults to defaultResyncTime.
Resync *time.Duration

// Namespace restricts the cache's ListWatch to the desired namespace
// Default watches all namespaces
Namespace string
}

var defaultResyncTime = 10 * time.Hour
Expand All @@ -89,7 +93,7 @@ func New(config *rest.Config, opts Options) (Cache, error) {
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync)
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
return &informerCache{InformersMap: im}, nil
}

Expand Down
68 changes: 68 additions & 0 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,35 @@ var _ = Describe("Informer Cache", func() {
Expect(actual.Namespace).To(Equal(testNamespaceOne))
})

It("should be able to restrict cache to a namespace", func() {
By("creating a namespaced cache")
namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne})
Expect(err).NotTo(HaveOccurred())

By("running the cache and waiting for it to sync")
go func() {
defer GinkgoRecover()
Expect(namespacedCache.Start(stop)).To(Succeed())
}()
Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse())

By("listing pods in all namespaces")
out := &kcorev1.PodList{}
Expect(namespacedCache.List(context.Background(), nil, out)).To(Succeed())

By("verifying the returned pod is from the watched namespace")
Expect(out.Items).NotTo(BeEmpty())
Expect(out.Items).Should(HaveLen(1))
Expect(out.Items[0].Namespace).To(Equal(testNamespaceOne))

By("listing all namespaces - should still be able to get a cluster-scoped resource")
namespaceList := &kcorev1.NamespaceList{}
Expect(namespacedCache.List(context.Background(), nil, namespaceList)).To(Succeed())

By("verifying the namespace list is not empty")
Expect(namespaceList.Items).NotTo(BeEmpty())
})

It("should deep copy the object unless told otherwise", func() {
By("retrieving a specific pod from the cache")
out := &kcorev1.Pod{}
Expand Down Expand Up @@ -333,6 +362,45 @@ var _ = Describe("Informer Cache", func() {
Expect(actual.GetNamespace()).To(Equal(testNamespaceOne))
})

It("should be able to restrict cache to a namespace", func() {
By("creating a namespaced cache")
namespacedCache, err := cache.New(cfg, cache.Options{Namespace: testNamespaceOne})
Expect(err).NotTo(HaveOccurred())

By("running the cache and waiting for it to sync")
go func() {
defer GinkgoRecover()
Expect(namespacedCache.Start(stop)).To(Succeed())
}()
Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse())

By("listing pods in all namespaces")
out := &unstructured.UnstructuredList{}
out.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "PodList",
})
Expect(namespacedCache.List(context.Background(), nil, out)).To(Succeed())

By("verifying the returned pod is from the watched namespace")
Expect(out.Items).NotTo(BeEmpty())
Expect(out.Items).Should(HaveLen(1))
Expect(out.Items[0].GetNamespace()).To(Equal(testNamespaceOne))

By("listing all namespaces - should still be able to get a cluster-scoped resource")
namespaceList := &unstructured.UnstructuredList{}
namespaceList.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "NamespaceList",
})
Expect(namespacedCache.List(context.Background(), nil, namespaceList)).To(Succeed())

By("verifying the namespace list is not empty")
Expect(namespaceList.Items).NotTo(BeEmpty())
})

It("should deep copy the object unless told otherwise", func() {
By("retrieving a specific pod from the cache")
out := &unstructured.Unstructured{}
Expand Down
15 changes: 8 additions & 7 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ type InformersMap struct {
func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration) *InformersMap {
resync time.Duration,
namespace string) *InformersMap {

return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync),
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),

Scheme: scheme,
}
Expand Down Expand Up @@ -85,11 +86,11 @@ func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*Ma
}

// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, createStructuredListWatch)
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
}

// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, createUnstructuredListWatch)
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
}
21 changes: 18 additions & 3 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInforme
func newSpecificInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration, createListWatcher createListWatcherFunc) *specificInformersMap {
resync time.Duration,
namespace string,
createListWatcher createListWatcherFunc) *specificInformersMap {
ip := &specificInformersMap{
config: config,
Scheme: scheme,
Expand All @@ -52,6 +54,7 @@ func newSpecificInformersMap(config *rest.Config,
paramCodec: runtime.NewParameterCodec(scheme),
resync: resync,
createListWatcher: createListWatcher,
namespace: namespace,
}
return ip
}
Expand Down Expand Up @@ -102,6 +105,10 @@ type specificInformersMap struct {
// and allows for abstracting over the particulars of structured vs
// unstructured objects.
createListWatcher createListWatcherFunc

// namespace is the namespace that all ListWatches are restricted to
// default or empty string means all namespaces
namespace string
}

// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
Expand Down Expand Up @@ -227,14 +234,16 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
res := listObj.DeepCopyObject()
err := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
return client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
},
}, nil
}
Expand All @@ -254,12 +263,18 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(opts)
}
return dynamicClient.Resource(mapping.Resource).List(opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).Watch(opts)
}
return dynamicClient.Resource(mapping.Resource).Watch(opts)
},
}, nil
Expand Down
8 changes: 7 additions & 1 deletion pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ type Options struct {
// will use for holding the leader lock.
LeaderElectionID string

// Namespace if specified restricts the manager's cache to watch objects in the desired namespace
// Defaults to all namespaces
// Note: If a namespace is specified then controllers can still Watch for a cluster-scoped resource e.g Node
// For namespaced resources the cache will only hold objects from the desired namespace.
Namespace string

// Dependency injection for testing
newCache func(config *rest.Config, opts cache.Options) (cache.Cache, error)
newClient func(config *rest.Config, options client.Options) (client.Client, error)
Expand Down Expand Up @@ -153,7 +159,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
}

// Create the cache for the cached read client and registering informers
cache, err := options.newCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod})
cache, err := options.newCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 53fc44b

Please sign in to comment.