Skip to content

Commit

Permalink
✨ Cache-Backed Client: Support listOpts.Limit (#1479)
Browse files Browse the repository at this point in the history
* Enable list opts in informer cache List
- add test for informer cache limit option

Signed-off-by: Madhav Jivrajani <madhav.jiv@gmail.com>

* enable Limit option in multi namespace cache

Signed-off-by: Madhav Jivrajani <madhav.jiv@gmail.com>
  • Loading branch information
MadhavJivrajani committed May 3, 2021
1 parent a17ac06 commit 0c99fc7
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 23 deletions.
65 changes: 44 additions & 21 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
knownPod2 client.Object
knownPod3 client.Object
knownPod4 client.Object
knownPod5 client.Object
knownPod6 client.Object
)

BeforeEach(func() {
Expand All @@ -122,14 +124,20 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
knownPod2 = createPod("test-pod-2", testNamespaceTwo, kcorev1.RestartPolicyAlways)
knownPod3 = createPodWithLabels("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure, map[string]string{"common-label": "common"})
knownPod4 = createPodWithLabels("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever, map[string]string{"common-label": "common"})
knownPod5 = createPod("test-pod-5", testNamespaceOne, kcorev1.RestartPolicyNever)
knownPod6 = createPod("test-pod-6", testNamespaceTwo, kcorev1.RestartPolicyAlways)

podGVK := schema.GroupVersionKind{
Kind: "Pod",
Version: "v1",
}

knownPod1.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod2.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod3.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod4.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod5.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod6.GetObjectKind().SetGroupVersionKind(podGVK)

By("creating the informer cache")
informerCache, err = createCacheFunc(cfg, cache.Options{})
Expand All @@ -149,6 +157,8 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
deletePod(knownPod2)
deletePod(knownPod3)
deletePod(knownPod4)
deletePod(knownPod5)
deletePod(knownPod6)

informerCacheCancel()
})
Expand Down Expand Up @@ -226,7 +236,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca

By("verifying that the returned pods have GVK populated")
Expect(out.Items).NotTo(BeEmpty())
Expect(out.Items).Should(SatisfyAny(HaveLen(3), HaveLen(4)))
Expect(out.Items).Should(SatisfyAny(HaveLen(5), HaveLen(6)))
for _, p := range out.Items {
Expect(p.GroupVersionKind()).To(Equal(kcorev1.SchemeGroupVersion.WithKind("Pod")))
}
Expand All @@ -240,9 +250,10 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca

By("verifying that the returned pods are in test-namespace-1")
Expect(listObj.Items).NotTo(BeEmpty())
Expect(listObj.Items).Should(HaveLen(1))
actual := listObj.Items[0]
Expect(actual.Namespace).To(Equal(testNamespaceOne))
Expect(listObj.Items).Should(HaveLen(2))
for _, item := range listObj.Items {
Expect(item.Namespace).To(Equal(testNamespaceOne))
}
})

It("should deep copy the object unless told otherwise", func() {
Expand Down Expand Up @@ -295,7 +306,15 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Expect(errors.IsTimeout(err)).To(BeTrue())
})

It("should set the Limit option and limit number of objects to Limit when List is called", func() {
opts := &client.ListOptions{Limit: int64(3)}
By("verifying that only Limit (3) number of objects are retrieved from the cache")
listObj := &kcorev1.PodList{}
Expect(informerCache.List(context.Background(), listObj, opts)).To(Succeed())
Expect(listObj.Items).Should(HaveLen(3))
})
})

Context("with unstructured objects", func() {
It("should be able to list objects that haven't been watched previously", func() {
By("listing all services in the cluster")
Expand Down Expand Up @@ -396,9 +415,10 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca

By("verifying that the returned pods are in test-namespace-1")
Expect(listObj.Items).NotTo(BeEmpty())
Expect(listObj.Items).Should(HaveLen(1))
actual := listObj.Items[0]
Expect(actual.GetNamespace()).To(Equal(testNamespaceOne))
Expect(listObj.Items).Should(HaveLen(2))
for _, item := range listObj.Items {
Expect(item.GetNamespace()).To(Equal(testNamespaceOne))
}
})

It("should be able to restrict cache to a namespace", func() {
Expand All @@ -424,9 +444,10 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca

By("verifying the returned pod is from the watched namespace")
Expect(out.Items).NotTo(BeEmpty())
Expect(out.Items).Should(HaveLen(1))
Expect(out.Items[0].GetNamespace()).To(Equal(testNamespaceOne))

Expect(out.Items).Should(HaveLen(2))
for _, item := range out.Items {
Expect(item.GetNamespace()).To(Equal(testNamespaceOne))
}
By("listing all nodes - should still be able to list a cluster-scoped resource")
nodeList := &unstructured.UnstructuredList{}
nodeList.SetGroupVersionKind(schema.GroupVersionKind{
Expand Down Expand Up @@ -639,9 +660,10 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca

By("verifying that the returned pods are in test-namespace-1")
Expect(listObj.Items).NotTo(BeEmpty())
Expect(listObj.Items).Should(HaveLen(1))
actual := listObj.Items[0]
Expect(actual.GetNamespace()).To(Equal(testNamespaceOne))
Expect(listObj.Items).Should(HaveLen(2))
for _, item := range listObj.Items {
Expect(item.Namespace).To(Equal(testNamespaceOne))
}
})

It("should be able to restrict cache to a namespace", func() {
Expand All @@ -667,9 +689,10 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca

By("verifying the returned pod is from the watched namespace")
Expect(out.Items).NotTo(BeEmpty())
Expect(out.Items).Should(HaveLen(1))
Expect(out.Items[0].GetNamespace()).To(Equal(testNamespaceOne))

Expect(out.Items).Should(HaveLen(2))
for _, item := range out.Items {
Expect(item.Namespace).To(Equal(testNamespaceOne))
}
By("listing all nodes - should still be able to list a cluster-scoped resource")
nodeList := &kmetav1.PartialObjectMetadataList{}
nodeList.SetGroupVersionKind(schema.GroupVersionKind{
Expand Down Expand Up @@ -828,25 +851,25 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Entry("when selectors are empty it has to inform about all the pods", selectorsTestCase{
fieldSelectors: map[string]string{},
labelSelectors: map[string]string{},
expectedPods: []string{"test-pod-1", "test-pod-2", "test-pod-3", "test-pod-4"},
expectedPods: []string{"test-pod-1", "test-pod-2", "test-pod-3", "test-pod-4", "test-pod-5", "test-pod-6"},
}),
Entry("when field matches one pod it has to inform about it", selectorsTestCase{
fieldSelectors: map[string]string{"metadata.name": "test-pod-2"},
expectedPods: []string{"test-pod-2"},
}),
Entry("when field matches multiple pods it has to infor about all of them", selectorsTestCase{
Entry("when field matches multiple pods it has to inform about all of them", selectorsTestCase{
fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo},
expectedPods: []string{"test-pod-2", "test-pod-3"},
expectedPods: []string{"test-pod-2", "test-pod-3", "test-pod-6"},
}),
Entry("when label matches one pod it has to inform about it", selectorsTestCase{
labelSelectors: map[string]string{"test-label": "test-pod-4"},
expectedPods: []string{"test-pod-4"},
}),
Entry("when label matches multiple pods it has to infor about all of them", selectorsTestCase{
Entry("when label matches multiple pods it has to inform about all of them", selectorsTestCase{
labelSelectors: map[string]string{"common-label": "common"},
expectedPods: []string{"test-pod-3", "test-pod-4"},
}),
Entry("when label and field matches one pod it has to infor about about it", selectorsTestCase{
Entry("when label and field matches one pod it has to inform about about it", selectorsTestCase{
labelSelectors: map[string]string{"common-label": "common"},
fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo},
expectedPods: []string{"test-pod-3"},
Expand Down
9 changes: 8 additions & 1 deletion pkg/cache/internal/cache_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,15 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
labelSel = listOpts.LabelSelector
}

limitSet := listOpts.Limit > 0

runtimeObjs := make([]runtime.Object, 0, len(objs))
for _, item := range objs {
for i, item := range objs {
// if the Limit option is set and the number of items
// listed exceeds this limit, then stop reading.
if limitSet && int64(i) >= listOpts.Limit {
break
}
obj, isObj := item.(runtime.Object)
if !isObj {
return fmt.Errorf("cache contained %T, which is not an Object", obj)
Expand Down
16 changes: 15 additions & 1 deletion pkg/cache/multi_namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,13 @@ func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList,
if err != nil {
return err
}

limitSet := listOpts.Limit > 0

var resourceVersion string
for _, cache := range c.namespaceToCache {
listObj := list.DeepCopyObject().(client.ObjectList)
err = cache.List(ctx, listObj, opts...)
err = cache.List(ctx, listObj, &listOpts)
if err != nil {
return err
}
Expand All @@ -204,6 +207,17 @@ func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList,
allItems = append(allItems, items...)
// The last list call should have the most correct resource version.
resourceVersion = accessor.GetResourceVersion()
if limitSet {
// decrement Limit by the number of items
// fetched from the current namespace.
listOpts.Limit -= int64(len(items))
// if a Limit was set and the number of
// items read has reached this set limit,
// then stop reading.
if listOpts.Limit == 0 {
break
}
}
}
listAccessor.SetResourceVersion(resourceVersion)

Expand Down

0 comments on commit 0c99fc7

Please sign in to comment.