Skip to content

Commit

Permalink
Fix ClusterCacheTracker memory leak
Browse files Browse the repository at this point in the history
Decouple the code that creates uncached client from the code that
creates the cached client. This way, we don't start the cache until
the cached client is created.

This avoids scenarios where the `ClusterCacheTracker` cache is started
when only an uncached client is needed.

There is an existing code path, described in the original issue, where
the `ClusterCacheTracker` cache is started when only an uncached client
is needed. The cache is re-created later, and the initial cache is
still running continuously in the background.

Signed-off-by: Ionut Balutoiu <ionut@balutoiu.com>
  • Loading branch information
ionutbalutoiu committed Nov 16, 2023
1 parent a82c340 commit 5009688
Showing 1 changed file with 61 additions and 29 deletions.
90 changes: 61 additions & 29 deletions controllers/remote/cluster_cache_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
Expand Down Expand Up @@ -295,8 +296,8 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String())
}

// Create a client and a cache for the cluster.
c, uncachedClient, cache, err := t.createClient(ctx, config, cluster, indexes)
// Create an uncached client for the cluster.
uncachedClient, err := t.createUncachedClient(config, cluster)
if err != nil {
return nil, err
}
Expand All @@ -321,16 +322,17 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
config.CAFile = inClusterConfig.CAFile
config.Host = inClusterConfig.Host

// Create a new client and overwrite the previously created client.
c, _, cache, err = t.createClient(ctx, config, cluster, indexes)
if err != nil {
return nil, errors.Wrap(err, "error creating client for self-hosted cluster")
}
log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with in-cluster service %q", cluster.String(), config.Host))
} else {
log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with the regular apiserver endpoint %q", cluster.String(), config.Host))
}

// Create a client and a cache for the cluster.
cachedClient, err := t.createCachedClient(ctx, config, cluster, indexes)
if err != nil {
return nil, err
}

// Generating a new private key to be used for generating temporary certificates to connect to
// etcd on the target cluster.
// NOTE: Generating a private key is an expensive operation, so we store it in the cluster accessor.
Expand All @@ -340,9 +342,9 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
}

return &clusterAccessor{
cache: cache,
cache: cachedClient.Cache,
config: config,
client: c,
client: cachedClient.Client,
watches: sets.Set[string]{},
etcdClientCertificateKey: etcdKey,
}, nil
Expand Down Expand Up @@ -374,26 +376,63 @@ func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c cl
return t.controllerPodMetadata.UID == pod.UID, nil
}

// createClient creates a cached client, and uncached client and a mapper based on a rest.Config.
func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (client.Client, client.Client, *stoppableCache, error) {
// createHTTPClientAndMapper creates a http client and a dynamic rest mapper for the given cluster, based on the rest.Config.
func (t *ClusterCacheTracker) createHTTPClientAndMapper(config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, error) {
// Create a http client for the cluster.
httpClient, err := rest.HTTPClientFor(config)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
}

// Create a mapper for it
mapper, err := apiutil.NewDynamicRESTMapper(config, httpClient)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
}

// Verify if we can get a rest mapping from the workload cluster apiserver.
// Note: This also checks if the apiserver is up in general. We do this already here
// to avoid further effort creating a cache and a client and to produce a clearer error message.
_, err = mapper.RESTMapping(corev1.SchemeGroupVersion.WithKind("Node").GroupKind(), corev1.SchemeGroupVersion.Version)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
}

return httpClient, mapper, nil
}

// createUncachedClient creates an uncached client for the given cluster, based on the rest.Config.
func (t *ClusterCacheTracker) createUncachedClient(config *rest.Config, cluster client.ObjectKey) (client.Client, error) {
// Create a http client and a mapper for the cluster.
httpClient, mapper, err := t.createHTTPClientAndMapper(config, cluster)
if err != nil {
return nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
}

// Create the uncached client for the remote cluster
uncachedClient, err := client.New(config, client.Options{
Scheme: t.scheme,
Mapper: mapper,
HTTPClient: httpClient,
})
if err != nil {
return nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String())
}

return uncachedClient, nil
}

type cachedClientOutput struct {
Client client.Client
Cache *stoppableCache

Check failure on line 427 in controllers/remote/cluster_cache_tracker.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(sigs.k8s.io/cluster-api) --custom-order (gci)
}

// createCachedClient creates a cached client for the given cluster, based on a rest.Config.
func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (*cachedClientOutput, error) {
// Create a http client and a mapper for the cluster.
httpClient, mapper, err := t.createHTTPClientAndMapper(config, cluster)
if err != nil {
return nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
}

// Create the cache for the remote cluster
Expand All @@ -404,7 +443,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
}
remoteCache, err := cache.New(config, cacheOptions)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating cache", cluster.String())
return nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating cache", cluster.String())
}

cacheCtx, cacheCtxCancel := context.WithCancel(ctx)
Expand All @@ -417,7 +456,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con

for _, index := range indexes {
if err := cache.IndexField(ctx, index.Object, index.Field, index.ExtractValue); err != nil {
return nil, nil, nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String())
return nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String())
}
}

Expand All @@ -433,19 +472,9 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
},
})
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q", cluster.String())
}

// Create an uncached client. This is used in `runningOnWorkloadCluster` to ensure we don't continuously cache
// pods in the client.
uncachedClient, err := client.New(config, client.Options{
Scheme: t.scheme,
Mapper: mapper,
HTTPClient: httpClient,
})
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String())
}
// Start the cache!!!
go cache.Start(cacheCtx) //nolint:errcheck

Expand All @@ -454,7 +483,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
defer cacheSyncCtxCancel()
if !cache.WaitForCacheSync(cacheSyncCtx) {
cache.Stop()
return nil, nil, nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
return nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
}

// Wrap the cached client with a client that sets timeouts on all Get and List calls
Expand All @@ -471,7 +500,10 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
httpClient: httpClient,
})

return cachedClient, uncachedClient, cache, nil
return &cachedClientOutput{
Client: cachedClient,
Cache: cache,
}, nil
}

// deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.
Expand Down

0 comments on commit 5009688

Please sign in to comment.