From a9eca3f044c9d7ee45a70ce7dbcf18161c954475 Mon Sep 17 00:00:00 2001 From: Malte Isberner Date: Thu, 8 Sep 2022 22:59:18 +0200 Subject: [PATCH] helm: fix a memory leak resulting from too many k8s client instantiations. See https://github.com/operator-framework/helm-operator-plugins/pull/198 for a detailed description of the issue. This commit ports over the relevant changes. Signed-off-by: Malte Isberner --- internal/cmd/helm-operator/run/cmd.go | 8 +- internal/helm/client/actionconfig.go | 120 +++++++++++++++++++++++ internal/helm/client/client.go | 98 +++--------------- internal/helm/client/restclientgetter.go | 113 +++++++++++++++++++++ internal/helm/release/manager_factory.go | 51 +++------- 5 files changed, 268 insertions(+), 122 deletions(-) create mode 100644 internal/helm/client/actionconfig.go create mode 100644 internal/helm/client/restclientgetter.go diff --git a/internal/cmd/helm-operator/run/cmd.go b/internal/cmd/helm-operator/run/cmd.go index d6deb6d662a..0c22658fa4d 100644 --- a/internal/cmd/helm-operator/run/cmd.go +++ b/internal/cmd/helm-operator/run/cmd.go @@ -37,6 +37,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager/signals" crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" + helmClient "github.com/operator-framework/operator-sdk/internal/helm/client" "github.com/operator-framework/operator-sdk/internal/helm/controller" "github.com/operator-framework/operator-sdk/internal/helm/flags" "github.com/operator-framework/operator-sdk/internal/helm/metrics" @@ -193,6 +194,11 @@ func run(cmd *cobra.Command, f *flags.Flags) { log.Error(err, "Failed to create new manager factories.") os.Exit(1) } + acg, err := helmClient.NewActionConfigGetter(mgr.GetConfig(), mgr.GetRESTMapper(), mgr.GetLogger()) + if err != nil { + log.Error(err, "Failed to create Helm action config getter") + os.Exit(1) + } for _, w := range ws { // Register the controller with the factory. reconcilePeriod := f.ReconcilePeriod @@ -203,7 +209,7 @@ func run(cmd *cobra.Command, f *flags.Flags) { err := controller.Add(mgr, controller.WatchOptions{ Namespace: namespace, GVK: w.GroupVersionKind, - ManagerFactory: release.NewManagerFactory(mgr, w.ChartDir), + ManagerFactory: release.NewManagerFactory(mgr, acg, w.ChartDir), ReconcilePeriod: reconcilePeriod, WatchDependentResources: *w.WatchDependentResources, OverrideValues: w.OverrideValues, diff --git a/internal/helm/client/actionconfig.go b/internal/helm/client/actionconfig.go new file mode 100644 index 00000000000..90f51728068 --- /dev/null +++ b/internal/helm/client/actionconfig.go @@ -0,0 +1,120 @@ +/* +Copyright 2020 The Operator-SDK Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + "fmt" + + "k8s.io/client-go/kubernetes" + + "github.com/go-logr/logr" + "helm.sh/helm/v3/pkg/action" + "helm.sh/helm/v3/pkg/kube" + "helm.sh/helm/v3/pkg/storage" + "helm.sh/helm/v3/pkg/storage/driver" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type ActionConfigGetter interface { + ActionConfigFor(obj client.Object) (*action.Configuration, error) +} + +func NewActionConfigGetter(cfg *rest.Config, rm meta.RESTMapper, log logr.Logger) (ActionConfigGetter, error) { + rcg := newRESTClientGetter(cfg, rm, "") + // Setup the debug log function that Helm will use + debugLog := func(format string, v ...interface{}) { + if log.Enabled() { + log.V(1).Info(fmt.Sprintf(format, v...)) + } + } + + kc := kube.New(rcg) + kc.Log = debugLog + + kcs, err := kc.Factory.KubernetesClientSet() + if err != nil { + return nil, fmt.Errorf("creating kubernetes client set: %w", err) + } + + return &actionConfigGetter{ + kubeClient: kc, + kubeClientSet: kcs, + debugLog: debugLog, + restClientGetter: rcg.restClientGetter, + }, nil +} + +var _ ActionConfigGetter = &actionConfigGetter{} + +type actionConfigGetter struct { + kubeClient *kube.Client + kubeClientSet kubernetes.Interface + debugLog func(string, ...interface{}) + restClientGetter *restClientGetter +} + +func (acg *actionConfigGetter) ActionConfigFor(obj client.Object) (*action.Configuration, error) { + ownerRef := metav1.NewControllerRef(obj, obj.GetObjectKind().GroupVersionKind()) + d := driver.NewSecrets(&ownerRefSecretClient{ + SecretInterface: acg.kubeClientSet.CoreV1().Secrets(obj.GetNamespace()), + refs: []metav1.OwnerReference{*ownerRef}, + }) + + // Also, use the debug log for the storage driver + d.Log = acg.debugLog + + // Initialize the storage backend + s := storage.Init(d) + + kubeClient := *acg.kubeClient + kubeClient.Namespace = obj.GetNamespace() + + ownerRefClient, err := NewOwnerRefInjectingClient(&kubeClient, acg.restClientGetter.restMapper, obj) + if err != nil { + return nil, fmt.Errorf("could not create owner reference injecting client: %w", err) + } + + return &action.Configuration{ + RESTClientGetter: acg.restClientGetter.ForNamespace(obj.GetNamespace()), + Releases: s, + KubeClient: ownerRefClient, + Log: acg.debugLog, + }, nil +} + +var _ v1.SecretInterface = &ownerRefSecretClient{} + +type ownerRefSecretClient struct { + v1.SecretInterface + refs []metav1.OwnerReference +} + +func (c *ownerRefSecretClient) Create(ctx context.Context, in *corev1.Secret, opts metav1.CreateOptions) (*corev1.Secret, error) { + in.OwnerReferences = append(in.OwnerReferences, c.refs...) + return c.SecretInterface.Create(ctx, in, opts) +} + +func (c *ownerRefSecretClient) Update(ctx context.Context, in *corev1.Secret, opts metav1.UpdateOptions) (*corev1.Secret, error) { + in.OwnerReferences = append(in.OwnerReferences, c.refs...) + return c.SecretInterface.Update(ctx, in, opts) +} diff --git a/internal/helm/client/client.go b/internal/helm/client/client.go index 5f9af39ed12..62e18f541a9 100644 --- a/internal/helm/client/client.go +++ b/internal/helm/client/client.go @@ -26,106 +26,38 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/resource" - "k8s.io/client-go/discovery" - cached "k8s.io/client-go/discovery/cached" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - clientcmdapi "k8s.io/client-go/tools/clientcmd/api" - "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/client" ) -var _ genericclioptions.RESTClientGetter = &restClientGetter{} - -type restClientGetter struct { - restConfig *rest.Config - discoveryClient discovery.CachedDiscoveryInterface - restMapper meta.RESTMapper - namespaceConfig clientcmd.ClientConfig -} - -func (c *restClientGetter) ToRESTConfig() (*rest.Config, error) { - return c.restConfig, nil -} - -func (c *restClientGetter) ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error) { - return c.discoveryClient, nil -} - -func (c *restClientGetter) ToRESTMapper() (meta.RESTMapper, error) { - return c.restMapper, nil -} - -func (c *restClientGetter) ToRawKubeConfigLoader() clientcmd.ClientConfig { - return c.namespaceConfig -} - -var _ clientcmd.ClientConfig = &namespaceClientConfig{} - -type namespaceClientConfig struct { - namespace string -} - -func (c namespaceClientConfig) RawConfig() (clientcmdapi.Config, error) { - return clientcmdapi.Config{}, nil -} - -func (c namespaceClientConfig) ClientConfig() (*rest.Config, error) { - return nil, nil -} - -func (c namespaceClientConfig) Namespace() (string, bool, error) { - return c.namespace, false, nil -} - -func (c namespaceClientConfig) ConfigAccess() clientcmd.ConfigAccess { - return nil -} - -func NewRESTClientGetter(mgr manager.Manager, ns string) (genericclioptions.RESTClientGetter, error) { - cfg := mgr.GetConfig() - dc, err := discovery.NewDiscoveryClientForConfig(cfg) - if err != nil { - return nil, err - } - cdc := cached.NewMemCacheClient(dc) - rm := mgr.GetRESTMapper() - - return &restClientGetter{ - restConfig: cfg, - discoveryClient: cdc, - restMapper: rm, - namespaceConfig: &namespaceClientConfig{ns}, - }, nil -} - var _ kube.Interface = &ownerRefInjectingClient{} -func NewOwnerRefInjectingClient(base kube.Client, restMapper meta.RESTMapper, - cr *unstructured.Unstructured) (kube.Interface, error) { +func NewOwnerRefInjectingClient(base kube.Interface, restMapper meta.RESTMapper, + obj client.Object) (kube.Interface, error) { - if cr != nil { - if cr.GetObjectKind().GroupVersionKind().Empty() || cr.GetName() == "" || cr.GetUID() == "" { - var err = errors.New("owner resource is invalid") - return nil, err + if obj != nil { + if obj.GetObjectKind() != nil { + if obj.GetObjectKind().GroupVersionKind().Empty() || obj.GetName() == "" || obj.GetUID() == "" { + var err = errors.New("owner resource is invalid") + return nil, err + } } } return &ownerRefInjectingClient{ - Client: base, + Interface: base, restMapper: restMapper, - owner: cr, + owner: obj, }, nil } type ownerRefInjectingClient struct { - kube.Client + kube.Interface restMapper meta.RESTMapper - owner *unstructured.Unstructured + owner client.Object } func (c *ownerRefInjectingClient) Build(reader io.Reader, validate bool) (kube.ResourceList, error) { - resourceList, err := c.Client.Build(reader, validate) + resourceList, err := c.Interface.Build(reader, validate) if err != nil { return resourceList, err } @@ -146,7 +78,7 @@ func (c *ownerRefInjectingClient) Build(reader io.Reader, validate bool) (kube.R // If the resource contains the Helm resource-policy keep annotation, then do not add // the owner reference. So when the CR is deleted, Kubernetes won't GCs the resource. if useOwnerRef && !containsResourcePolicyKeep(u.GetAnnotations()) { - ownerRef := metav1.NewControllerRef(c.owner, c.owner.GroupVersionKind()) + ownerRef := metav1.NewControllerRef(c.owner, c.owner.GetObjectKind().GroupVersionKind()) u.SetOwnerReferences([]metav1.OwnerReference{*ownerRef}) } else { err := handler.SetOwnerAnnotations(u, c.owner) diff --git a/internal/helm/client/restclientgetter.go b/internal/helm/client/restclientgetter.go new file mode 100644 index 00000000000..818c2f8c448 --- /dev/null +++ b/internal/helm/client/restclientgetter.go @@ -0,0 +1,113 @@ +/* +Copyright 2020 The Operator-SDK Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "sync" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/client-go/discovery" + cached "k8s.io/client-go/discovery/cached" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + clientcmdapi "k8s.io/client-go/tools/clientcmd/api" +) + +func newRESTClientGetter(cfg *rest.Config, rm meta.RESTMapper, ns string) *namespacedRCG { + return &namespacedRCG{ + restClientGetter: &restClientGetter{ + restConfig: cfg, + restMapper: rm, + }, + namespaceConfig: namespaceClientConfig{ns}, + } +} + +type restClientGetter struct { + restConfig *rest.Config + restMapper meta.RESTMapper + + setupDiscoveryClient sync.Once + cachedDiscoveryClient discovery.CachedDiscoveryInterface +} + +func (c *restClientGetter) ToRESTConfig() (*rest.Config, error) { + return rest.CopyConfig(c.restConfig), nil +} + +func (c *restClientGetter) ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error) { + var ( + dc discovery.DiscoveryInterface + err error + ) + c.setupDiscoveryClient.Do(func() { + dc, err = discovery.NewDiscoveryClientForConfig(c.restConfig) + if err != nil { + return + } + c.cachedDiscoveryClient = cached.NewMemCacheClient(dc) + }) + if err != nil { + return nil, err + } + return c.cachedDiscoveryClient, nil +} + +func (c *restClientGetter) ToRESTMapper() (meta.RESTMapper, error) { + return c.restMapper, nil +} + +func (c *restClientGetter) ForNamespace(ns string) genericclioptions.RESTClientGetter { + return &namespacedRCG{ + restClientGetter: c, + namespaceConfig: namespaceClientConfig{namespace: ns}, + } +} + +var _ genericclioptions.RESTClientGetter = &namespacedRCG{} + +type namespacedRCG struct { + *restClientGetter + namespaceConfig namespaceClientConfig +} + +func (c *namespacedRCG) ToRawKubeConfigLoader() clientcmd.ClientConfig { + return c.namespaceConfig +} + +var _ clientcmd.ClientConfig = &namespaceClientConfig{} + +type namespaceClientConfig struct { + namespace string +} + +func (c namespaceClientConfig) RawConfig() (clientcmdapi.Config, error) { + return clientcmdapi.Config{}, nil +} + +func (c namespaceClientConfig) ClientConfig() (*rest.Config, error) { + return nil, nil +} + +func (c namespaceClientConfig) Namespace() (string, bool, error) { + return c.namespace, false, nil +} + +func (c namespaceClientConfig) ConfigAccess() clientcmd.ConfigAccess { + return nil +} diff --git a/internal/helm/release/manager_factory.go b/internal/helm/release/manager_factory.go index 406c5dbf47d..cb1857fd0b7 100644 --- a/internal/helm/release/manager_factory.go +++ b/internal/helm/release/manager_factory.go @@ -17,15 +17,11 @@ package release import ( "fmt" - "helm.sh/helm/v3/pkg/action" "helm.sh/helm/v3/pkg/chart/loader" - "helm.sh/helm/v3/pkg/kube" helmrelease "helm.sh/helm/v3/pkg/release" "helm.sh/helm/v3/pkg/storage" - "helm.sh/helm/v3/pkg/storage/driver" "helm.sh/helm/v3/pkg/strvals" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - v1 "k8s.io/client-go/kubernetes/typed/core/v1" crmanager "sigs.k8s.io/controller-runtime/pkg/manager" "github.com/operator-framework/operator-sdk/internal/helm/client" @@ -42,34 +38,19 @@ type ManagerFactory interface { type managerFactory struct { mgr crmanager.Manager + acg client.ActionConfigGetter chartDir string } // NewManagerFactory returns a new Helm manager factory capable of installing and uninstalling releases. -func NewManagerFactory(mgr crmanager.Manager, chartDir string) ManagerFactory { - return &managerFactory{mgr, chartDir} +func NewManagerFactory(mgr crmanager.Manager, acg client.ActionConfigGetter, chartDir string) ManagerFactory { + return &managerFactory{mgr, acg, chartDir} } func (f managerFactory) NewManager(cr *unstructured.Unstructured, overrideValues map[string]string) (Manager, error) { - // Get both v2 and v3 storage backends - clientv1, err := v1.NewForConfig(f.mgr.GetConfig()) + actionConfig, err := f.acg.ActionConfigFor(cr) if err != nil { - return nil, fmt.Errorf("failed to get core/v1 client: %w", err) - } - storageBackend := storage.Init(driver.NewSecrets(clientv1.Secrets(cr.GetNamespace()))) - - // Get the necessary clients and client getters. Use a client that injects the CR - // as an owner reference into all resources templated by the chart. - rcg, err := client.NewRESTClientGetter(f.mgr, cr.GetNamespace()) - if err != nil { - return nil, fmt.Errorf("failed to get REST client getter from manager: %w", err) - } - - kubeClient := kube.New(rcg) - restMapper := f.mgr.GetRESTMapper() - ownerRefClient, err := client.NewOwnerRefInjectingClient(*kubeClient, restMapper, cr) - if err != nil { - return nil, fmt.Errorf("failed to inject owner references: %w", err) + return nil, fmt.Errorf("failed to get helm action config: %w", err) } crChart, err := loader.LoadDir(f.chartDir) @@ -77,7 +58,7 @@ func (f managerFactory) NewManager(cr *unstructured.Unstructured, overrideValues return nil, fmt.Errorf("failed to load chart dir: %w", err) } - releaseName, err := getReleaseName(storageBackend, crChart.Name(), cr) + releaseName, err := getReleaseName(actionConfig.Releases, crChart.Name(), cr) if err != nil { return nil, fmt.Errorf("failed to get helm release name: %w", err) } @@ -93,17 +74,10 @@ func (f managerFactory) NewManager(cr *unstructured.Unstructured, overrideValues } values := mergeMaps(crValues, expOverrides) - actionConfig := &action.Configuration{ - RESTClientGetter: rcg, - Releases: storageBackend, - KubeClient: ownerRefClient, - Log: func(_ string, _ ...interface{}) {}, - } - return &manager{ actionConfig: actionConfig, - storageBackend: storageBackend, - kubeClient: ownerRefClient, + storageBackend: actionConfig.Releases, + kubeClient: actionConfig.KubeClient, releaseName: releaseName, namespace: cr.GetNamespace(), @@ -126,10 +100,11 @@ func (f managerFactory) NewManager(cr *unstructured.Unstructured, overrideValues // in the same namespace. // // TODO(jlanford): As noted above, using the CR name as the release name raises -// the possibility of collision. We should move this logic to a validating -// admission webhook so that the CR owner receives immediate feedback of the -// collision. As is, the only indication of collision will be in the CR status -// and operator logs. +// +// the possibility of collision. We should move this logic to a validating +// admission webhook so that the CR owner receives immediate feedback of the +// collision. As is, the only indication of collision will be in the CR status +// and operator logs. func getReleaseName(storageBackend *storage.Storage, crChartName string, cr *unstructured.Unstructured) (string, error) { // If a release with the CR name does not exist, return the CR name.