diff --git a/controllers/remote/cluster_cache_tracker.go b/controllers/remote/cluster_cache_tracker.go index 9bd40ceab13f..3f620743e7c4 100644 --- a/controllers/remote/cluster_cache_tracker.go +++ b/controllers/remote/cluster_cache_tracker.go @@ -29,6 +29,7 @@ import ( "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -295,8 +296,8 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String()) } - // Create a client and a cache for the cluster. - c, uncachedClient, cache, err := t.createClient(ctx, config, cluster, indexes) + // Create an uncached client for the cluster. + uncachedClient, err := t.createUncachedClient(ctx, config, cluster) if err != nil { return nil, err } @@ -308,6 +309,9 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl return nil, err } + var c client.Client + var cache *stoppableCache + // If the controller runs on the workload cluster, access the apiserver directly by using the // CA and Host from the in-cluster configuration. if runningOnCluster { @@ -321,13 +325,19 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl config.CAFile = inClusterConfig.CAFile config.Host = inClusterConfig.Host - // Create a new client and overwrite the previously created client. - c, _, cache, err = t.createClient(ctx, config, cluster, indexes) + // Create a client and a cache for the cluster, using the in-cluster config. + c, cache, err = t.createCachedClient(ctx, config, cluster, indexes) if err != nil { return nil, errors.Wrap(err, "error creating client for self-hosted cluster") } log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with in-cluster service %q", cluster.String(), config.Host)) } else { + // Create a client and a cache for the cluster. + c, cache, err = t.createCachedClient(ctx, config, cluster, indexes) + if err != nil { + return nil, err + } + log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with the regular apiserver endpoint %q", cluster.String(), config.Host)) } @@ -374,18 +384,18 @@ func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c cl return t.controllerPodMetadata.UID == pod.UID, nil } -// createClient creates a cached client, and uncached client and a mapper based on a rest.Config. -func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (client.Client, client.Client, *stoppableCache, error) { +// createHttpClientAndMapper creates a http client and a dynamic rest mapper for the given cluster, based on the rest.Config. +func (t *ClusterCacheTracker) createHttpClientAndMapper(ctx context.Context, config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, error) { // Create a http client for the cluster. httpClient, err := rest.HTTPClientFor(config) if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String()) + return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String()) } // Create a mapper for it mapper, err := apiutil.NewDynamicRESTMapper(config, httpClient) if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String()) + return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String()) } // Verify if we can get a rest mapping from the workload cluster apiserver. @@ -393,7 +403,39 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con // to avoid further effort creating a cache and a client and to produce a clearer error message. _, err = mapper.RESTMapping(corev1.SchemeGroupVersion.WithKind("Node").GroupKind(), corev1.SchemeGroupVersion.Version) if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String()) + return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String()) + } + + return httpClient, mapper, nil +} + +// createUncachedClient creates an uncached client for the given cluster, based on the rest.Config. +func (t *ClusterCacheTracker) createUncachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey) (client.Client, error) { + // Create a http client and a mapper for the cluster. + httpClient, mapper, err := t.createHttpClientAndMapper(ctx, config, cluster) + if err != nil { + return nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String()) + } + + // Create the uncached client for the remote cluster + uncachedClient, err := client.New(config, client.Options{ + Scheme: t.scheme, + Mapper: mapper, + HTTPClient: httpClient, + }) + if err != nil { + return nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String()) + } + + return uncachedClient, nil +} + +// createCachedClient creates a cached client for the given cluster, based on a rest.Config. +func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (client.Client, *stoppableCache, error) { + // Create a http client and a mapper for the cluster. + httpClient, mapper, err := t.createHttpClientAndMapper(ctx, config, cluster) + if err != nil { + return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String()) } // Create the cache for the remote cluster @@ -404,7 +446,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con } remoteCache, err := cache.New(config, cacheOptions) if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating cache", cluster.String()) + return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating cache", cluster.String()) } cacheCtx, cacheCtxCancel := context.WithCancel(ctx) @@ -417,7 +459,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con for _, index := range indexes { if err := cache.IndexField(ctx, index.Object, index.Field, index.ExtractValue); err != nil { - return nil, nil, nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String()) + return nil, nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String()) } } @@ -433,19 +475,9 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con }, }) if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String()) + return nil, nil, errors.Wrapf(err, "error creating cached client for remote cluster %q", cluster.String()) } - // Create an uncached client. This is used in `runningOnWorkloadCluster` to ensure we don't continuously cache - // pods in the client. - uncachedClient, err := client.New(config, client.Options{ - Scheme: t.scheme, - Mapper: mapper, - HTTPClient: httpClient, - }) - if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String()) - } // Start the cache!!! go cache.Start(cacheCtx) //nolint:errcheck @@ -454,7 +486,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con defer cacheSyncCtxCancel() if !cache.WaitForCacheSync(cacheSyncCtx) { cache.Stop() - return nil, nil, nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err()) + return nil, nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err()) } // Wrap the cached client with a client that sets timeouts on all Get and List calls @@ -471,7 +503,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con httpClient: httpClient, }) - return cachedClient, uncachedClient, cache, nil + return cachedClient, cache, nil } // deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.