From 91a3132e0073f578c8d2f567442804512e7269e2 Mon Sep 17 00:00:00 2001 From: Guillermo Gaston Date: Wed, 24 Jan 2024 14:31:02 +0000 Subject: [PATCH] Clean restmapper cache if a version is notFound This avoids: - Extra calls to https://host/apis// when a version seen before and cached in apiGroups is deleted or marked as not served. - Returnning a valid mapping for a cached version that is deleted or not served anymore. --- pkg/client/apiutil/restmapper.go | 62 ++++++-- pkg/client/apiutil/restmapper_test.go | 213 +++++++++++++++++++++++--- 2 files changed, 240 insertions(+), 35 deletions(-) diff --git a/pkg/client/apiutil/restmapper.go b/pkg/client/apiutil/restmapper.go index d5e03b2b19..2320cd26fc 100644 --- a/pkg/client/apiutil/restmapper.go +++ b/pkg/client/apiutil/restmapper.go @@ -21,6 +21,7 @@ import ( "net/http" "sync" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -179,23 +180,28 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er Group: metav1.APIGroup{Name: groupName}, VersionedResources: make(map[string][]metav1.APIResource), } - if _, ok := m.knownGroups[groupName]; ok { - groupResources = m.knownGroups[groupName] - } // Update information for group resources about versioned resources. // The number of API calls is equal to the number of versions: /apis//. - groupVersionResources, err := m.fetchGroupVersionResources(groupName, versions...) + // If we encounter a missing API version (NotFound error), we will remove the group from + // the m.apiGroups and m.knownGroups caches. + // If this happens, in the next call the group will be added back to apiGroups + // and only the existing versions will be loaded in knownGroups. + groupVersionResources, err := m.fetchGroupVersionResourcesLocked(groupName, versions...) if err != nil { return fmt.Errorf("failed to get API group resources: %w", err) } - for version, resources := range groupVersionResources { - groupResources.VersionedResources[version.Version] = resources.APIResources + + if _, ok := m.knownGroups[groupName]; ok { + groupResources = m.knownGroups[groupName] } // Update information for group resources about the API group by adding new versions. // Ignore the versions that are already registered. - for _, version := range versions { + for groupVersion, resources := range groupVersionResources { + version := groupVersion.Version + + groupResources.VersionedResources[version] = resources.APIResources found := false for _, v := range groupResources.Group.Versions { if v.Version == version { @@ -216,12 +222,7 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er m.knownGroups[groupName] = groupResources // Finally, update the group with received information and regenerate the mapper. - updatedGroupResources := make([]*restmapper.APIGroupResources, 0, len(m.knownGroups)) - for _, agr := range m.knownGroups { - updatedGroupResources = append(updatedGroupResources, agr) - } - - m.mapper = restmapper.NewDiscoveryRESTMapper(updatedGroupResources) + m.refreshMapper() return nil } @@ -267,8 +268,9 @@ func (m *mapper) findAPIGroupByName(groupName string) (*metav1.APIGroup, error) return nil, fmt.Errorf("failed to find API group %q", groupName) } -// fetchGroupVersionResources fetches the resources for the specified group and its versions. -func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string) (map[schema.GroupVersion]*metav1.APIResourceList, error) { +// fetchGroupVersionResourcesLocked fetches the resources for the specified group and its versions. +// This method might modify the cache so it needs to be called under the lock. +func (m *mapper) fetchGroupVersionResourcesLocked(groupName string, versions ...string) (map[schema.GroupVersion]*metav1.APIResourceList, error) { groupVersionResources := make(map[schema.GroupVersion]*metav1.APIResourceList) failedGroups := make(map[schema.GroupVersion]error) @@ -276,9 +278,21 @@ func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string groupVersion := schema.GroupVersion{Group: groupName, Version: version} apiResourceList, err := m.client.ServerResourcesForGroupVersion(groupVersion.String()) + if apierrors.IsNotFound(err) && m.isGroupVersionCached(groupVersion) { + // If the version is not found, we remove the group from the cache + // so it gets refreshed on the next call. + delete(m.apiGroups, groupName) + delete(m.knownGroups, groupName) + // It's important to refresh the mapper after invalidating the cache, since returning an error + // aborts the call and leaves the underlying mapper unchanged. If not refreshed, the next call + // will still return a match for the NotFound version. + m.refreshMapper() + } + if err != nil { failedGroups[groupVersion] = err } + if apiResourceList != nil { // even in case of error, some fallback might have been returned. groupVersionResources[groupVersion] = apiResourceList @@ -292,3 +306,21 @@ func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string return groupVersionResources, nil } + +// isGroupVersionCached checks if a version for a group is cached in the known groups cache. +func (m *mapper) isGroupVersionCached(gv schema.GroupVersion) bool { + if cachedGroup, ok := m.knownGroups[gv.Group]; ok { + _, cached := cachedGroup.VersionedResources[gv.Version] + return cached + } + + return false +} + +func (m *mapper) refreshMapper() { + updatedGroupResources := make([]*restmapper.APIGroupResources, 0, len(m.knownGroups)) + for _, agr := range m.knownGroups { + updatedGroupResources = append(updatedGroupResources, agr) + } + m.mapper = restmapper.NewDiscoveryRESTMapper(updatedGroupResources) +} diff --git a/pkg/client/apiutil/restmapper_test.go b/pkg/client/apiutil/restmapper_test.go index bee63bf240..95cbf25a78 100644 --- a/pkg/client/apiutil/restmapper_test.go +++ b/pkg/client/apiutil/restmapper_test.go @@ -18,17 +18,20 @@ package apiutil_test import ( "context" + "fmt" "net/http" "testing" - "k8s.io/apimachinery/pkg/api/meta" - _ "github.com/onsi/ginkgo/v2" gmg "github.com/onsi/gomega" - + "github.com/onsi/gomega/format" + gomegatypes "github.com/onsi/gomega/types" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" @@ -493,23 +496,7 @@ func TestLazyRestMapperProvider(t *testing.T) { g.Expect(err).NotTo(gmg.HaveOccurred()) // Register another CRD in runtime - "riders.crew.example.com". - - crd := &apiextensionsv1.CustomResourceDefinition{} - err = c.Get(context.TODO(), types.NamespacedName{Name: "drivers.crew.example.com"}, crd) - g.Expect(err).NotTo(gmg.HaveOccurred()) - g.Expect(crd.Spec.Names.Kind).To(gmg.Equal("Driver")) - - newCRD := &apiextensionsv1.CustomResourceDefinition{} - crd.DeepCopyInto(newCRD) - newCRD.Name = "riders.crew.example.com" - newCRD.Spec.Names = apiextensionsv1.CustomResourceDefinitionNames{ - Kind: "Rider", - Plural: "riders", - } - newCRD.ResourceVersion = "" - - // Create the new CRD. - g.Expect(c.Create(context.TODO(), newCRD)).To(gmg.Succeed()) + createNewCRD(context.TODO(), g, c, "crew.example.com", "Rider", "riders") // Wait a bit until the CRD is registered. g.Eventually(func() error { @@ -528,4 +515,190 @@ func TestLazyRestMapperProvider(t *testing.T) { g.Expect(err).NotTo(gmg.HaveOccurred()) g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal("rider")) }) + + t.Run("LazyRESTMapper should invalidate the group cache if a version is not found", func(t *testing.T) { + g := gmg.NewWithT(t) + ctx := context.Background() + + httpClient, err := rest.HTTPClientFor(restCfg) + g.Expect(err).NotTo(gmg.HaveOccurred()) + + crt := newCountingRoundTripper(httpClient.Transport) + httpClient.Transport = crt + + lazyRestMapper, err := apiutil.NewDynamicRESTMapper(restCfg, httpClient) + g.Expect(err).NotTo(gmg.HaveOccurred()) + + s := scheme.Scheme + err = apiextensionsv1.AddToScheme(s) + g.Expect(err).NotTo(gmg.HaveOccurred()) + + c, err := client.New(restCfg, client.Options{Scheme: s}) + g.Expect(err).NotTo(gmg.HaveOccurred()) + + // Register a new CRD in a new group to avoid collisions when deleting versions - "taxis.inventory.example.com". + group := "inventory.example.com" + kind := "Taxi" + plural := "taxis" + crdName := plural + "." + group + // Create a CRD with two versions: v1alpha1 and v1 where both are served and + // v1 is the storage version so we can easily remove v1alpha1 later. + crd := newCRD(ctx, g, c, group, kind, plural) + v1alpha1 := crd.Spec.Versions[0] + v1alpha1.Name = "v1alpha1" + v1alpha1.Storage = false + v1alpha1.Served = true + v1 := crd.Spec.Versions[0] + v1.Name = "v1" + v1.Storage = true + v1.Served = true + crd.Spec.Versions = []apiextensionsv1.CustomResourceDefinitionVersion{v1alpha1, v1} + g.Expect(c.Create(ctx, crd)).To(gmg.Succeed()) + t.Cleanup(func() { + g.Expect(c.Delete(ctx, crd)).To(gmg.Succeed()) + }) + + // Wait until the CRD is registered. + discHTTP, err := rest.HTTPClientFor(restCfg) + g.Expect(err).NotTo(gmg.HaveOccurred()) + discClient, err := discovery.NewDiscoveryClientForConfigAndClient(restCfg, discHTTP) + g.Expect(err).NotTo(gmg.HaveOccurred()) + g.Eventually(func(g gmg.Gomega) { + _, err = discClient.ServerResourcesForGroupVersion(group + "/v1") + g.Expect(err).NotTo(gmg.HaveOccurred()) + }).Should(gmg.Succeed(), "v1 should be available") + + // There are no requests before any call + g.Expect(crt.GetRequestCount()).To(gmg.Equal(0)) + + // Since we don't specify what version we expect, restmapper will fetch them all and search there. + // To fetch a list of available versions + // #1: GET https://host/api + // #2: GET https://host/apis + // Then, for all available versions: + // #3: GET https://host/apis/inventory.example.com/v1alpha1 + // #4: GET https://host/apis/inventory.example.com/v1 + // This should fill the cache for apiGroups and versions. + mapping, err := lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}) + g.Expect(err).NotTo(gmg.HaveOccurred()) + g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal(kind)) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(4)) + crt.Reset() // We reset the counter to check how many additional requests are made later. + + // At this point v1alpha1 should be cached + _, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}, "v1alpha1") + g.Expect(err).NotTo(gmg.HaveOccurred()) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(0)) + + // We update the CRD to only have v1 version. + g.Expect(c.Get(ctx, types.NamespacedName{Name: crdName}, crd)).To(gmg.Succeed()) + for _, version := range crd.Spec.Versions { + if version.Name == "v1" { + v1 = version + break + } + } + crd.Spec.Versions = []apiextensionsv1.CustomResourceDefinitionVersion{v1} + g.Expect(c.Update(ctx, crd)).To(gmg.Succeed()) + + // We wait until v1alpha1 is not available anymore. + g.Eventually(func(g gmg.Gomega) { + _, err = discClient.ServerResourcesForGroupVersion(group + "/v1alpha1") + g.Expect(apierrors.IsNotFound(err)).To(gmg.BeTrue(), "v1alpha1 should not be available anymore") + }).Should(gmg.Succeed()) + + // Although v1alpha1 is not available anymore, the cache is not invalidated yet so it should return a mapping. + _, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}, "v1alpha1") + g.Expect(err).NotTo(gmg.HaveOccurred()) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(0)) + + // We request Limo, which is not in the mapper because it doesn't exist. + // This will trigger a reload of the lazy mapper cache. + // Reloading the cache will read v2 again and since it's not available anymore, it should invalidate the cache. + // #1: GET https://host/apis/inventory.example.com/v1alpha1 + // #2: GET https://host/apis/inventory.example.com/v1 + _, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: "Limo"}) + g.Expect(err).To(beNoMatchError()) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(2)) + crt.Reset() + + // Now we request v1alpha1 again and it should return an error since the cache was invalidated. + // #1: GET https://host/apis/inventory.example.com/v1alpha1 + _, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}, "v1alpha1") + g.Expect(err).To(beNoMatchError()) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(1)) + crt.Reset() + + // Since we don't specify what version we expect, restmapper will fetch them all and search there. + // To fetch a list of available versions + // #1: GET https://host/api + // #2: GET https://host/apis + // Then, for all available versions: + // #3: GET https://host/apis/inventory.example.com/v1 + mapping, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}) + g.Expect(err).NotTo(gmg.HaveOccurred()) + g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal(kind)) + g.Expect(crt.GetRequestCount()).To(gmg.Equal(3)) + }) +} + +// createNewCRD creates a new CRD with the given group, kind, and plural and returns it. +func createNewCRD(ctx context.Context, g gmg.Gomega, c client.Client, group, kind, plural string) *apiextensionsv1.CustomResourceDefinition { + newCRD := newCRD(ctx, g, c, group, kind, plural) + g.Expect(c.Create(ctx, newCRD)).To(gmg.Succeed()) + + return newCRD +} + +// newCRD returns a new CRD with the given group, kind, and plural. +func newCRD(ctx context.Context, g gmg.Gomega, c client.Client, group, kind, plural string) *apiextensionsv1.CustomResourceDefinition { + crd := &apiextensionsv1.CustomResourceDefinition{} + err := c.Get(ctx, types.NamespacedName{Name: "drivers.crew.example.com"}, crd) + g.Expect(err).NotTo(gmg.HaveOccurred()) + g.Expect(crd.Spec.Names.Kind).To(gmg.Equal("Driver")) + + newCRD := &apiextensionsv1.CustomResourceDefinition{} + crd.DeepCopyInto(newCRD) + newCRD.Spec.Group = group + newCRD.Name = plural + "." + group + newCRD.Spec.Names = apiextensionsv1.CustomResourceDefinitionNames{ + Kind: kind, + Plural: plural, + } + newCRD.ResourceVersion = "" + + return newCRD +} + +func beNoMatchError() gomegatypes.GomegaMatcher { + return &errorMatcher{ + checkFunc: meta.IsNoMatchError, + message: "NoMatch", + } +} + +type errorMatcher struct { + checkFunc func(error) bool + message string +} + +func (e *errorMatcher) Match(actual interface{}) (success bool, err error) { + if actual == nil { + return false, nil + } + + actualErr, actualOk := actual.(error) + if !actualOk { + return false, fmt.Errorf("expected an error-type. got:\n%s", format.Object(actual, 1)) + } + + return e.checkFunc(actualErr), nil +} + +func (e *errorMatcher) FailureMessage(actual interface{}) (message string) { + return format.Message(actual, fmt.Sprintf("to be %s error", e.message)) +} + +func (e *errorMatcher) NegatedFailureMessage(actual interface{}) (message string) { + return format.Message(actual, fmt.Sprintf("not to be %s error", e.message)) }