Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak due to misuse of K8s clients #198

Merged
merged 1 commit into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ require (
k8s.io/apimachinery v0.24.2
k8s.io/cli-runtime v0.24.2
k8s.io/client-go v0.24.2
k8s.io/kubectl v0.24.2
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
sigs.k8s.io/controller-runtime v0.12.2
sigs.k8s.io/kubebuilder/v3 v3.6.0
Expand Down Expand Up @@ -178,6 +177,7 @@ require (
k8s.io/component-base v0.24.2 // indirect
k8s.io/klog/v2 v2.60.1 // indirect
k8s.io/kube-openapi v0.0.0-20220627174259-011e075b9cb8 // indirect
k8s.io/kubectl v0.24.2 // indirect
oras.land/oras-go v1.2.0 // indirect
sigs.k8s.io/controller-tools v0.9.2 // indirect
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
Expand Down
11 changes: 7 additions & 4 deletions pkg/client/actionclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ var _ = Describe("ActionClient", func() {
})
var _ = Describe("NewActionClientGetter", func() {
It("should return a valid ActionConfigGetter", func() {
actionConfigGetter := NewActionConfigGetter(cfg, rm, logr.Discard())
actionConfigGetter, err := NewActionConfigGetter(cfg, rm, logr.Discard())
Expect(err).ShouldNot(HaveOccurred())
Expect(NewActionClientGetter(actionConfigGetter)).NotTo(BeNil())
})
})
Expand All @@ -85,7 +86,9 @@ var _ = Describe("ActionClient", func() {
obj = testutil.BuildTestCR(gvk)
})
It("should return a valid ActionClient", func() {
acg := NewActionClientGetter(NewActionConfigGetter(cfg, rm, logr.Discard()))
actionConfGetter, err := NewActionConfigGetter(cfg, rm, logr.Discard())
Expect(err).ShouldNot(HaveOccurred())
acg := NewActionClientGetter(actionConfGetter)
ac, err := acg.ActionClientFor(obj)
Expect(err).To(BeNil())
Expect(ac).NotTo(BeNil())
Expand All @@ -102,8 +105,8 @@ var _ = Describe("ActionClient", func() {
BeforeEach(func() {
obj = testutil.BuildTestCR(gvk)

var err error
actionConfigGetter := NewActionConfigGetter(cfg, rm, logr.Discard())
actionConfigGetter, err := NewActionConfigGetter(cfg, rm, logr.Discard())
Expect(err).ShouldNot(HaveOccurred())
acg := NewActionClientGetter(actionConfigGetter)
ac, err = acg.ActionClientFor(obj)
Expect(err).To(BeNil())
Expand Down
70 changes: 34 additions & 36 deletions pkg/client/actionconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"fmt"

"k8s.io/client-go/kubernetes"

"github.com/go-logr/logr"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/kube"
Expand All @@ -30,72 +32,68 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type ActionConfigGetter interface {
ActionConfigFor(obj client.Object) (*action.Configuration, error)
}

func NewActionConfigGetter(cfg *rest.Config, rm meta.RESTMapper, log logr.Logger) ActionConfigGetter {
return &actionConfigGetter{
cfg: cfg,
restMapper: rm,
log: log,
}
}

var _ ActionConfigGetter = &actionConfigGetter{}

type actionConfigGetter struct {
cfg *rest.Config
restMapper meta.RESTMapper
log logr.Logger
}

func (acg *actionConfigGetter) ActionConfigFor(obj client.Object) (*action.Configuration, error) {
// Create a RESTClientGetter
rcg := newRESTClientGetter(acg.cfg, acg.restMapper, obj.GetNamespace())

func NewActionConfigGetter(cfg *rest.Config, rm meta.RESTMapper, log logr.Logger) (ActionConfigGetter, error) {
rcg := newRESTClientGetter(cfg, rm, "")
// Setup the debug log function that Helm will use
debugLog := func(format string, v ...interface{}) {
if acg.log.GetSink() != nil {
acg.log.V(1).Info(fmt.Sprintf(format, v...))
if log.GetSink() != nil {
log.V(1).Info(fmt.Sprintf(format, v...))
}
}

// Create a client that helm will use to manage release resources.
// The passed object is used as an owner reference on every
// object the client creates.
kc := kube.New(rcg)
kc.Log = debugLog

// Create the Kubernetes Secrets client. The passed object is
// also used as an owner reference in the release secrets
// created by this client.
kcs, err := cmdutil.NewFactory(rcg).KubernetesClientSet()
kcs, err := kc.Factory.KubernetesClientSet()
if err != nil {
return nil, err
return nil, fmt.Errorf("creating kubernetes client set: %w", err)
}

return &actionConfigGetter{
kubeClient: kc,
kubeClientSet: kcs,
debugLog: debugLog,
restClientGetter: rcg.restClientGetter,
}, nil
}

var _ ActionConfigGetter = &actionConfigGetter{}

type actionConfigGetter struct {
kubeClient *kube.Client
kubeClientSet kubernetes.Interface
debugLog func(string, ...interface{})
restClientGetter *restClientGetter
}

func (acg *actionConfigGetter) ActionConfigFor(obj client.Object) (*action.Configuration, error) {
ownerRef := metav1.NewControllerRef(obj, obj.GetObjectKind().GroupVersionKind())
d := driver.NewSecrets(&ownerRefSecretClient{
SecretInterface: kcs.CoreV1().Secrets(obj.GetNamespace()),
SecretInterface: acg.kubeClientSet.CoreV1().Secrets(obj.GetNamespace()),
refs: []metav1.OwnerReference{*ownerRef},
})

// Also, use the debug log for the storage driver
d.Log = debugLog
d.Log = acg.debugLog

// Initialize the storage backend
s := storage.Init(d)

kubeClient := *acg.kubeClient
kubeClient.Namespace = obj.GetNamespace()

return &action.Configuration{
RESTClientGetter: rcg,
RESTClientGetter: acg.restClientGetter.ForNamespace(obj.GetNamespace()),
Releases: s,
KubeClient: kc,
Log: debugLog,
KubeClient: &kubeClient,
Log: acg.debugLog,
}, nil
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/client/actionconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (
var _ = Describe("ActionConfig", func() {
var _ = Describe("NewActionConfigGetter", func() {
It("should return a valid ActionConfigGetter", func() {
Expect(NewActionConfigGetter(nil, nil, logr.Discard())).NotTo(BeNil())
acg, err := NewActionConfigGetter(cfg, nil, logr.Discard())
Expect(err).ShouldNot(HaveOccurred())
Expect(acg).NotTo(BeNil())
})
})

Expand All @@ -42,7 +44,8 @@ var _ = Describe("ActionConfig", func() {
rm, err := apiutil.NewDiscoveryRESTMapper(cfg)
Expect(err).To(BeNil())

acg := NewActionConfigGetter(cfg, rm, logr.Discard())
acg, err := NewActionConfigGetter(cfg, rm, logr.Discard())
Expect(err).ShouldNot(HaveOccurred())
ac, err := acg.ActionConfigFor(obj)
Expect(err).To(BeNil())
Expect(ac).NotTo(BeNil())
Expand Down
35 changes: 24 additions & 11 deletions pkg/client/restclientgetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,19 @@ import (
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

var _ genericclioptions.RESTClientGetter = &restClientGetter{}

func newRESTClientGetter(cfg *rest.Config, rm meta.RESTMapper, ns string) genericclioptions.RESTClientGetter {
return &restClientGetter{
restConfig: cfg,
restMapper: rm,
namespaceConfig: &namespaceClientConfig{ns},
func newRESTClientGetter(cfg *rest.Config, rm meta.RESTMapper, ns string) *namespacedRCG {
return &namespacedRCG{
restClientGetter: &restClientGetter{
restConfig: cfg,
restMapper: rm,
},
namespaceConfig: namespaceClientConfig{ns},
}
}

type restClientGetter struct {
restConfig *rest.Config
restMapper meta.RESTMapper
namespaceConfig clientcmd.ClientConfig
restConfig *rest.Config
restMapper meta.RESTMapper

setupDiscoveryClient sync.Once
cachedDiscoveryClient discovery.CachedDiscoveryInterface
Expand Down Expand Up @@ -73,7 +72,21 @@ func (c *restClientGetter) ToRESTMapper() (meta.RESTMapper, error) {
return c.restMapper, nil
}

func (c *restClientGetter) ToRawKubeConfigLoader() clientcmd.ClientConfig {
func (c *restClientGetter) ForNamespace(ns string) genericclioptions.RESTClientGetter {
return &namespacedRCG{
restClientGetter: c,
namespaceConfig: namespaceClientConfig{namespace: ns},
}
}

var _ genericclioptions.RESTClientGetter = &namespacedRCG{}

type namespacedRCG struct {
*restClientGetter
namespaceConfig namespaceClientConfig
}

func (c *namespacedRCG) ToRawKubeConfigLoader() clientcmd.ClientConfig {
return c.namespaceConfig
}

Expand Down
73 changes: 40 additions & 33 deletions pkg/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ func (r *Reconciler) setupAnnotationMaps() {
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
controllerName := fmt.Sprintf("%v-controller", strings.ToLower(r.gvk.Kind))

r.addDefaults(mgr, controllerName)
if err := r.addDefaults(mgr, controllerName); err != nil {
return err
}

if !r.skipPrimaryGVKSchemeRegistration {
r.setupScheme(mgr)
}
Expand Down Expand Up @@ -270,33 +273,33 @@ func SkipDependentWatches(skip bool) Option {
//
// Example for using a custom type for the GVK scheme instead of unstructured.Unstructured:
//
// // Define custom type for GVK scheme.
// //+kubebuilder:object:root=true
// type Custom struct {
// // [...]
// }
// // Define custom type for GVK scheme.
// //+kubebuilder:object:root=true
// type Custom struct {
// // [...]
// }
//
// // Register custom type along with common meta types in scheme.
// scheme := runtime.NewScheme()
// scheme.AddKnownTypes(SchemeGroupVersion, &Custom{})
// metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
// // Register custom type along with common meta types in scheme.
// scheme := runtime.NewScheme()
// scheme.AddKnownTypes(SchemeGroupVersion, &Custom{})
// metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
//
// // Create new manager using the controller-runtime, injecting above scheme.
// options := ctrl.Options{
// Scheme = scheme,
// // [...]
// }
// mgr, err := ctrl.NewManager(config, options)
// // Create new manager using the controller-runtime, injecting above scheme.
// options := ctrl.Options{
// Scheme = scheme,
// // [...]
// }
// mgr, err := ctrl.NewManager(config, options)
//
// // Create reconciler with generic scheme registration being disabled.
// r, err := reconciler.New(
// reconciler.WithChart(chart),
// reconciler.SkipPrimaryGVKSchemeRegistration(true),
// // [...]
// )
// // Create reconciler with generic scheme registration being disabled.
// r, err := reconciler.New(
// reconciler.WithChart(chart),
// reconciler.SkipPrimaryGVKSchemeRegistration(true),
// // [...]
// )
//
// // Setup reconciler with above manager.
// err = r.SetupWithManager(mgr)
// // Setup reconciler with above manager.
// err = r.SetupWithManager(mgr)
//
// By default, skipping of the generic scheme setup is disabled, which means that
// unstructured.Unstructured is used for the GVK scheme.
Expand Down Expand Up @@ -435,16 +438,16 @@ func WithPostHook(h hook.PostHook) Option {
// If you wish to, you can convert the Unstructured that is passed to your Translator to your own
// Custom Resource struct like this:
//
// import "k8s.io/apimachinery/pkg/runtime"
// foo := your.Foo{}
// if err = runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &foo); err != nil {
// return nil, err
// }
// // work with the type-safe foo
// import "k8s.io/apimachinery/pkg/runtime"
// foo := your.Foo{}
// if err = runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &foo); err != nil {
// return nil, err
// }
// // work with the type-safe foo
//
// Alternatively, your translator can also work similarly to a Mapper, by accessing the spec with:
//
// u.Object["spec"].(map[string]interface{})
// u.Object["spec"].(map[string]interface{})
func WithValueTranslator(t values.Translator) Option {
return func(r *Reconciler) error {
r.valueTranslator = t
Expand Down Expand Up @@ -866,15 +869,18 @@ func (r *Reconciler) validate() error {
return nil
}

func (r *Reconciler) addDefaults(mgr ctrl.Manager, controllerName string) {
func (r *Reconciler) addDefaults(mgr ctrl.Manager, controllerName string) error {
if r.client == nil {
r.client = mgr.GetClient()
}
if r.log.GetSink() == nil {
r.log = ctrl.Log.WithName("controllers").WithName("Helm")
}
if r.actionClientGetter == nil {
actionConfigGetter := helmclient.NewActionConfigGetter(mgr.GetConfig(), mgr.GetRESTMapper(), r.log)
actionConfigGetter, err := helmclient.NewActionConfigGetter(mgr.GetConfig(), mgr.GetRESTMapper(), r.log)
if err != nil {
return fmt.Errorf("creating action config getter: %w", err)
}
r.actionClientGetter = helmclient.NewActionClientGetter(actionConfigGetter)
}
if r.eventRecorder == nil {
Expand All @@ -886,6 +892,7 @@ func (r *Reconciler) addDefaults(mgr ctrl.Manager, controllerName string) {
if r.valueMapper == nil {
r.valueMapper = internalvalues.DefaultMapper
}
return nil
}

func (r *Reconciler) setupScheme(mgr ctrl.Manager) {
Expand Down
11 changes: 5 additions & 6 deletions pkg/reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,9 @@ var _ = Describe("Reconciler", func() {
})
var _ = Describe("WithActionClientGetter", func() {
It("should set the reconciler action client getter", func() {
cfgGetter := helmclient.NewActionConfigGetter(nil, nil, logr.Discard())
acg := helmclient.NewActionClientGetter(cfgGetter)
Expect(WithActionClientGetter(acg)(r)).To(Succeed())
Expect(r.actionClientGetter).To(Equal(acg))
fakeActionClientGetter := helmfake.NewActionClientGetter(nil, nil)
Expect(WithActionClientGetter(fakeActionClientGetter)(r)).To(Succeed())
Expect(r.actionClientGetter).To(Equal(fakeActionClientGetter))
})
})
var _ = Describe("WithEventRecorder", func() {
Expand Down Expand Up @@ -974,8 +973,8 @@ var _ = Describe("Reconciler", func() {
var actionConf *action.Configuration
BeforeEach(func() {
By("getting the current release and config", func() {
var err error
acg := helmclient.NewActionConfigGetter(mgr.GetConfig(), mgr.GetRESTMapper(), logr.Discard())
acg, err := helmclient.NewActionConfigGetter(mgr.GetConfig(), mgr.GetRESTMapper(), logr.Discard())
Expect(err).ShouldNot(HaveOccurred())
actionConf, err = acg.ActionConfigFor(obj)
Expect(err).To(BeNil())
})
Expand Down