From 97534b88c4686694d56cf07195b7456be7c41825 Mon Sep 17 00:00:00 2001 From: Ionut Balutoiu Date: Wed, 11 Oct 2023 12:36:37 +0300 Subject: [PATCH] Fix ClusterCacheTracker memory leak Decouple the code that creates uncached client from the code that creates the cached client. This way, we don't start the cache until the cached client is created. This avoids scenarios where the `ClusterCacheTracker` cache is started when only an uncached client is needed. There is an existing code path, described in the original issue, where the `ClusterCacheTracker` cache is started when only an uncached client is needed. The cache is re-created later, and the initial cache is still running continuously in the background. Signed-off-by: Ionut Balutoiu --- controllers/remote/cluster_cache_tracker.go | 86 ++++++++++++++------- 1 file changed, 59 insertions(+), 27 deletions(-) diff --git a/controllers/remote/cluster_cache_tracker.go b/controllers/remote/cluster_cache_tracker.go index be21cff54665..3c1c67248d60 100644 --- a/controllers/remote/cluster_cache_tracker.go +++ b/controllers/remote/cluster_cache_tracker.go @@ -29,6 +29,7 @@ import ( "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -298,8 +299,14 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String()) } - // Create a client and a cache for the cluster. - c, uncachedClient, cache, err := t.createClient(ctx, config, cluster, indexes) + // Create a http client and a mapper for the cluster. + httpClient, mapper, err := t.createHTTPClientAndMapper(config, cluster) + if err != nil { + return nil, errors.Wrapf(err, "error creating http client and mapper for remote cluster %q", cluster.String()) + } + + // Create an uncached client for the cluster. + uncachedClient, err := t.createUncachedClient(config, cluster, httpClient, mapper) if err != nil { return nil, err } @@ -324,16 +331,23 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl config.CAFile = inClusterConfig.CAFile config.Host = inClusterConfig.Host - // Create a new client and overwrite the previously created client. - c, _, cache, err = t.createClient(ctx, config, cluster, indexes) + // Update the http client and the mapper to use in-cluster config. + httpClient, mapper, err = t.createHTTPClientAndMapper(config, cluster) if err != nil { - return nil, errors.Wrap(err, "error creating client for self-hosted cluster") + return nil, errors.Wrapf(err, "error creating http client and mapper (using in-cluster config) for remote cluster %q", cluster.String()) } + log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with in-cluster service %q", cluster.String(), config.Host)) } else { log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with the regular apiserver endpoint %q", cluster.String(), config.Host)) } + // Create a client and a cache for the cluster. + cachedClient, err := t.createCachedClient(ctx, config, cluster, httpClient, mapper, indexes) + if err != nil { + return nil, err + } + // Generating a new private key to be used for generating temporary certificates to connect to // etcd on the target cluster. // NOTE: Generating a private key is an expensive operation, so we store it in the cluster accessor. @@ -343,9 +357,9 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl } return &clusterAccessor{ - cache: cache, + cache: cachedClient.Cache, config: config, - client: c, + client: cachedClient.Client, watches: sets.Set[string]{}, etcdClientCertificateKey: etcdKey, }, nil @@ -377,18 +391,18 @@ func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c cl return t.controllerPodMetadata.UID == pod.UID, nil } -// createClient creates a cached client, and uncached client and a mapper based on a rest.Config. -func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (client.Client, client.Client, *stoppableCache, error) { +// createHTTPClientAndMapper creates a http client and a dynamic rest mapper for the given cluster, based on the rest.Config. +func (t *ClusterCacheTracker) createHTTPClientAndMapper(config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, error) { // Create a http client for the cluster. httpClient, err := rest.HTTPClientFor(config) if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String()) + return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String()) } // Create a mapper for it mapper, err := apiutil.NewDynamicRESTMapper(config, httpClient) if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String()) + return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String()) } // Verify if we can get a rest mapping from the workload cluster apiserver. @@ -396,9 +410,34 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con // to avoid further effort creating a cache and a client and to produce a clearer error message. _, err = mapper.RESTMapping(corev1.SchemeGroupVersion.WithKind("Node").GroupKind(), corev1.SchemeGroupVersion.Version) if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String()) + return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String()) + } + + return httpClient, mapper, nil +} + +// createUncachedClient creates an uncached client for the given cluster, based on the rest.Config. +func (t *ClusterCacheTracker) createUncachedClient(config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, mapper meta.RESTMapper) (client.Client, error) { + // Create the uncached client for the remote cluster + uncachedClient, err := client.New(config, client.Options{ + Scheme: t.scheme, + Mapper: mapper, + HTTPClient: httpClient, + }) + if err != nil { + return nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String()) } + return uncachedClient, nil +} + +type cachedClientOutput struct { + Client client.Client + Cache *stoppableCache +} + +// createCachedClient creates a cached client for the given cluster, based on a rest.Config. +func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, mapper meta.RESTMapper, indexes []Index) (*cachedClientOutput, error) { // Create the cache for the remote cluster cacheOptions := cache.Options{ HTTPClient: httpClient, @@ -407,7 +446,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con } remoteCache, err := cache.New(config, cacheOptions) if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating cache", cluster.String()) + return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q: error creating cache", cluster.String()) } cacheCtx, cacheCtxCancel := context.WithCancel(ctx) @@ -420,7 +459,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con for _, index := range indexes { if err := cache.IndexField(ctx, index.Object, index.Field, index.ExtractValue); err != nil { - return nil, nil, nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String()) + return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q: error adding index for field %q to cache", cluster.String(), index.Field) } } @@ -436,19 +475,9 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con }, }) if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String()) + return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q", cluster.String()) } - // Create an uncached client. This is used in `runningOnWorkloadCluster` to ensure we don't continuously cache - // pods in the client. - uncachedClient, err := client.New(config, client.Options{ - Scheme: t.scheme, - Mapper: mapper, - HTTPClient: httpClient, - }) - if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String()) - } // Start the cache!!! go cache.Start(cacheCtx) //nolint:errcheck @@ -457,7 +486,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con defer cacheSyncCtxCancel() if !cache.WaitForCacheSync(cacheSyncCtx) { cache.Stop() - return nil, nil, nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err()) + return nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err()) } // Wrap the cached client with a client that sets timeouts on all Get and List calls @@ -474,7 +503,10 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con httpClient: httpClient, }) - return cachedClient, uncachedClient, cache, nil + return &cachedClientOutput{ + Client: cachedClient, + Cache: cache, + }, nil } // deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.