Skip to content

Commit

Permalink
helm-operator: reduce cache memory footprint (#6377)
Browse files Browse the repository at this point in the history
  • Loading branch information
joelanford committed Oct 4, 2023
1 parent d21ed64 commit 16da916
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 106 deletions.
9 changes: 9 additions & 0 deletions changelog/fragments/helm-operator-cache-selectors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
entries:
- description: >
(helm): Use informer cache label selectors to reduce memory consumption.
kind: bugfix
breaking: false
- description: >
(helm): Fix bug with detection of owner reference support when setting up dynamic watches
kind: bugfix
breaking: false
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
k8s.io/client-go v0.26.2
k8s.io/kubectl v0.26.2
k8s.io/utils v0.0.0-20230711102312-30195339c3c7
sigs.k8s.io/controller-runtime v0.14.5
sigs.k8s.io/controller-runtime v0.14.6
sigs.k8s.io/controller-tools v0.11.3
sigs.k8s.io/kubebuilder/v3 v3.9.1
sigs.k8s.io/yaml v1.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1658,8 +1658,8 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35 h1:+xBL5uTc+BkPBwmMi3vYfUJjq+N3K+H6PXeETwf5cPI=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35/go.mod h1:WxjusMwXlKzfAs4p9km6XJRndVt2FROgMVCE4cdohFo=
sigs.k8s.io/controller-runtime v0.14.5 h1:6xaWFqzT5KuAQ9ufgUaj1G/+C4Y1GRkhrxl+BJ9i+5s=
sigs.k8s.io/controller-runtime v0.14.5/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0=
sigs.k8s.io/controller-runtime v0.14.6 h1:oxstGVvXGNnMvY7TAESYk+lzr6S3V5VFxQ6d92KcwQA=
sigs.k8s.io/controller-runtime v0.14.6/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0=
sigs.k8s.io/controller-tools v0.11.3 h1:T1xzLkog9saiyQSLz1XOImu4OcbdXWytc5cmYsBeBiE=
sigs.k8s.io/controller-tools v0.11.3/go.mod h1:qcfX7jfcfYD/b7lAhvqAyTbt/px4GpvN88WKLFFv7p8=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=
Expand Down
113 changes: 86 additions & 27 deletions internal/cmd/helm-operator/run/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimachruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand All @@ -45,6 +46,10 @@ import (
"github.com/operator-framework/operator-sdk/internal/helm/watches"
"github.com/operator-framework/operator-sdk/internal/util/k8sutil"
sdkVersion "github.com/operator-framework/operator-sdk/internal/version"
"helm.sh/helm/v3/pkg/chart/loader"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
)

var log = logf.Log.WithName("cmd")
Expand Down Expand Up @@ -136,6 +141,23 @@ func run(cmd *cobra.Command, f *flags.Flags) {
// Set default manager options
options = f.ToManagerOptions(options)

if options.Scheme == nil {
options.Scheme = apimachruntime.NewScheme()
}

ws, err := watches.Load(f.WatchesFile)
if err != nil {
log.Error(err, "Failed to load watches file.")
os.Exit(1)
}

watchNamespaces := getWatchNamespaces(options.Namespace)
options.NewCache, err = buildNewCacheFunc(watchNamespaces, ws, options.Scheme)
if err != nil {
log.Error(err, "Failed to create NewCache function for manager.")
os.Exit(1)
}

if options.NewClient == nil {
options.NewClient = func(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
// Create the Client for Write operations.
Expand All @@ -152,27 +174,6 @@ func run(cmd *cobra.Command, f *flags.Flags) {
})
}
}
namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar)
log = log.WithValues("Namespace", namespace)
if found {
log.V(1).Info(fmt.Sprintf("Setting namespace with value in %s", k8sutil.WatchNamespaceEnvVar))
if namespace == metav1.NamespaceAll {
log.Info("Watching all namespaces.")
options.Namespace = metav1.NamespaceAll
} else {
if strings.Contains(namespace, ",") {
log.Info("Watching multiple namespaces.")
options.NewCache = cache.MultiNamespacedCacheBuilder(strings.Split(namespace, ","))
} else {
log.Info("Watching single namespace.")
options.Namespace = namespace
}
}
} else if options.Namespace == "" {
log.Info(fmt.Sprintf("Watch namespaces not configured by environment variable %s or file. "+
"Watching all namespaces.", k8sutil.WatchNamespaceEnvVar))
options.Namespace = metav1.NamespaceAll
}

mgr, err := manager.New(cfg, options)
if err != nil {
Expand All @@ -189,11 +190,6 @@ func run(cmd *cobra.Command, f *flags.Flags) {
os.Exit(1)
}

ws, err := watches.Load(f.WatchesFile)
if err != nil {
log.Error(err, "Failed to create new manager factories.")
os.Exit(1)
}
acg, err := helmClient.NewActionConfigGetter(mgr.GetConfig(), mgr.GetRESTMapper(), mgr.GetLogger())
if err != nil {
log.Error(err, "Failed to create Helm action config getter")
Expand All @@ -207,7 +203,6 @@ func run(cmd *cobra.Command, f *flags.Flags) {
}

err := controller.Add(mgr, controller.WatchOptions{
Namespace: namespace,
GVK: w.GroupVersionKind,
ManagerFactory: release.NewManagerFactory(mgr, acg, w.ChartDir),
ReconcilePeriod: reconcilePeriod,
Expand Down Expand Up @@ -250,3 +245,67 @@ func exitIfUnsupported(options manager.Options) {
os.Exit(1)
}
}

func getWatchNamespaces(defaultNamespace string) []string {
namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar)
log = log.WithValues("Namespace", namespace)
if found {
log.V(1).Info(fmt.Sprintf("Setting namespace with value in %s", k8sutil.WatchNamespaceEnvVar))
if namespace == metav1.NamespaceAll {
log.Info("Watching all namespaces.")
return []string{metav1.NamespaceAll}
}
if strings.Contains(namespace, ",") {
log.Info("Watching multiple namespaces.")
return strings.Split(namespace, ",")
}
log.Info("Watching single namespace.")
return []string{namespace}
}
if defaultNamespace == "" {
log.Info(fmt.Sprintf("Watch namespaces not configured by environment variable %s or file. "+
"Watching all namespaces.", k8sutil.WatchNamespaceEnvVar))
return []string{metav1.NamespaceAll}
}
return []string{defaultNamespace}
}

func buildNewCacheFunc(watchNamespaces []string, ws []watches.Watch, sch *apimachruntime.Scheme) (cache.NewCacheFunc, error) {
selectorsByObject := cache.SelectorsByObject{}
chartNames := make([]string, 0, len(ws))
for _, w := range ws {
sch.AddKnownTypeWithName(w.GroupVersionKind, &unstructured.Unstructured{})

crObj := &unstructured.Unstructured{}
crObj.SetGroupVersionKind(w.GroupVersionKind)
sel, err := metav1.LabelSelectorAsSelector(&w.Selector)
if err != nil {
return nil, fmt.Errorf("unable to parse watch selector for %s: %v", w.GroupVersionKind, err)
}
selectorsByObject[crObj] = cache.ObjectSelector{Label: sel}

chrt, err := loader.LoadDir(w.ChartDir)
if err != nil {
return nil, fmt.Errorf("unable to load chart for %s: %v", w.GroupVersionKind, err)
}
chartNames = append(chartNames, chrt.Name())

}
req, err := labels.NewRequirement("helm.sdk.operatorframework.io/chart", selection.In, chartNames)
if err != nil {
return nil, fmt.Errorf("unable to create label requirement for cache default selector: %v", err)
}
defaultSelector := labels.NewSelector().Add(*req)

return func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
opts.SelectorsByObject = selectorsByObject
opts.DefaultSelector = cache.ObjectSelector{Label: defaultSelector}
if len(watchNamespaces) > 1 {
return cache.MultiNamespacedCacheBuilder(watchNamespaces)(config, opts)
}
if len(watchNamespaces) == 1 {
opts.Namespace = watchNamespaces[0]
}
return cache.New(config, opts)
}, nil
}
42 changes: 42 additions & 0 deletions internal/helm/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,45 @@ func containsResourcePolicyKeep(annotations map[string]string) bool {
resourcePolicyType = strings.ToLower(strings.TrimSpace(resourcePolicyType))
return resourcePolicyType == kube.KeepPolicy
}

type labelInjectingClient struct {
kube.Interface
labels map[string]string
}

func NewLabelInjectingClient(base kube.Interface, labels map[string]string) kube.Interface {
return &labelInjectingClient{
Interface: base,
labels: labels,
}
}

func (c *labelInjectingClient) Build(reader io.Reader, validate bool) (kube.ResourceList, error) {
resourceList, err := c.Interface.Build(reader, validate)
if err != nil {
return resourceList, err
}
err = resourceList.Visit(func(r *resource.Info, err error) error {
if err != nil {
return err
}
objMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(r.Object)
if err != nil {
return err
}
u := &unstructured.Unstructured{Object: objMap}
labels := u.GetLabels()
if labels == nil {
labels = map[string]string{}
}
for k, v := range c.labels {
labels[k] = v
}
u.SetLabels(labels)
return nil
})
if err != nil {
return nil, err
}
return resourceList, nil
}
42 changes: 5 additions & 37 deletions internal/helm/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package controller

import (
"fmt"
"reflect"
"strings"
"sync"
"time"
Expand All @@ -31,7 +30,6 @@ import (
crthandler "sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
ctrlpredicate "sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
"sigs.k8s.io/yaml"

Expand All @@ -46,7 +44,6 @@ var log = logf.Log.WithName("helm.controller")
// WatchOptions contains the necessary values to create a new controller that
// manages helm releases in a particular namespace based on a GVK watch.
type WatchOptions struct {
Namespace string
GVK schema.GroupVersionKind
ManagerFactory release.ManagerFactory
ReconcilePeriod time.Duration
Expand All @@ -71,10 +68,6 @@ func Add(mgr manager.Manager, options WatchOptions) error {
SuppressOverrideValues: options.SuppressOverrideValues,
}

// Register the GVK with the schema
mgr.GetScheme().AddKnownTypeWithName(options.GVK, &unstructured.Unstructured{})
metav1.AddToGroupVersion(mgr.GetScheme(), options.GVK.GroupVersion())

c, err := controller.New(controllerName, mgr, controller.Options{
Reconciler: r,
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
Expand All @@ -86,18 +79,7 @@ func Add(mgr manager.Manager, options WatchOptions) error {
o := &unstructured.Unstructured{}
o.SetGroupVersionKind(options.GVK)

var preds []ctrlpredicate.Predicate
p, err := parsePredicateSelector(options.Selector)

if err != nil {
return err
}

if p != nil {
preds = append(preds, p)
}

if err := c.Watch(&source.Kind{Type: o}, &libhandler.InstrumentedEnqueueRequestForObject{}, preds...); err != nil {
if err := c.Watch(&source.Kind{Type: o}, &libhandler.InstrumentedEnqueueRequestForObject{}); err != nil {
return err
}

Expand All @@ -106,33 +88,19 @@ func Add(mgr manager.Manager, options WatchOptions) error {
}

log.Info("Watching resource", "apiVersion", options.GVK.GroupVersion(), "kind",
options.GVK.Kind, "namespace", options.Namespace, "reconcilePeriod", options.ReconcilePeriod.String())
options.GVK.Kind, "reconcilePeriod", options.ReconcilePeriod.String())
return nil
}

// parsePredicateSelector parses the selector in the WatchOptions and creates a predicate
// that is used to filter resources based on the specified selector
func parsePredicateSelector(selector metav1.LabelSelector) (ctrlpredicate.Predicate, error) {
// If a selector has been specified in watches.yaml, add it to the watch's predicates.
if !reflect.ValueOf(selector).IsZero() {
p, err := ctrlpredicate.LabelSelectorPredicate(selector)
if err != nil {
return nil, fmt.Errorf("error constructing predicate from watches selector: %v", err)
}
return p, nil
}
return nil, nil
}

// watchDependentResources adds a release hook function to the HelmOperatorReconciler
// that adds watches for resources in released Helm charts.
func watchDependentResources(mgr manager.Manager, r *HelmOperatorReconciler, c controller.Controller) {
owner := &unstructured.Unstructured{}
owner.SetGroupVersionKind(r.GVK)

var m sync.RWMutex
watches := map[schema.GroupVersionKind]struct{}{}
releaseHook := func(release *rpb.Release) error {
owner := &unstructured.Unstructured{}
owner.SetGroupVersionKind(r.GVK)
owner.SetNamespace(release.Namespace)
resources := releaseutil.SplitManifests(release.Manifest)
for _, resource := range resources {
var u unstructured.Unstructured
Expand Down
39 changes: 0 additions & 39 deletions internal/helm/controller/controller_test.go

This file was deleted.

4 changes: 4 additions & 0 deletions internal/helm/release/manager_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (f managerFactory) NewManager(cr *unstructured.Unstructured, overrideValues
return nil, fmt.Errorf("failed to load chart dir: %w", err)
}

actionConfig.KubeClient = client.NewLabelInjectingClient(actionConfig.KubeClient, map[string]string{
"helm.sdk.operatorframework.io/chart": crChart.Name(),
})

releaseName, err := getReleaseName(actionConfig.Releases, crChart.Name(), cr)
if err != nil {
return nil, fmt.Errorf("failed to get helm release name: %w", err)
Expand Down

0 comments on commit 16da916

Please sign in to comment.