Skip to content

Commit

Permalink
helm-operator: reduce cache memory footprint
Browse files Browse the repository at this point in the history
Make use of label selectors used by informers for both the primary CR
and for chart manifest objects

Signed-off-by: Joe Lanford <joe.lanford@gmail.com>
  • Loading branch information
joelanford committed Oct 4, 2023
1 parent d21ed64 commit f6af8f7
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 106 deletions.
9 changes: 9 additions & 0 deletions changelog/fragments/helm-operator-cache-selectors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
entries:
- description: >
(helm): Use informer cache label selectors to reduce memory consumption.
kind: bugfix
breaking: false
- description: >
(helm): Fix bug with detection of owner reference support when setting up dynamic watches
kind: bugfix
breaking: false
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
k8s.io/client-go v0.26.2
k8s.io/kubectl v0.26.2
k8s.io/utils v0.0.0-20230711102312-30195339c3c7
sigs.k8s.io/controller-runtime v0.14.5
sigs.k8s.io/controller-runtime v0.14.6
sigs.k8s.io/controller-tools v0.11.3
sigs.k8s.io/kubebuilder/v3 v3.9.1
sigs.k8s.io/yaml v1.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1658,8 +1658,8 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35 h1:+xBL5uTc+BkPBwmMi3vYfUJjq+N3K+H6PXeETwf5cPI=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35/go.mod h1:WxjusMwXlKzfAs4p9km6XJRndVt2FROgMVCE4cdohFo=
sigs.k8s.io/controller-runtime v0.14.5 h1:6xaWFqzT5KuAQ9ufgUaj1G/+C4Y1GRkhrxl+BJ9i+5s=
sigs.k8s.io/controller-runtime v0.14.5/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0=
sigs.k8s.io/controller-runtime v0.14.6 h1:oxstGVvXGNnMvY7TAESYk+lzr6S3V5VFxQ6d92KcwQA=
sigs.k8s.io/controller-runtime v0.14.6/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0=
sigs.k8s.io/controller-tools v0.11.3 h1:T1xzLkog9saiyQSLz1XOImu4OcbdXWytc5cmYsBeBiE=
sigs.k8s.io/controller-tools v0.11.3/go.mod h1:qcfX7jfcfYD/b7lAhvqAyTbt/px4GpvN88WKLFFv7p8=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=
Expand Down
130 changes: 103 additions & 27 deletions internal/cmd/helm-operator/run/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimachruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand All @@ -45,6 +46,10 @@ import (
"github.com/operator-framework/operator-sdk/internal/helm/watches"
"github.com/operator-framework/operator-sdk/internal/util/k8sutil"
sdkVersion "github.com/operator-framework/operator-sdk/internal/version"
"helm.sh/helm/v3/pkg/chart/loader"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
)

var log = logf.Log.WithName("cmd")
Expand Down Expand Up @@ -136,6 +141,23 @@ func run(cmd *cobra.Command, f *flags.Flags) {
// Set default manager options
options = f.ToManagerOptions(options)

if options.Scheme == nil {
options.Scheme = apimachruntime.NewScheme()
}

ws, err := watches.Load(f.WatchesFile)
if err != nil {
log.Error(err, "Failed to create new manager factories.")
os.Exit(1)
}

watchNamespaces := getWatchNamespaces(options.Namespace)
options.NewCache, err = buildNewCacheFunc(watchNamespaces, ws, options.Scheme)
if err != nil {
log.Error(err, "Failed to create NewCache function for manager.")
os.Exit(1)
}

if options.NewClient == nil {
options.NewClient = func(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
// Create the Client for Write operations.
Expand All @@ -152,27 +174,6 @@ func run(cmd *cobra.Command, f *flags.Flags) {
})
}
}
namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar)
log = log.WithValues("Namespace", namespace)
if found {
log.V(1).Info(fmt.Sprintf("Setting namespace with value in %s", k8sutil.WatchNamespaceEnvVar))
if namespace == metav1.NamespaceAll {
log.Info("Watching all namespaces.")
options.Namespace = metav1.NamespaceAll
} else {
if strings.Contains(namespace, ",") {
log.Info("Watching multiple namespaces.")
options.NewCache = cache.MultiNamespacedCacheBuilder(strings.Split(namespace, ","))
} else {
log.Info("Watching single namespace.")
options.Namespace = namespace
}
}
} else if options.Namespace == "" {
log.Info(fmt.Sprintf("Watch namespaces not configured by environment variable %s or file. "+
"Watching all namespaces.", k8sutil.WatchNamespaceEnvVar))
options.Namespace = metav1.NamespaceAll
}

mgr, err := manager.New(cfg, options)
if err != nil {
Expand All @@ -189,11 +190,6 @@ func run(cmd *cobra.Command, f *flags.Flags) {
os.Exit(1)
}

ws, err := watches.Load(f.WatchesFile)
if err != nil {
log.Error(err, "Failed to create new manager factories.")
os.Exit(1)
}
acg, err := helmClient.NewActionConfigGetter(mgr.GetConfig(), mgr.GetRESTMapper(), mgr.GetLogger())
if err != nil {
log.Error(err, "Failed to create Helm action config getter")
Expand All @@ -207,7 +203,6 @@ func run(cmd *cobra.Command, f *flags.Flags) {
}

err := controller.Add(mgr, controller.WatchOptions{
Namespace: namespace,
GVK: w.GroupVersionKind,
ManagerFactory: release.NewManagerFactory(mgr, acg, w.ChartDir),
ReconcilePeriod: reconcilePeriod,
Expand Down Expand Up @@ -250,3 +245,84 @@ func exitIfUnsupported(options manager.Options) {
os.Exit(1)
}
}

func getWatchNamespaces(defaultNamespace string) []string {
namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar)
log = log.WithValues("Namespace", namespace)
if found {
log.V(1).Info(fmt.Sprintf("Setting namespace with value in %s", k8sutil.WatchNamespaceEnvVar))
if namespace == metav1.NamespaceAll {
log.Info("Watching all namespaces.")
return []string{metav1.NamespaceAll}
}
if strings.Contains(namespace, ",") {
log.Info("Watching multiple namespaces.")
return strings.Split(namespace, ",")
}
log.Info("Watching single namespace.")
return []string{namespace}
}
if defaultNamespace == "" {
log.Info(fmt.Sprintf("Watch namespaces not configured by environment variable %s or file. "+
"Watching all namespaces.", k8sutil.WatchNamespaceEnvVar))
return []string{metav1.NamespaceAll}
}
return []string{defaultNamespace}
}

func buildNewCacheFunc(watchNamespaces []string, ws []watches.Watch, sch *apimachruntime.Scheme) (cache.NewCacheFunc, error) {

Check failure on line 273 in internal/cmd/helm-operator/run/cmd.go

View workflow job for this annotation

GitHub Actions / sanity

buildNewCacheFunc - result 1 (error) is always nil (unparam)
// setCacheOptions is an anonymous function that updates the cache selectors
// and namespace configuration of the cache options that are passed into it.
// we use this anonymous function in the NewCache function that we build and
// return from buildNewCacheFunc.
setCacheOptions := func(options *cache.Options) error {
selectorsByObject := cache.SelectorsByObject{}
chartNames := make([]string, 0, len(ws))
for _, w := range ws {
sch.AddKnownTypeWithName(w.GroupVersionKind, &unstructured.Unstructured{})

crObj := &unstructured.Unstructured{}
crObj.SetGroupVersionKind(w.GroupVersionKind)
sel, err := metav1.LabelSelectorAsSelector(&w.Selector)
if err != nil {
return fmt.Errorf("unable to parse watch selector for %s: %v", w.GroupVersionKind, err)
}
selectorsByObject[crObj] = cache.ObjectSelector{Label: sel}

chrt, err := loader.LoadDir(w.ChartDir)
if err != nil {
return fmt.Errorf("unable to load chart for %s: %v", w.GroupVersionKind, err)
}
chartNames = append(chartNames, chrt.Name())
}

req, err := labels.NewRequirement("helm.sdk.operatorframework.io/chart", selection.In, chartNames)
if err != nil {
return fmt.Errorf("unable to create label requirement for cache default selector: %v", err)
}
defaultSelector := labels.NewSelector().Add(*req)

if len(watchNamespaces) == 1 {
options.Namespace = watchNamespaces[0]
}
options.SelectorsByObject = selectorsByObject
options.DefaultSelector = cache.ObjectSelector{Label: defaultSelector}
return nil
}

// Define the NewCache function to use based on the number of namespaces. If there are multiple namespaces,
// use a MultiNamespacedCacheBuilder. Otherwise, use the standard New function.
newCache := cache.New
if len(watchNamespaces) > 1 {
newCache = cache.MultiNamespacedCacheBuilder(watchNamespaces)
}

// Return a custom NewCache function that uses our setCacheOptions function
// to set the cache options as the cache is being created.
return func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
if err := setCacheOptions(&opts); err != nil {
return nil, err
}
return newCache(config, opts)
}, nil
}
42 changes: 42 additions & 0 deletions internal/helm/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,45 @@ func containsResourcePolicyKeep(annotations map[string]string) bool {
resourcePolicyType = strings.ToLower(strings.TrimSpace(resourcePolicyType))
return resourcePolicyType == kube.KeepPolicy
}

type labelInjectingClient struct {
kube.Interface
labels map[string]string
}

func NewLabelInjectingClient(base kube.Interface, labels map[string]string) kube.Interface {
return &labelInjectingClient{
Interface: base,
labels: labels,
}
}

func (c *labelInjectingClient) Build(reader io.Reader, validate bool) (kube.ResourceList, error) {
resourceList, err := c.Interface.Build(reader, validate)
if err != nil {
return resourceList, err
}
err = resourceList.Visit(func(r *resource.Info, err error) error {
if err != nil {
return err
}
objMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(r.Object)
if err != nil {
return err
}
u := &unstructured.Unstructured{Object: objMap}
labels := u.GetLabels()
if labels == nil {
labels = map[string]string{}
}
for k, v := range c.labels {
labels[k] = v
}
u.SetLabels(labels)
return nil
})
if err != nil {
return nil, err
}
return resourceList, nil
}
42 changes: 5 additions & 37 deletions internal/helm/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package controller

import (
"fmt"
"reflect"
"strings"
"sync"
"time"
Expand All @@ -31,7 +30,6 @@ import (
crthandler "sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
ctrlpredicate "sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
"sigs.k8s.io/yaml"

Expand All @@ -46,7 +44,6 @@ var log = logf.Log.WithName("helm.controller")
// WatchOptions contains the necessary values to create a new controller that
// manages helm releases in a particular namespace based on a GVK watch.
type WatchOptions struct {
Namespace string
GVK schema.GroupVersionKind
ManagerFactory release.ManagerFactory
ReconcilePeriod time.Duration
Expand All @@ -71,10 +68,6 @@ func Add(mgr manager.Manager, options WatchOptions) error {
SuppressOverrideValues: options.SuppressOverrideValues,
}

// Register the GVK with the schema
mgr.GetScheme().AddKnownTypeWithName(options.GVK, &unstructured.Unstructured{})
metav1.AddToGroupVersion(mgr.GetScheme(), options.GVK.GroupVersion())

c, err := controller.New(controllerName, mgr, controller.Options{
Reconciler: r,
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
Expand All @@ -86,18 +79,7 @@ func Add(mgr manager.Manager, options WatchOptions) error {
o := &unstructured.Unstructured{}
o.SetGroupVersionKind(options.GVK)

var preds []ctrlpredicate.Predicate
p, err := parsePredicateSelector(options.Selector)

if err != nil {
return err
}

if p != nil {
preds = append(preds, p)
}

if err := c.Watch(&source.Kind{Type: o}, &libhandler.InstrumentedEnqueueRequestForObject{}, preds...); err != nil {
if err := c.Watch(&source.Kind{Type: o}, &libhandler.InstrumentedEnqueueRequestForObject{}); err != nil {
return err
}

Expand All @@ -106,33 +88,19 @@ func Add(mgr manager.Manager, options WatchOptions) error {
}

log.Info("Watching resource", "apiVersion", options.GVK.GroupVersion(), "kind",
options.GVK.Kind, "namespace", options.Namespace, "reconcilePeriod", options.ReconcilePeriod.String())
options.GVK.Kind, "reconcilePeriod", options.ReconcilePeriod.String())
return nil
}

// parsePredicateSelector parses the selector in the WatchOptions and creates a predicate
// that is used to filter resources based on the specified selector
func parsePredicateSelector(selector metav1.LabelSelector) (ctrlpredicate.Predicate, error) {
// If a selector has been specified in watches.yaml, add it to the watch's predicates.
if !reflect.ValueOf(selector).IsZero() {
p, err := ctrlpredicate.LabelSelectorPredicate(selector)
if err != nil {
return nil, fmt.Errorf("error constructing predicate from watches selector: %v", err)
}
return p, nil
}
return nil, nil
}

// watchDependentResources adds a release hook function to the HelmOperatorReconciler
// that adds watches for resources in released Helm charts.
func watchDependentResources(mgr manager.Manager, r *HelmOperatorReconciler, c controller.Controller) {
owner := &unstructured.Unstructured{}
owner.SetGroupVersionKind(r.GVK)

var m sync.RWMutex
watches := map[schema.GroupVersionKind]struct{}{}
releaseHook := func(release *rpb.Release) error {
owner := &unstructured.Unstructured{}
owner.SetGroupVersionKind(r.GVK)
owner.SetNamespace(release.Namespace)
resources := releaseutil.SplitManifests(release.Manifest)
for _, resource := range resources {
var u unstructured.Unstructured
Expand Down
39 changes: 0 additions & 39 deletions internal/helm/controller/controller_test.go

This file was deleted.

4 changes: 4 additions & 0 deletions internal/helm/release/manager_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (f managerFactory) NewManager(cr *unstructured.Unstructured, overrideValues
return nil, fmt.Errorf("failed to load chart dir: %w", err)
}

actionConfig.KubeClient = client.NewLabelInjectingClient(actionConfig.KubeClient, map[string]string{
"helm.sdk.operatorframework.io/chart": crChart.Name(),
})

releaseName, err := getReleaseName(actionConfig.Releases, crChart.Name(), cr)
if err != nil {
return nil, fmt.Errorf("failed to get helm release name: %w", err)
Expand Down

0 comments on commit f6af8f7

Please sign in to comment.