From 7ec6b619ed2025b7fd7e9f9af9b703bcf95a774d Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Fri, 26 Apr 2024 20:05:07 +0200 Subject: [PATCH] UPSTREAM: SQUASH: IndexField: only call cluster-aware keyFunc for cluster-aware cache Signed-off-by: Dr. Stefan Schimanski --- pkg/cache/cache.go | 2 + pkg/cache/informer_cache.go | 7 +- pkg/cache/kcp_test.go | 150 ++++++++++++++++++++++++------------ 3 files changed, 105 insertions(+), 54 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 0add6afd53..aac73ff787 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "strings" "time" "golang.org/x/exp/maps" @@ -390,6 +391,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { NewInformer: opts.NewInformerFunc, }), readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer, + clusterIndexes: strings.HasSuffix(restConfig.Host, "/clusters/*"), } } } diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index ad7d610689..71b94da174 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -70,6 +70,7 @@ type informerCache struct { scheme *runtime.Scheme *internal.Informers readerFailOnMissingInformer bool + clusterIndexes bool } // Get implements Reader. @@ -219,10 +220,10 @@ func (ic *informerCache) IndexField(ctx context.Context, obj client.Object, fiel if err != nil { return err } - return indexByField(informer, field, extractValue) + return indexByField(informer, field, extractValue, ic.clusterIndexes) } -func indexByField(informer Informer, field string, extractValue client.IndexerFunc) error { +func indexByField(informer Informer, field string, extractValue client.IndexerFunc, clusterIndexes bool) error { indexFunc := func(objRaw interface{}) ([]string, error) { // TODO(directxman12): check if this is the correct type? obj, isObj := objRaw.(client.Object) @@ -236,7 +237,7 @@ func indexByField(informer Informer, field string, extractValue client.IndexerFu ns := meta.GetNamespace() keyFunc := internal.KeyToNamespacedKey - if clusterName := logicalcluster.From(obj); !clusterName.Empty() { + if clusterName := logicalcluster.From(obj); clusterIndexes && !clusterName.Empty() { keyFunc = func(ns, val string) string { return internal.KeyToClusteredKey(clusterName.String(), ns, val) } diff --git a/pkg/cache/kcp_test.go b/pkg/cache/kcp_test.go index cfc1ab3de9..11c48457e9 100644 --- a/pkg/cache/kcp_test.go +++ b/pkg/cache/kcp_test.go @@ -22,69 +22,117 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" ) -var _ = Describe("KCP cluster-unaware informer cache", func() { - // Test whether we can have a cluster-unaware informer cache against a single workspace. - // I.e. every object has a kcp.io/cluster annotation, but it should not be taken - // into consideration by the cache to compute the key. - It("should be able to get the default namespace despite kcp.io/cluster annotation", func() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - c, err := cache.New(cfg, cache.Options{}) - Expect(err).NotTo(HaveOccurred()) - +var _ = Describe("informer cache against a kube cluster", func() { + BeforeEach(func() { By("Annotating the default namespace with kcp.io/cluster") cl, err := client.New(cfg, client.Options{}) Expect(err).NotTo(HaveOccurred()) ns := &corev1.Namespace{} - err = cl.Get(ctx, client.ObjectKey{Name: "default"}, ns) + err = cl.Get(context.Background(), client.ObjectKey{Name: "default"}, ns) Expect(err).NotTo(HaveOccurred()) ns.Annotations = map[string]string{"kcp.io/cluster": "cluster1"} - err = cl.Update(ctx, ns) - Expect(err).NotTo(HaveOccurred()) - - go c.Start(ctx) //nolint:errcheck // Start is blocking, and error not relevant here. - c.WaitForCacheSync(ctx) - - By("By getting the default namespace with the informer") - err = c.Get(ctx, client.ObjectKey{Name: "default"}, ns) + err = cl.Update(context.Background(), ns) Expect(err).NotTo(HaveOccurred()) }) -}) - -// TODO: get envtest in place with kcp -/* -var _ = Describe("KCP cluster-aware informer cache", func() { - It("should be able to get the default namespace with kcp.io/cluster annotation", func() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - c, err := kcp.NewClusterAwareCache(cfg, cache.Options{}) - Expect(err).NotTo(HaveOccurred()) - - By("Annotating the default namespace with kcp.io/cluster") - cl, err := client.New(cfg, client.Options{}) - Expect(err).NotTo(HaveOccurred()) - ns := &corev1.Namespace{} - err = cl.Get(ctx, client.ObjectKey{Name: "default"}, ns) - Expect(err).NotTo(HaveOccurred()) - ns.Annotations = map[string]string{"kcp.io/cluster": "cluster1"} - err = cl.Update(ctx, ns) - Expect(err).NotTo(HaveOccurred()) - - go c.Start(ctx) //nolint:errcheck // Start is blocking, and error not relevant here. - c.WaitForCacheSync(ctx) - - By("By getting the default namespace with the informer, but cluster-less key should fail") - err = c.Get(ctx, client.ObjectKey{Name: "default"}, ns) - Expect(err).To(HaveOccurred()) - - By("By getting the default namespace with the informer, but cluster-aware key should succeed") - err = c.Get(kontext.WithCluster(ctx, "cluster1"), client.ObjectKey{Name: "default", Namespace: "cluster1"}, ns) + Describe("KCP cluster-unaware informer cache", func() { + // Test whether we can have a cluster-unaware informer cache against a single workspace. + // I.e. every object has a kcp.io/cluster annotation, but it should not be taken + // into consideration by the cache to compute the key. + It("should be able to get the default namespace despite kcp.io/cluster annotation", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c, err := cache.New(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + + go c.Start(ctx) //nolint:errcheck // Start is blocking, and error not relevant here. + c.WaitForCacheSync(ctx) + + By("By getting the default namespace with the informer") + ns := &corev1.Namespace{} + err = c.Get(ctx, client.ObjectKey{Name: "default"}, ns) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should support indexes with cluster-less keys", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c, err := cache.New(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + + By("Indexing the default namespace by name") + err = c.IndexField(ctx, &corev1.Namespace{}, "name-clusterless", func(obj client.Object) []string { + return []string{"key-" + obj.GetName()} + }) + Expect(err).NotTo(HaveOccurred()) + + go c.Start(ctx) //nolint:errcheck // Start is blocking, and error not relevant here. + c.WaitForCacheSync(ctx) + + By("By getting the default namespace via the custom index") + nss := &corev1.NamespaceList{} + err = c.List(ctx, nss, client.MatchingFieldsSelector{ + Selector: fields.OneTermEqualSelector("name-clusterless", "key-default"), + }) + Expect(err).NotTo(HaveOccurred()) + Expect(nss.Items).To(HaveLen(1)) + }) }) + + // TODO: get envtest in place with kcp + /* + Describe("KCP cluster-aware informer cache", func() { + It("should be able to get the default namespace with kcp.io/cluster annotation", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c, err := kcp.NewClusterAwareCache(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + + go c.Start(ctx) //nolint:errcheck // Start is blocking, and error not relevant here. + c.WaitForCacheSync(ctx) + + By("By getting the default namespace with the informer, but cluster-less key should fail") + ns := &corev1.Namespace{} + err = c.Get(ctx, client.ObjectKey{Name: "default"}, ns) + Expect(err).To(HaveOccurred()) + + By("By getting the default namespace with the informer, but cluster-aware key should succeed") + err = c.Get(kontext.WithCluster(ctx, "cluster1"), client.ObjectKey{Name: "default", Namespace: "cluster1"}, ns) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should support indexes with cluster-aware keys", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c, err := kcp.NewClusterAwareCache(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + + By("Indexing the default namespace by name") + err = c.IndexField(ctx, &corev1.Namespace{}, "name-clusteraware", func(obj client.Object) []string { + return []string{"key-" + obj.GetName()} + }) + Expect(err).NotTo(HaveOccurred()) + + go c.Start(ctx) //nolint:errcheck // Start is blocking, and error not relevant here. + c.WaitForCacheSync(ctx) + + By("By getting the default namespace via the custom index") + nss := &corev1.NamespaceList{} + err = c.List(ctx, nss, client.MatchingFieldsSelector{ + Selector: fields.OneTermEqualSelector("name-clusteraware", "key-default"), + }) + Expect(err).NotTo(HaveOccurred()) + Expect(nss.Items).To(HaveLen(1)) + }) + }) + */ }) -*/