Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

helm-operator: reduce cache memory footprint #6377

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelog/fragments/helm-operator-cache-selectors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
entries:
- description: >
(helm): Use informer cache label selectors to reduce memory consumption.
kind: bugfix
breaking: false
- description: >
(helm): Fix bug with detection of owner reference support when setting up dynamic watches
kind: bugfix
breaking: false
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
k8s.io/client-go v0.26.2
k8s.io/kubectl v0.26.2
k8s.io/utils v0.0.0-20230711102312-30195339c3c7
sigs.k8s.io/controller-runtime v0.14.5
sigs.k8s.io/controller-runtime v0.14.6
sigs.k8s.io/controller-tools v0.11.3
sigs.k8s.io/kubebuilder/v3 v3.9.1
sigs.k8s.io/yaml v1.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1658,8 +1658,8 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35 h1:+xBL5uTc+BkPBwmMi3vYfUJjq+N3K+H6PXeETwf5cPI=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35/go.mod h1:WxjusMwXlKzfAs4p9km6XJRndVt2FROgMVCE4cdohFo=
sigs.k8s.io/controller-runtime v0.14.5 h1:6xaWFqzT5KuAQ9ufgUaj1G/+C4Y1GRkhrxl+BJ9i+5s=
sigs.k8s.io/controller-runtime v0.14.5/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0=
sigs.k8s.io/controller-runtime v0.14.6 h1:oxstGVvXGNnMvY7TAESYk+lzr6S3V5VFxQ6d92KcwQA=
sigs.k8s.io/controller-runtime v0.14.6/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0=
sigs.k8s.io/controller-tools v0.11.3 h1:T1xzLkog9saiyQSLz1XOImu4OcbdXWytc5cmYsBeBiE=
sigs.k8s.io/controller-tools v0.11.3/go.mod h1:qcfX7jfcfYD/b7lAhvqAyTbt/px4GpvN88WKLFFv7p8=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=
Expand Down
113 changes: 86 additions & 27 deletions internal/cmd/helm-operator/run/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimachruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand All @@ -45,6 +46,10 @@ import (
"github.com/operator-framework/operator-sdk/internal/helm/watches"
"github.com/operator-framework/operator-sdk/internal/util/k8sutil"
sdkVersion "github.com/operator-framework/operator-sdk/internal/version"
"helm.sh/helm/v3/pkg/chart/loader"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
)

var log = logf.Log.WithName("cmd")
Expand Down Expand Up @@ -136,6 +141,23 @@ func run(cmd *cobra.Command, f *flags.Flags) {
// Set default manager options
options = f.ToManagerOptions(options)

if options.Scheme == nil {
options.Scheme = apimachruntime.NewScheme()
}

ws, err := watches.Load(f.WatchesFile)
if err != nil {
log.Error(err, "Failed to load watches file.")
os.Exit(1)
}

watchNamespaces := getWatchNamespaces(options.Namespace)
options.NewCache, err = buildNewCacheFunc(watchNamespaces, ws, options.Scheme)
if err != nil {
log.Error(err, "Failed to create NewCache function for manager.")
os.Exit(1)
}

if options.NewClient == nil {
options.NewClient = func(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
// Create the Client for Write operations.
Expand All @@ -152,27 +174,6 @@ func run(cmd *cobra.Command, f *flags.Flags) {
})
}
}
namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar)
log = log.WithValues("Namespace", namespace)
if found {
log.V(1).Info(fmt.Sprintf("Setting namespace with value in %s", k8sutil.WatchNamespaceEnvVar))
if namespace == metav1.NamespaceAll {
log.Info("Watching all namespaces.")
options.Namespace = metav1.NamespaceAll
} else {
if strings.Contains(namespace, ",") {
log.Info("Watching multiple namespaces.")
options.NewCache = cache.MultiNamespacedCacheBuilder(strings.Split(namespace, ","))
} else {
log.Info("Watching single namespace.")
options.Namespace = namespace
}
}
} else if options.Namespace == "" {
log.Info(fmt.Sprintf("Watch namespaces not configured by environment variable %s or file. "+
"Watching all namespaces.", k8sutil.WatchNamespaceEnvVar))
options.Namespace = metav1.NamespaceAll
}

mgr, err := manager.New(cfg, options)
if err != nil {
Expand All @@ -189,11 +190,6 @@ func run(cmd *cobra.Command, f *flags.Flags) {
os.Exit(1)
}

ws, err := watches.Load(f.WatchesFile)
if err != nil {
log.Error(err, "Failed to create new manager factories.")
os.Exit(1)
}
acg, err := helmClient.NewActionConfigGetter(mgr.GetConfig(), mgr.GetRESTMapper(), mgr.GetLogger())
if err != nil {
log.Error(err, "Failed to create Helm action config getter")
Expand All @@ -207,7 +203,6 @@ func run(cmd *cobra.Command, f *flags.Flags) {
}

err := controller.Add(mgr, controller.WatchOptions{
Namespace: namespace,
GVK: w.GroupVersionKind,
ManagerFactory: release.NewManagerFactory(mgr, acg, w.ChartDir),
ReconcilePeriod: reconcilePeriod,
Expand Down Expand Up @@ -250,3 +245,67 @@ func exitIfUnsupported(options manager.Options) {
os.Exit(1)
}
}

func getWatchNamespaces(defaultNamespace string) []string {
namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar)
log = log.WithValues("Namespace", namespace)
if found {
log.V(1).Info(fmt.Sprintf("Setting namespace with value in %s", k8sutil.WatchNamespaceEnvVar))
if namespace == metav1.NamespaceAll {
log.Info("Watching all namespaces.")
return []string{metav1.NamespaceAll}
}
if strings.Contains(namespace, ",") {
log.Info("Watching multiple namespaces.")
return strings.Split(namespace, ",")
}
log.Info("Watching single namespace.")
return []string{namespace}
}
if defaultNamespace == "" {
log.Info(fmt.Sprintf("Watch namespaces not configured by environment variable %s or file. "+
"Watching all namespaces.", k8sutil.WatchNamespaceEnvVar))
return []string{metav1.NamespaceAll}
}
return []string{defaultNamespace}
}

func buildNewCacheFunc(watchNamespaces []string, ws []watches.Watch, sch *apimachruntime.Scheme) (cache.NewCacheFunc, error) {
selectorsByObject := cache.SelectorsByObject{}
chartNames := make([]string, 0, len(ws))
for _, w := range ws {
sch.AddKnownTypeWithName(w.GroupVersionKind, &unstructured.Unstructured{})

crObj := &unstructured.Unstructured{}
crObj.SetGroupVersionKind(w.GroupVersionKind)
sel, err := metav1.LabelSelectorAsSelector(&w.Selector)
if err != nil {
return nil, fmt.Errorf("unable to parse watch selector for %s: %v", w.GroupVersionKind, err)
}
selectorsByObject[crObj] = cache.ObjectSelector{Label: sel}

chrt, err := loader.LoadDir(w.ChartDir)
if err != nil {
return nil, fmt.Errorf("unable to load chart for %s: %v", w.GroupVersionKind, err)
}
chartNames = append(chartNames, chrt.Name())

}
req, err := labels.NewRequirement("helm.sdk.operatorframework.io/chart", selection.In, chartNames)
if err != nil {
return nil, fmt.Errorf("unable to create label requirement for cache default selector: %v", err)
}
defaultSelector := labels.NewSelector().Add(*req)

return func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
opts.SelectorsByObject = selectorsByObject
opts.DefaultSelector = cache.ObjectSelector{Label: defaultSelector}
if len(watchNamespaces) > 1 {
return cache.MultiNamespacedCacheBuilder(watchNamespaces)(config, opts)
}
if len(watchNamespaces) == 1 {
opts.Namespace = watchNamespaces[0]
}
return cache.New(config, opts)
}, nil
}
42 changes: 42 additions & 0 deletions internal/helm/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,45 @@ func containsResourcePolicyKeep(annotations map[string]string) bool {
resourcePolicyType = strings.ToLower(strings.TrimSpace(resourcePolicyType))
return resourcePolicyType == kube.KeepPolicy
}

type labelInjectingClient struct {
kube.Interface
labels map[string]string
}

func NewLabelInjectingClient(base kube.Interface, labels map[string]string) kube.Interface {
return &labelInjectingClient{
Interface: base,
labels: labels,
}
}

func (c *labelInjectingClient) Build(reader io.Reader, validate bool) (kube.ResourceList, error) {
resourceList, err := c.Interface.Build(reader, validate)
if err != nil {
return resourceList, err
}
err = resourceList.Visit(func(r *resource.Info, err error) error {
if err != nil {
return err
}
objMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(r.Object)
if err != nil {
return err
}
u := &unstructured.Unstructured{Object: objMap}
labels := u.GetLabels()
if labels == nil {
labels = map[string]string{}
}
for k, v := range c.labels {
labels[k] = v
}
u.SetLabels(labels)
return nil
})
if err != nil {
return nil, err
}
return resourceList, nil
}
42 changes: 5 additions & 37 deletions internal/helm/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package controller

import (
"fmt"
"reflect"
"strings"
"sync"
"time"
Expand All @@ -31,7 +30,6 @@ import (
crthandler "sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
ctrlpredicate "sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
"sigs.k8s.io/yaml"

Expand All @@ -46,7 +44,6 @@ var log = logf.Log.WithName("helm.controller")
// WatchOptions contains the necessary values to create a new controller that
// manages helm releases in a particular namespace based on a GVK watch.
type WatchOptions struct {
Namespace string
GVK schema.GroupVersionKind
ManagerFactory release.ManagerFactory
ReconcilePeriod time.Duration
Expand All @@ -71,10 +68,6 @@ func Add(mgr manager.Manager, options WatchOptions) error {
SuppressOverrideValues: options.SuppressOverrideValues,
}

// Register the GVK with the schema
mgr.GetScheme().AddKnownTypeWithName(options.GVK, &unstructured.Unstructured{})
metav1.AddToGroupVersion(mgr.GetScheme(), options.GVK.GroupVersion())

c, err := controller.New(controllerName, mgr, controller.Options{
Reconciler: r,
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
Expand All @@ -86,18 +79,7 @@ func Add(mgr manager.Manager, options WatchOptions) error {
o := &unstructured.Unstructured{}
o.SetGroupVersionKind(options.GVK)

var preds []ctrlpredicate.Predicate
p, err := parsePredicateSelector(options.Selector)

if err != nil {
return err
}

if p != nil {
preds = append(preds, p)
}

if err := c.Watch(&source.Kind{Type: o}, &libhandler.InstrumentedEnqueueRequestForObject{}, preds...); err != nil {
if err := c.Watch(&source.Kind{Type: o}, &libhandler.InstrumentedEnqueueRequestForObject{}); err != nil {
return err
}

Expand All @@ -106,33 +88,19 @@ func Add(mgr manager.Manager, options WatchOptions) error {
}

log.Info("Watching resource", "apiVersion", options.GVK.GroupVersion(), "kind",
options.GVK.Kind, "namespace", options.Namespace, "reconcilePeriod", options.ReconcilePeriod.String())
options.GVK.Kind, "reconcilePeriod", options.ReconcilePeriod.String())
return nil
}

// parsePredicateSelector parses the selector in the WatchOptions and creates a predicate
// that is used to filter resources based on the specified selector
func parsePredicateSelector(selector metav1.LabelSelector) (ctrlpredicate.Predicate, error) {
// If a selector has been specified in watches.yaml, add it to the watch's predicates.
if !reflect.ValueOf(selector).IsZero() {
p, err := ctrlpredicate.LabelSelectorPredicate(selector)
if err != nil {
return nil, fmt.Errorf("error constructing predicate from watches selector: %v", err)
}
return p, nil
}
return nil, nil
}

// watchDependentResources adds a release hook function to the HelmOperatorReconciler
// that adds watches for resources in released Helm charts.
func watchDependentResources(mgr manager.Manager, r *HelmOperatorReconciler, c controller.Controller) {
owner := &unstructured.Unstructured{}
owner.SetGroupVersionKind(r.GVK)

var m sync.RWMutex
watches := map[schema.GroupVersionKind]struct{}{}
releaseHook := func(release *rpb.Release) error {
owner := &unstructured.Unstructured{}
owner.SetGroupVersionKind(r.GVK)
owner.SetNamespace(release.Namespace)
resources := releaseutil.SplitManifests(release.Manifest)
for _, resource := range resources {
var u unstructured.Unstructured
Expand Down
39 changes: 0 additions & 39 deletions internal/helm/controller/controller_test.go

This file was deleted.

4 changes: 4 additions & 0 deletions internal/helm/release/manager_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (f managerFactory) NewManager(cr *unstructured.Unstructured, overrideValues
return nil, fmt.Errorf("failed to load chart dir: %w", err)
}

actionConfig.KubeClient = client.NewLabelInjectingClient(actionConfig.KubeClient, map[string]string{
"helm.sdk.operatorframework.io/chart": crChart.Name(),
})

releaseName, err := getReleaseName(actionConfig.Releases, crChart.Name(), cr)
if err != nil {
return nil, fmt.Errorf("failed to get helm release name: %w", err)
Expand Down
Loading