Skip to content

Commit

Permalink
Allow lazy restmapper to work with CRDs created in runtime
Browse files Browse the repository at this point in the history
Now lazy restmapper fetches all API resources once at start and then
caches them. It prevents it from discovery of new CRDs created after
the controller has started.
This commit allows lazy restmapper to work with such CRDs.
  • Loading branch information
Fedosin authored and sbueringer committed Mar 1, 2023
1 parent 0b49f2e commit 66fe1a0
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 37 deletions.
60 changes: 39 additions & 21 deletions pkg/client/apiutil/lazyrestmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type lazyRESTMapper struct {
mapper meta.RESTMapper
client *discovery.DiscoveryClient
knownGroups map[string]*restmapper.APIGroupResources
apiGroups *metav1.APIGroupList
apiGroups []metav1.APIGroup

// mutex to provide thread-safe mapper reloading.
mu sync.Mutex
Expand All @@ -45,6 +45,7 @@ func newLazyRESTMapperWithClient(discoveryClient *discovery.DiscoveryClient) (me
mapper: restmapper.NewDiscoveryRESTMapper([]*restmapper.APIGroupResources{}),
client: discoveryClient,
knownGroups: map[string]*restmapper.APIGroupResources{},
apiGroups: []metav1.APIGroup{},
}, nil
}

Expand Down Expand Up @@ -147,7 +148,7 @@ func (m *lazyRESTMapper) addKnownGroupAndReload(groupName string, versions ...st
// This operation requires 2 requests: /api and /apis, but only once. For all subsequent calls
// this data will be taken from cache.
if len(versions) == 0 {
apiGroup, err := m.findAPIGroupByName(groupName)
apiGroup, err := m.findAPIGroupByNameLocked(groupName)
if err != nil {
return err
}
Expand Down Expand Up @@ -176,11 +177,22 @@ func (m *lazyRESTMapper) addKnownGroupAndReload(groupName string, versions ...st
}

// Update information for group resources about the API group by adding new versions.
// Ignore the versions that are already registered.
for _, version := range versions {
groupResources.Group.Versions = append(groupResources.Group.Versions, metav1.GroupVersionForDiscovery{
GroupVersion: metav1.GroupVersion{Group: groupName, Version: version}.String(),
Version: version,
})
found := false
for _, v := range groupResources.Group.Versions {
if v.Version == version {
found = true
break
}
}

if !found {
groupResources.Group.Versions = append(groupResources.Group.Versions, metav1.GroupVersionForDiscovery{
GroupVersion: metav1.GroupVersion{Group: groupName, Version: version}.String(),
Version: version,
})
}
}

// Update data in the cache.
Expand All @@ -197,28 +209,34 @@ func (m *lazyRESTMapper) addKnownGroupAndReload(groupName string, versions ...st
return nil
}

// findAPIGroupByName returns API group by its name.
func (m *lazyRESTMapper) findAPIGroupByName(groupName string) (metav1.APIGroup, error) {
// Ensure that required info about existing API groups is received and stored in the mapper.
// It will make 2 API calls to /api and /apis, but only once.
if m.apiGroups == nil {
apiGroups, err := m.client.ServerGroups()
if err != nil {
return metav1.APIGroup{}, fmt.Errorf("failed to get server groups: %w", err)
}
if len(apiGroups.Groups) == 0 {
return metav1.APIGroup{}, fmt.Errorf("received an empty API groups list")
// findAPIGroupByNameLocked returns API group by its name.
func (m *lazyRESTMapper) findAPIGroupByNameLocked(groupName string) (metav1.APIGroup, error) {
// Looking in the cache first.
for _, apiGroup := range m.apiGroups {
if groupName == apiGroup.Name {
return apiGroup, nil
}
}

m.apiGroups = apiGroups
// Update the cache if nothing was found.
apiGroups, err := m.client.ServerGroups()
if err != nil {
return metav1.APIGroup{}, fmt.Errorf("failed to get server groups: %w", err)
}
if len(apiGroups.Groups) == 0 {
return metav1.APIGroup{}, fmt.Errorf("received an empty API groups list")
}

m.apiGroups = apiGroups.Groups

for i := range m.apiGroups.Groups {
if groupName == (&m.apiGroups.Groups[i]).Name {
return m.apiGroups.Groups[i], nil
// Looking in the cache again.
for _, apiGroup := range m.apiGroups {
if groupName == apiGroup.Name {
return apiGroup, nil
}
}

// If there is still nothing, return an error.
return metav1.APIGroup{}, fmt.Errorf("failed to find API group %s", groupName)
}

Expand Down
120 changes: 104 additions & 16 deletions pkg/client/apiutil/lazyrestmapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ limitations under the License.
package apiutil_test

import (
"context"
"net/http"
"testing"

_ "github.com/onsi/ginkgo/v2"
gmg "github.com/onsi/gomega"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/envtest"
)
Expand Down Expand Up @@ -102,38 +107,38 @@ func TestLazyRestMapperProvider(t *testing.T) {
// There are no requests before any call
g.Expect(crt.GetRequestCount()).To(gmg.Equal(0))

mapping, err := lazyRestMapper.RESTMapping(schema.GroupKind{Group: "apps", Kind: "deployment"})
mapping, err := lazyRestMapper.RESTMapping(schema.GroupKind{Group: "apps", Kind: "deployment"}, "v1")
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal("deployment"))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(3))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(1))

mappings, err := lazyRestMapper.RESTMappings(schema.GroupKind{Group: "", Kind: "pod"})
mappings, err := lazyRestMapper.RESTMappings(schema.GroupKind{Group: "", Kind: "pod"}, "v1")
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(len(mappings)).To(gmg.Equal(1))
g.Expect(mappings[0].GroupVersionKind.Kind).To(gmg.Equal("pod"))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(4))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(2))

kind, err := lazyRestMapper.KindFor(schema.GroupVersionResource{Group: "networking.k8s.io", Version: "v1", Resource: "ingresses"})
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(kind.Kind).To(gmg.Equal("Ingress"))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(5))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(3))

kinds, err := lazyRestMapper.KindsFor(schema.GroupVersionResource{Group: "authentication.k8s.io", Version: "v1", Resource: "tokenreviews"})
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(len(kinds)).To(gmg.Equal(1))
g.Expect(kinds[0].Kind).To(gmg.Equal("TokenReview"))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(6))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(4))

resource, err := lazyRestMapper.ResourceFor(schema.GroupVersionResource{Group: "scheduling.k8s.io", Version: "v1", Resource: "priorityclasses"})
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(resource.Resource).To(gmg.Equal("priorityclasses"))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(7))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(5))

resources, err := lazyRestMapper.ResourcesFor(schema.GroupVersionResource{Group: "policy", Version: "v1", Resource: "poddisruptionbudgets"})
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(len(resources)).To(gmg.Equal(1))
g.Expect(resources[0].Resource).To(gmg.Equal("poddisruptionbudgets"))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(8))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(6))
})

t.Run("LazyRESTMapper should cache fetched data and doesn't perform any additional requests", func(t *testing.T) {
Expand Down Expand Up @@ -344,29 +349,29 @@ func TestLazyRestMapperProvider(t *testing.T) {
lazyRestMapper, err := apiutil.NewDynamicRESTMapper(restCfg, apiutil.WithExperimentalLazyMapper)
g.Expect(err).NotTo(gmg.HaveOccurred())

_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: "apps", Kind: "INVALID"})
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: "apps", Kind: "INVALID"}, "v1")
g.Expect(err).To(gmg.HaveOccurred())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(3))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(1))

_, err = lazyRestMapper.RESTMappings(schema.GroupKind{Group: "", Kind: "INVALID"})
_, err = lazyRestMapper.RESTMappings(schema.GroupKind{Group: "", Kind: "INVALID"}, "v1")
g.Expect(err).To(gmg.HaveOccurred())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(4))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(2))

_, err = lazyRestMapper.KindFor(schema.GroupVersionResource{Group: "networking.k8s.io", Version: "v1", Resource: "INVALID"})
g.Expect(err).To(gmg.HaveOccurred())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(5))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(3))

_, err = lazyRestMapper.KindsFor(schema.GroupVersionResource{Group: "authentication.k8s.io", Version: "v1", Resource: "INVALID"})
g.Expect(err).To(gmg.HaveOccurred())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(6))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(4))

_, err = lazyRestMapper.ResourceFor(schema.GroupVersionResource{Group: "scheduling.k8s.io", Version: "v1", Resource: "INVALID"})
g.Expect(err).To(gmg.HaveOccurred())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(7))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(5))

_, err = lazyRestMapper.ResourcesFor(schema.GroupVersionResource{Group: "policy", Version: "v1", Resource: "INVALID"})
g.Expect(err).To(gmg.HaveOccurred())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(8))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(6))
})

t.Run("LazyRESTMapper should return an error if the version doesn't exist", func(t *testing.T) {
Expand Down Expand Up @@ -407,4 +412,87 @@ func TestLazyRestMapperProvider(t *testing.T) {
g.Expect(err).To(gmg.HaveOccurred())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(6))
})

t.Run("LazyRESTMapper can fetch CRDs if they were created at runtime", func(t *testing.T) {
g := gmg.NewWithT(t)

// To fetch all versions mapper does 2 requests:
// GET https://host/api
// GET https://host/apis
// Then, for each version it performs just one request to the API server as usual:
// GET https://host/apis/<group>/<version>

// Note: We have to use a separate restCfg for the Client, otherwise we
// get a race condition on the counting round tripper between the Client
// and the lazy restmapper.
clientRestCfg := rest.CopyConfig(restCfg)

var crt *countingRoundTripper
restCfg := rest.CopyConfig(restCfg)
restCfg.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
crt = newCountingRoundTripper(rt)
return crt
}

lazyRestMapper, err := apiutil.NewDynamicRESTMapper(restCfg, apiutil.WithExperimentalLazyMapper)
g.Expect(err).NotTo(gmg.HaveOccurred())

// There are no requests before any call
g.Expect(crt.GetRequestCount()).To(gmg.Equal(0))

// Since we don't specify what version we expect, restmapper will fetch them all and search there.
// To fetch a list of available versions
// #1: GET https://host/api
// #2: GET https://host/apis
// Then, for each currently registered version:
// #3: GET https://host/apis/crew.example.com/v1
// #4: GET https://host/apis/crew.example.com/v2
mapping, err := lazyRestMapper.RESTMapping(schema.GroupKind{Group: "crew.example.com", Kind: "driver"})
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal("driver"))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(4))

s := scheme.Scheme
err = apiextensionsv1.AddToScheme(s)
g.Expect(err).NotTo(gmg.HaveOccurred())

c, err := client.New(clientRestCfg, client.Options{Scheme: s})
g.Expect(err).NotTo(gmg.HaveOccurred())

// Register another CRD in runtime - "riders.crew.example.com".

crd := &apiextensionsv1.CustomResourceDefinition{}
err = c.Get(context.TODO(), types.NamespacedName{Name: "drivers.crew.example.com"}, crd)
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crd.Spec.Names.Kind).To(gmg.Equal("Driver"))

newCRD := &apiextensionsv1.CustomResourceDefinition{}
crd.DeepCopyInto(newCRD)
newCRD.Name = "riders.crew.example.com"
newCRD.Spec.Names = apiextensionsv1.CustomResourceDefinitionNames{
Kind: "Rider",
Plural: "riders",
}
newCRD.ResourceVersion = ""

// Create the new CRD.
g.Expect(c.Create(context.TODO(), newCRD)).To(gmg.Succeed())

// Wait a bit until the CRD is registered.
g.Eventually(func() error {
_, err := lazyRestMapper.RESTMapping(schema.GroupKind{Group: "crew.example.com", Kind: "rider"})
return err
}).Should(gmg.Succeed())

// Since we don't specify what version we expect, restmapper will fetch them all and search there.
// To fetch a list of available versions
// #1: GET https://host/api
// #2: GET https://host/apis
// Then, for each currently registered version:
// #3: GET https://host/apis/crew.example.com/v1
// #4: GET https://host/apis/crew.example.com/v2
mapping, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: "crew.example.com", Kind: "rider"})
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal("rider"))
})
}

0 comments on commit 66fe1a0

Please sign in to comment.