From 66fe1a07410b0a96d75cf7fe752154ed9fedff93 Mon Sep 17 00:00:00 2001 From: Mikhail Fedosin Date: Wed, 22 Feb 2023 15:38:05 +0100 Subject: [PATCH] Allow lazy restmapper to work with CRDs created in runtime Now lazy restmapper fetches all API resources once at start and then caches them. It prevents it from discovery of new CRDs created after the controller has started. This commit allows lazy restmapper to work with such CRDs. --- pkg/client/apiutil/lazyrestmapper.go | 60 +++++++---- pkg/client/apiutil/lazyrestmapper_test.go | 120 +++++++++++++++++++--- 2 files changed, 143 insertions(+), 37 deletions(-) diff --git a/pkg/client/apiutil/lazyrestmapper.go b/pkg/client/apiutil/lazyrestmapper.go index 70c6a11dbc..e9b1e710c2 100644 --- a/pkg/client/apiutil/lazyrestmapper.go +++ b/pkg/client/apiutil/lazyrestmapper.go @@ -33,7 +33,7 @@ type lazyRESTMapper struct { mapper meta.RESTMapper client *discovery.DiscoveryClient knownGroups map[string]*restmapper.APIGroupResources - apiGroups *metav1.APIGroupList + apiGroups []metav1.APIGroup // mutex to provide thread-safe mapper reloading. mu sync.Mutex @@ -45,6 +45,7 @@ func newLazyRESTMapperWithClient(discoveryClient *discovery.DiscoveryClient) (me mapper: restmapper.NewDiscoveryRESTMapper([]*restmapper.APIGroupResources{}), client: discoveryClient, knownGroups: map[string]*restmapper.APIGroupResources{}, + apiGroups: []metav1.APIGroup{}, }, nil } @@ -147,7 +148,7 @@ func (m *lazyRESTMapper) addKnownGroupAndReload(groupName string, versions ...st // This operation requires 2 requests: /api and /apis, but only once. For all subsequent calls // this data will be taken from cache. if len(versions) == 0 { - apiGroup, err := m.findAPIGroupByName(groupName) + apiGroup, err := m.findAPIGroupByNameLocked(groupName) if err != nil { return err } @@ -176,11 +177,22 @@ func (m *lazyRESTMapper) addKnownGroupAndReload(groupName string, versions ...st } // Update information for group resources about the API group by adding new versions. + // Ignore the versions that are already registered. for _, version := range versions { - groupResources.Group.Versions = append(groupResources.Group.Versions, metav1.GroupVersionForDiscovery{ - GroupVersion: metav1.GroupVersion{Group: groupName, Version: version}.String(), - Version: version, - }) + found := false + for _, v := range groupResources.Group.Versions { + if v.Version == version { + found = true + break + } + } + + if !found { + groupResources.Group.Versions = append(groupResources.Group.Versions, metav1.GroupVersionForDiscovery{ + GroupVersion: metav1.GroupVersion{Group: groupName, Version: version}.String(), + Version: version, + }) + } } // Update data in the cache. @@ -197,28 +209,34 @@ func (m *lazyRESTMapper) addKnownGroupAndReload(groupName string, versions ...st return nil } -// findAPIGroupByName returns API group by its name. -func (m *lazyRESTMapper) findAPIGroupByName(groupName string) (metav1.APIGroup, error) { - // Ensure that required info about existing API groups is received and stored in the mapper. - // It will make 2 API calls to /api and /apis, but only once. - if m.apiGroups == nil { - apiGroups, err := m.client.ServerGroups() - if err != nil { - return metav1.APIGroup{}, fmt.Errorf("failed to get server groups: %w", err) - } - if len(apiGroups.Groups) == 0 { - return metav1.APIGroup{}, fmt.Errorf("received an empty API groups list") +// findAPIGroupByNameLocked returns API group by its name. +func (m *lazyRESTMapper) findAPIGroupByNameLocked(groupName string) (metav1.APIGroup, error) { + // Looking in the cache first. + for _, apiGroup := range m.apiGroups { + if groupName == apiGroup.Name { + return apiGroup, nil } + } - m.apiGroups = apiGroups + // Update the cache if nothing was found. + apiGroups, err := m.client.ServerGroups() + if err != nil { + return metav1.APIGroup{}, fmt.Errorf("failed to get server groups: %w", err) } + if len(apiGroups.Groups) == 0 { + return metav1.APIGroup{}, fmt.Errorf("received an empty API groups list") + } + + m.apiGroups = apiGroups.Groups - for i := range m.apiGroups.Groups { - if groupName == (&m.apiGroups.Groups[i]).Name { - return m.apiGroups.Groups[i], nil + // Looking in the cache again. + for _, apiGroup := range m.apiGroups { + if groupName == apiGroup.Name { + return apiGroup, nil } } + // If there is still nothing, return an error. return metav1.APIGroup{}, fmt.Errorf("failed to find API group %s", groupName) } diff --git a/pkg/client/apiutil/lazyrestmapper_test.go b/pkg/client/apiutil/lazyrestmapper_test.go index 6282370164..f54dbd1600 100644 --- a/pkg/client/apiutil/lazyrestmapper_test.go +++ b/pkg/client/apiutil/lazyrestmapper_test.go @@ -17,14 +17,19 @@ limitations under the License. package apiutil_test import ( + "context" "net/http" "testing" _ "github.com/onsi/ginkgo/v2" gmg "github.com/onsi/gomega" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/envtest" ) @@ -102,38 +107,38 @@ func TestLazyRestMapperProvider(t *testing.T) { // There are no requests before any call g.Expect(crt.GetRequestCount()).To(gmg.Equal(0)) - mapping, err := lazyRestMapper.RESTMapping(schema.GroupKind{Group: "apps", Kind: "deployment"}) + mapping, err := lazyRestMapper.RESTMapping(schema.GroupKind{Group: "apps", Kind: "deployment"}, "v1") g.Expect(err).NotTo(gmg.HaveOccurred()) g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal("deployment")) - g.Expect(crt.GetRequestCount()).To(gmg.Equal(3)) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(1)) - mappings, err := lazyRestMapper.RESTMappings(schema.GroupKind{Group: "", Kind: "pod"}) + mappings, err := lazyRestMapper.RESTMappings(schema.GroupKind{Group: "", Kind: "pod"}, "v1") g.Expect(err).NotTo(gmg.HaveOccurred()) g.Expect(len(mappings)).To(gmg.Equal(1)) g.Expect(mappings[0].GroupVersionKind.Kind).To(gmg.Equal("pod")) - g.Expect(crt.GetRequestCount()).To(gmg.Equal(4)) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(2)) kind, err := lazyRestMapper.KindFor(schema.GroupVersionResource{Group: "networking.k8s.io", Version: "v1", Resource: "ingresses"}) g.Expect(err).NotTo(gmg.HaveOccurred()) g.Expect(kind.Kind).To(gmg.Equal("Ingress")) - g.Expect(crt.GetRequestCount()).To(gmg.Equal(5)) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(3)) kinds, err := lazyRestMapper.KindsFor(schema.GroupVersionResource{Group: "authentication.k8s.io", Version: "v1", Resource: "tokenreviews"}) g.Expect(err).NotTo(gmg.HaveOccurred()) g.Expect(len(kinds)).To(gmg.Equal(1)) g.Expect(kinds[0].Kind).To(gmg.Equal("TokenReview")) - g.Expect(crt.GetRequestCount()).To(gmg.Equal(6)) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(4)) resource, err := lazyRestMapper.ResourceFor(schema.GroupVersionResource{Group: "scheduling.k8s.io", Version: "v1", Resource: "priorityclasses"}) g.Expect(err).NotTo(gmg.HaveOccurred()) g.Expect(resource.Resource).To(gmg.Equal("priorityclasses")) - g.Expect(crt.GetRequestCount()).To(gmg.Equal(7)) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(5)) resources, err := lazyRestMapper.ResourcesFor(schema.GroupVersionResource{Group: "policy", Version: "v1", Resource: "poddisruptionbudgets"}) g.Expect(err).NotTo(gmg.HaveOccurred()) g.Expect(len(resources)).To(gmg.Equal(1)) g.Expect(resources[0].Resource).To(gmg.Equal("poddisruptionbudgets")) - g.Expect(crt.GetRequestCount()).To(gmg.Equal(8)) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(6)) }) t.Run("LazyRESTMapper should cache fetched data and doesn't perform any additional requests", func(t *testing.T) { @@ -344,29 +349,29 @@ func TestLazyRestMapperProvider(t *testing.T) { lazyRestMapper, err := apiutil.NewDynamicRESTMapper(restCfg, apiutil.WithExperimentalLazyMapper) g.Expect(err).NotTo(gmg.HaveOccurred()) - _, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: "apps", Kind: "INVALID"}) + _, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: "apps", Kind: "INVALID"}, "v1") g.Expect(err).To(gmg.HaveOccurred()) - g.Expect(crt.GetRequestCount()).To(gmg.Equal(3)) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(1)) - _, err = lazyRestMapper.RESTMappings(schema.GroupKind{Group: "", Kind: "INVALID"}) + _, err = lazyRestMapper.RESTMappings(schema.GroupKind{Group: "", Kind: "INVALID"}, "v1") g.Expect(err).To(gmg.HaveOccurred()) - g.Expect(crt.GetRequestCount()).To(gmg.Equal(4)) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(2)) _, err = lazyRestMapper.KindFor(schema.GroupVersionResource{Group: "networking.k8s.io", Version: "v1", Resource: "INVALID"}) g.Expect(err).To(gmg.HaveOccurred()) - g.Expect(crt.GetRequestCount()).To(gmg.Equal(5)) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(3)) _, err = lazyRestMapper.KindsFor(schema.GroupVersionResource{Group: "authentication.k8s.io", Version: "v1", Resource: "INVALID"}) g.Expect(err).To(gmg.HaveOccurred()) - g.Expect(crt.GetRequestCount()).To(gmg.Equal(6)) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(4)) _, err = lazyRestMapper.ResourceFor(schema.GroupVersionResource{Group: "scheduling.k8s.io", Version: "v1", Resource: "INVALID"}) g.Expect(err).To(gmg.HaveOccurred()) - g.Expect(crt.GetRequestCount()).To(gmg.Equal(7)) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(5)) _, err = lazyRestMapper.ResourcesFor(schema.GroupVersionResource{Group: "policy", Version: "v1", Resource: "INVALID"}) g.Expect(err).To(gmg.HaveOccurred()) - g.Expect(crt.GetRequestCount()).To(gmg.Equal(8)) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(6)) }) t.Run("LazyRESTMapper should return an error if the version doesn't exist", func(t *testing.T) { @@ -407,4 +412,87 @@ func TestLazyRestMapperProvider(t *testing.T) { g.Expect(err).To(gmg.HaveOccurred()) g.Expect(crt.GetRequestCount()).To(gmg.Equal(6)) }) + + t.Run("LazyRESTMapper can fetch CRDs if they were created at runtime", func(t *testing.T) { + g := gmg.NewWithT(t) + + // To fetch all versions mapper does 2 requests: + // GET https://host/api + // GET https://host/apis + // Then, for each version it performs just one request to the API server as usual: + // GET https://host/apis// + + // Note: We have to use a separate restCfg for the Client, otherwise we + // get a race condition on the counting round tripper between the Client + // and the lazy restmapper. + clientRestCfg := rest.CopyConfig(restCfg) + + var crt *countingRoundTripper + restCfg := rest.CopyConfig(restCfg) + restCfg.WrapTransport = func(rt http.RoundTripper) http.RoundTripper { + crt = newCountingRoundTripper(rt) + return crt + } + + lazyRestMapper, err := apiutil.NewDynamicRESTMapper(restCfg, apiutil.WithExperimentalLazyMapper) + g.Expect(err).NotTo(gmg.HaveOccurred()) + + // There are no requests before any call + g.Expect(crt.GetRequestCount()).To(gmg.Equal(0)) + + // Since we don't specify what version we expect, restmapper will fetch them all and search there. + // To fetch a list of available versions + // #1: GET https://host/api + // #2: GET https://host/apis + // Then, for each currently registered version: + // #3: GET https://host/apis/crew.example.com/v1 + // #4: GET https://host/apis/crew.example.com/v2 + mapping, err := lazyRestMapper.RESTMapping(schema.GroupKind{Group: "crew.example.com", Kind: "driver"}) + g.Expect(err).NotTo(gmg.HaveOccurred()) + g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal("driver")) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(4)) + + s := scheme.Scheme + err = apiextensionsv1.AddToScheme(s) + g.Expect(err).NotTo(gmg.HaveOccurred()) + + c, err := client.New(clientRestCfg, client.Options{Scheme: s}) + g.Expect(err).NotTo(gmg.HaveOccurred()) + + // Register another CRD in runtime - "riders.crew.example.com". + + crd := &apiextensionsv1.CustomResourceDefinition{} + err = c.Get(context.TODO(), types.NamespacedName{Name: "drivers.crew.example.com"}, crd) + g.Expect(err).NotTo(gmg.HaveOccurred()) + g.Expect(crd.Spec.Names.Kind).To(gmg.Equal("Driver")) + + newCRD := &apiextensionsv1.CustomResourceDefinition{} + crd.DeepCopyInto(newCRD) + newCRD.Name = "riders.crew.example.com" + newCRD.Spec.Names = apiextensionsv1.CustomResourceDefinitionNames{ + Kind: "Rider", + Plural: "riders", + } + newCRD.ResourceVersion = "" + + // Create the new CRD. + g.Expect(c.Create(context.TODO(), newCRD)).To(gmg.Succeed()) + + // Wait a bit until the CRD is registered. + g.Eventually(func() error { + _, err := lazyRestMapper.RESTMapping(schema.GroupKind{Group: "crew.example.com", Kind: "rider"}) + return err + }).Should(gmg.Succeed()) + + // Since we don't specify what version we expect, restmapper will fetch them all and search there. + // To fetch a list of available versions + // #1: GET https://host/api + // #2: GET https://host/apis + // Then, for each currently registered version: + // #3: GET https://host/apis/crew.example.com/v1 + // #4: GET https://host/apis/crew.example.com/v2 + mapping, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: "crew.example.com", Kind: "rider"}) + g.Expect(err).NotTo(gmg.HaveOccurred()) + g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal("rider")) + }) }