From f2fc48e63c4cdadcce3881be148ee10bf85c6fb7 Mon Sep 17 00:00:00 2001 From: Min Kim <291271447@qq.com> Date: Tue, 15 Mar 2022 15:29:53 +0800 Subject: [PATCH] Feat: Conditional CSR controller spawning (#84) * [generated] go mod vendor Signed-off-by: yue9944882 <291271447@qq.com> * conditional CSR controller enabling Signed-off-by: yue9944882 <291271447@qq.com> --- pkg/addonmanager/manager.go | 58 +++-- pkg/utils/csr_helper_test.go | 60 +++++ pkg/utils/csr_helpers.go | 29 +++ .../discovery/cached/memory/memcache.go | 233 ++++++++++++++++++ vendor/modules.txt | 1 + 5 files changed, 361 insertions(+), 20 deletions(-) create mode 100644 vendor/k8s.io/client-go/discovery/cached/memory/memcache.go diff --git a/pkg/addonmanager/manager.go b/pkg/addonmanager/manager.go index 223098802..e426c3f7c 100644 --- a/pkg/addonmanager/manager.go +++ b/pkg/addonmanager/manager.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "time" + "github.com/openshift/library-go/pkg/controller/factory" "github.com/openshift/library-go/pkg/operator/events" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubeinformers "k8s.io/client-go/informers" @@ -20,6 +21,7 @@ import ( "open-cluster-management.io/addon-framework/pkg/addonmanager/controllers/clustermanagement" "open-cluster-management.io/addon-framework/pkg/addonmanager/controllers/registration" "open-cluster-management.io/addon-framework/pkg/agent" + "open-cluster-management.io/addon-framework/pkg/utils" addonv1alpha1client "open-cluster-management.io/api/client/addon/clientset/versioned" addoninformers "open-cluster-management.io/api/client/addon/informers/externalversions" clusterv1client "open-cluster-management.io/api/client/cluster/clientset/versioned" @@ -76,6 +78,11 @@ func (a *addonManager) Start(ctx context.Context) error { return err } + v1CSRSupported, _, err := utils.IsCSRSupported(kubeClient) + if err != nil { + return err + } + namespace, err := a.getComponentNamespace() if err != nil { klog.Warningf("unable to identify the current namespace for events: %v", err) @@ -150,24 +157,6 @@ func (a *addonManager) Start(ctx context.Context) error { eventRecorder, ) - csrApproveController := certificate.NewCSRApprovingController( - kubeClient, - clusterInformers.Cluster().V1().ManagedClusters(), - kubeInfomers.Certificates().V1().CertificateSigningRequests(), - addonInformers.Addon().V1alpha1().ManagedClusterAddOns(), - a.addonAgents, - eventRecorder, - ) - - csrSignController := certificate.NewCSRSignController( - kubeClient, - clusterInformers.Cluster().V1().ManagedClusters(), - kubeInfomers.Certificates().V1().CertificateSigningRequests(), - addonInformers.Addon().V1alpha1().ManagedClusterAddOns(), - a.addonAgents, - eventRecorder, - ) - clusterManagementController := clustermanagement.NewClusterManagementController( addonClient, clusterInformers.Cluster().V1().ManagedClusters(), @@ -192,6 +181,31 @@ func (a *addonManager) Start(ctx context.Context) error { a.addonAgents, eventRecorder) + var csrApproveController factory.Controller + var csrSignController factory.Controller + // Spawn the following controllers only if v1 CSR api is supported in the + // hub cluster. Under v1beta1 CSR api, all the CSR objects will be signed + // by the kube-controller-manager so custom CSR controller should be + // disabled to avoid conflict. + if v1CSRSupported { + csrApproveController = certificate.NewCSRApprovingController( + kubeClient, + clusterInformers.Cluster().V1().ManagedClusters(), + kubeInfomers.Certificates().V1().CertificateSigningRequests(), + addonInformers.Addon().V1alpha1().ManagedClusterAddOns(), + a.addonAgents, + eventRecorder, + ) + csrSignController = certificate.NewCSRSignController( + kubeClient, + clusterInformers.Cluster().V1().ManagedClusters(), + kubeInfomers.Certificates().V1().CertificateSigningRequests(), + addonInformers.Addon().V1alpha1().ManagedClusterAddOns(), + a.addonAgents, + eventRecorder, + ) + } + go addonInformers.Start(ctx.Done()) go workInformers.Start(ctx.Done()) go clusterInformers.Start(ctx.Done()) @@ -200,11 +214,15 @@ func (a *addonManager) Start(ctx context.Context) error { go deployController.Run(ctx, 1) go hookDeployController.Run(ctx, 1) go registrationController.Run(ctx, 1) - go csrApproveController.Run(ctx, 1) - go csrSignController.Run(ctx, 1) go clusterManagementController.Run(ctx, 1) go addonInstallController.Run(ctx, 1) go addonHealthCheckController.Run(ctx, 1) + if csrApproveController != nil { + go csrApproveController.Run(ctx, 1) + } + if csrSignController != nil { + go csrSignController.Run(ctx, 1) + } return nil } diff --git a/pkg/utils/csr_helper_test.go b/pkg/utils/csr_helper_test.go index c4a0e2dad..e440e5aaf 100644 --- a/pkg/utils/csr_helper_test.go +++ b/pkg/utils/csr_helper_test.go @@ -6,8 +6,11 @@ import ( "time" "github.com/openshift/library-go/pkg/crypto" + "github.com/stretchr/testify/assert" certificatesv1 "k8s.io/api/certificates/v1" + certificatesv1beta1 "k8s.io/api/certificates/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" certutil "k8s.io/client-go/util/cert" "k8s.io/client-go/util/keyutil" "open-cluster-management.io/addon-framework/pkg/agent" @@ -173,3 +176,60 @@ func TestUnionApprover(t *testing.T) { }) } } + +func TestIsCSRSupported(t *testing.T) { + cases := []struct { + apiResources []*metav1.APIResourceList + expectedV1 bool + expectedV1beta1 bool + expectedError error + }{ + { + apiResources: []*metav1.APIResourceList{ + { + GroupVersion: certificatesv1.SchemeGroupVersion.String(), + APIResources: []metav1.APIResource{ + { + Name: "certificatesigningrequests", + Kind: "CertificateSigningRequest", + }, + }, + }, + { + GroupVersion: certificatesv1beta1.SchemeGroupVersion.String(), + APIResources: []metav1.APIResource{ + { + Name: "certificatesigningrequests", + Kind: "CertificateSigningRequest", + }, + }, + }, + }, + expectedV1: true, + expectedV1beta1: true, + }, + { + apiResources: []*metav1.APIResourceList{ + { + GroupVersion: certificatesv1beta1.SchemeGroupVersion.String(), + APIResources: []metav1.APIResource{ + { + Name: "certificatesigningrequests", + Kind: "CertificateSigningRequest", + }, + }, + }, + }, + expectedV1: false, + expectedV1beta1: true, + }, + } + for _, c := range cases { + fakeClient := fake.NewSimpleClientset() + fakeClient.Resources = c.apiResources + v1Supported, v1beta1Supported, err := IsCSRSupported(fakeClient) + assert.Equal(t, c.expectedV1, v1Supported) + assert.Equal(t, c.expectedV1beta1, v1beta1Supported) + assert.Equal(t, c.expectedError, err) + } +} diff --git a/pkg/utils/csr_helpers.go b/pkg/utils/csr_helpers.go index cf09ab4ce..493fbc101 100644 --- a/pkg/utils/csr_helpers.go +++ b/pkg/utils/csr_helpers.go @@ -12,7 +12,11 @@ import ( "time" certificatesv1 "k8s.io/api/certificates/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/discovery/cached/memory" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/restmapper" "k8s.io/klog/v2" "open-cluster-management.io/addon-framework/pkg/agent" addonapiv1alpha1 "open-cluster-management.io/api/addon/v1alpha1" @@ -178,3 +182,28 @@ func UnionCSRApprover(approvers ...agent.CSRApproveFunc) agent.CSRApproveFunc { return true } } + +// IsCSRSupported checks whether the cluster supports v1 or v1beta1 csr api. +func IsCSRSupported(nativeClient kubernetes.Interface) (bool, bool, error) { + mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(nativeClient.Discovery())) + mappings, err := mapper.RESTMappings(schema.GroupKind{ + Group: certificatesv1.GroupName, + Kind: "CertificateSigningRequest", + }) + if err != nil { + return false, false, err + } + v1CSRSupported := false + for _, mapping := range mappings { + if mapping.GroupVersionKind.Version == "v1" { + v1CSRSupported = true + } + } + v1beta1CSRSupported := false + for _, mapping := range mappings { + if mapping.GroupVersionKind.Version == "v1beta1" { + v1beta1CSRSupported = true + } + } + return v1CSRSupported, v1beta1CSRSupported, nil +} diff --git a/vendor/k8s.io/client-go/discovery/cached/memory/memcache.go b/vendor/k8s.io/client-go/discovery/cached/memory/memcache.go new file mode 100644 index 000000000..9de389fa7 --- /dev/null +++ b/vendor/k8s.io/client-go/discovery/cached/memory/memcache.go @@ -0,0 +1,233 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package memory + +import ( + "errors" + "fmt" + "sync" + "syscall" + + openapi_v2 "github.com/googleapis/gnostic/openapiv2" + + errorsutil "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/version" + "k8s.io/client-go/discovery" + restclient "k8s.io/client-go/rest" +) + +type cacheEntry struct { + resourceList *metav1.APIResourceList + err error +} + +// memCacheClient can Invalidate() to stay up-to-date with discovery +// information. +// +// TODO: Switch to a watch interface. Right now it will poll after each +// Invalidate() call. +type memCacheClient struct { + delegate discovery.DiscoveryInterface + + lock sync.RWMutex + groupToServerResources map[string]*cacheEntry + groupList *metav1.APIGroupList + cacheValid bool +} + +// Error Constants +var ( + ErrCacheNotFound = errors.New("not found") +) + +var _ discovery.CachedDiscoveryInterface = &memCacheClient{} + +// isTransientConnectionError checks whether given error is "Connection refused" or +// "Connection reset" error which usually means that apiserver is temporarily +// unavailable. +func isTransientConnectionError(err error) bool { + var errno syscall.Errno + if errors.As(err, &errno) { + return errno == syscall.ECONNREFUSED || errno == syscall.ECONNRESET + } + return false +} + +func isTransientError(err error) bool { + if isTransientConnectionError(err) { + return true + } + + if t, ok := err.(errorsutil.APIStatus); ok && t.Status().Code >= 500 { + return true + } + + return errorsutil.IsTooManyRequests(err) +} + +// ServerResourcesForGroupVersion returns the supported resources for a group and version. +func (d *memCacheClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { + d.lock.Lock() + defer d.lock.Unlock() + if !d.cacheValid { + if err := d.refreshLocked(); err != nil { + return nil, err + } + } + cachedVal, ok := d.groupToServerResources[groupVersion] + if !ok { + return nil, ErrCacheNotFound + } + + if cachedVal.err != nil && isTransientError(cachedVal.err) { + r, err := d.serverResourcesForGroupVersion(groupVersion) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", groupVersion, err)) + } + cachedVal = &cacheEntry{r, err} + d.groupToServerResources[groupVersion] = cachedVal + } + + return cachedVal.resourceList, cachedVal.err +} + +// ServerResources returns the supported resources for all groups and versions. +// Deprecated: use ServerGroupsAndResources instead. +func (d *memCacheClient) ServerResources() ([]*metav1.APIResourceList, error) { + return discovery.ServerResources(d) +} + +// ServerGroupsAndResources returns the groups and supported resources for all groups and versions. +func (d *memCacheClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) { + return discovery.ServerGroupsAndResources(d) +} + +func (d *memCacheClient) ServerGroups() (*metav1.APIGroupList, error) { + d.lock.Lock() + defer d.lock.Unlock() + if !d.cacheValid { + if err := d.refreshLocked(); err != nil { + return nil, err + } + } + return d.groupList, nil +} + +func (d *memCacheClient) RESTClient() restclient.Interface { + return d.delegate.RESTClient() +} + +func (d *memCacheClient) ServerPreferredResources() ([]*metav1.APIResourceList, error) { + return discovery.ServerPreferredResources(d) +} + +func (d *memCacheClient) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) { + return discovery.ServerPreferredNamespacedResources(d) +} + +func (d *memCacheClient) ServerVersion() (*version.Info, error) { + return d.delegate.ServerVersion() +} + +func (d *memCacheClient) OpenAPISchema() (*openapi_v2.Document, error) { + return d.delegate.OpenAPISchema() +} + +func (d *memCacheClient) Fresh() bool { + d.lock.RLock() + defer d.lock.RUnlock() + // Return whether the cache is populated at all. It is still possible that + // a single entry is missing due to transient errors and the attempt to read + // that entry will trigger retry. + return d.cacheValid +} + +// Invalidate enforces that no cached data that is older than the current time +// is used. +func (d *memCacheClient) Invalidate() { + d.lock.Lock() + defer d.lock.Unlock() + d.cacheValid = false + d.groupToServerResources = nil + d.groupList = nil +} + +// refreshLocked refreshes the state of cache. The caller must hold d.lock for +// writing. +func (d *memCacheClient) refreshLocked() error { + // TODO: Could this multiplicative set of calls be replaced by a single call + // to ServerResources? If it's possible for more than one resulting + // APIResourceList to have the same GroupVersion, the lists would need merged. + gl, err := d.delegate.ServerGroups() + if err != nil || len(gl.Groups) == 0 { + utilruntime.HandleError(fmt.Errorf("couldn't get current server API group list: %v", err)) + return err + } + + wg := &sync.WaitGroup{} + resultLock := &sync.Mutex{} + rl := map[string]*cacheEntry{} + for _, g := range gl.Groups { + for _, v := range g.Versions { + gv := v.GroupVersion + wg.Add(1) + go func() { + defer wg.Done() + defer utilruntime.HandleCrash() + + r, err := d.serverResourcesForGroupVersion(gv) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", gv, err)) + } + + resultLock.Lock() + defer resultLock.Unlock() + rl[gv] = &cacheEntry{r, err} + }() + } + } + wg.Wait() + + d.groupToServerResources, d.groupList = rl, gl + d.cacheValid = true + return nil +} + +func (d *memCacheClient) serverResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { + r, err := d.delegate.ServerResourcesForGroupVersion(groupVersion) + if err != nil { + return r, err + } + if len(r.APIResources) == 0 { + return r, fmt.Errorf("Got empty response for: %v", groupVersion) + } + return r, nil +} + +// NewMemCacheClient creates a new CachedDiscoveryInterface which caches +// discovery information in memory and will stay up-to-date if Invalidate is +// called with regularity. +// +// NOTE: The client will NOT resort to live lookups on cache misses. +func NewMemCacheClient(delegate discovery.DiscoveryInterface) discovery.CachedDiscoveryInterface { + return &memCacheClient{ + delegate: delegate, + groupToServerResources: map[string]*cacheEntry{}, + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index f4badfe89..2263f8bf7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -953,6 +953,7 @@ k8s.io/client-go/applyconfigurations/storage/v1 k8s.io/client-go/applyconfigurations/storage/v1alpha1 k8s.io/client-go/applyconfigurations/storage/v1beta1 k8s.io/client-go/discovery +k8s.io/client-go/discovery/cached/memory k8s.io/client-go/discovery/fake k8s.io/client-go/dynamic k8s.io/client-go/informers