From 1f01eebbfb424f75f5555478ec6499cb1ded4161 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Mon, 12 Jun 2023 17:47:47 +0800 Subject: [PATCH] expose the spoke informers Signed-off-by: Wei Liu --- pkg/registration/spoke/spokeagent.go | 44 +++++++++++++++++----------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/pkg/registration/spoke/spokeagent.go b/pkg/registration/spoke/spokeagent.go index 7c1748f93..5064992fa 100644 --- a/pkg/registration/spoke/spokeagent.go +++ b/pkg/registration/spoke/spokeagent.go @@ -4,12 +4,13 @@ import ( "context" "errors" "fmt" - "open-cluster-management.io/ocm/pkg/registration/spoke/lease" - "open-cluster-management.io/ocm/pkg/registration/spoke/registration" "os" "path" "time" + "open-cluster-management.io/ocm/pkg/registration/spoke/lease" + "open-cluster-management.io/ocm/pkg/registration/spoke/registration" + clusterv1 "open-cluster-management.io/api/cluster/v1" ocmfeature "open-cluster-management.io/api/feature" @@ -66,6 +67,11 @@ type SpokeAgentOptions struct { ClusterHealthCheckPeriod time.Duration MaxCustomClusterClaims int ClientCertExpirationSeconds int32 + + // exposing the spoke informers, so that sharing them with other components which are + // integrated with registration agent in code level. + SpokeKubeInformerFactory informers.SharedInformerFactory + SpokeClusterInformerFactory clusterv1informers.SharedInformerFactory } // NewSpokeAgentOptions returns a SpokeAgentOptions @@ -128,6 +134,11 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext return err } + spokeClusterClient, err := clusterv1client.NewForConfig(spokeClientConfig) + if err != nil { + return err + } + // the hub kubeconfig secret stored in the cluster where the agent pod runs if err := o.Complete(managementKubeClient.CoreV1(), ctx, controllerContext.EventRecorder); err != nil { klog.Fatal(err) @@ -140,7 +151,8 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext klog.Infof("Cluster name is %q and agent name is %q", o.AgentOptions.SpokeClusterName, o.AgentName) // create shared informer factory for spoke cluster - spokeKubeInformerFactory := informers.NewSharedInformerFactory(spokeKubeClient, 10*time.Minute) + o.SpokeKubeInformerFactory = informers.NewSharedInformerFactory(spokeKubeClient, 10*time.Minute) + o.SpokeClusterInformerFactory = clusterv1informers.NewSharedInformerFactory(spokeClusterClient, 10*time.Minute) // get spoke cluster CA bundle spokeClusterCABundle, err := o.getSpokeClusterCABundle(spokeClientConfig) @@ -185,7 +197,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext go namespacedManagementKubeInformerFactory.Start(ctx.Done()) // check if there already exists a valid client config for hub - ok, err := o.hasValidHubClientConfig() + ok, err := o.hasValidHubClientConfig(ctx) if err != nil { return err } @@ -236,7 +248,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext // wait for the hub client config is ready. klog.Info("Waiting for hub client config and managed cluster to be ready") - if err := wait.PollImmediateInfinite(1*time.Second, o.hasValidHubClientConfig); err != nil { + if err := wait.PollUntilContextCancel(bootstrapCtx, 1*time.Second, true, o.hasValidHubClientConfig); err != nil { // TODO need run the bootstrap CSR forever to re-establish the client-cert if it is ever lost. stopBootstrap() return err @@ -275,7 +287,10 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext }), ) addOnInformerFactory := addoninformers.NewSharedInformerFactoryWithOptions( - addOnClient, 10*time.Minute, addoninformers.WithNamespace(o.AgentOptions.SpokeClusterName)) + addOnClient, + 10*time.Minute, + addoninformers.WithNamespace(o.AgentOptions.SpokeClusterName), + ) // create a cluster informer factory with name field selector because we just need to handle the current spoke cluster hubClusterInformerFactory := clusterv1informers.NewSharedInformerFactoryWithOptions( hubClusterClient, @@ -327,20 +342,14 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext controllerContext.EventRecorder, ) - spokeClusterClient, err := clusterv1client.NewForConfig(spokeClientConfig) - if err != nil { - return err - } - spokeClusterInformerFactory := clusterv1informers.NewSharedInformerFactory(spokeClusterClient, 10*time.Minute) - // create NewManagedClusterStatusController to update the spoke cluster status managedClusterHealthCheckController := managedcluster.NewManagedClusterStatusController( o.AgentOptions.SpokeClusterName, hubClusterClient, hubClusterInformerFactory.Cluster().V1().ManagedClusters(), spokeKubeClient.Discovery(), - spokeClusterInformerFactory.Cluster().V1alpha1().ClusterClaims(), - spokeKubeInformerFactory.Core().V1().Nodes(), + o.SpokeClusterInformerFactory.Cluster().V1alpha1().ClusterClaims(), + o.SpokeKubeInformerFactory.Core().V1().Nodes(), o.MaxCustomClusterClaims, o.ClusterHealthCheckPeriod, controllerContext.EventRecorder, @@ -375,11 +384,12 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext go hubKubeInformerFactory.Start(ctx.Done()) go hubClusterInformerFactory.Start(ctx.Done()) - go spokeKubeInformerFactory.Start(ctx.Done()) go namespacedManagementKubeInformerFactory.Start(ctx.Done()) go addOnInformerFactory.Start(ctx.Done()) + + go o.SpokeKubeInformerFactory.Start(ctx.Done()) if features.DefaultSpokeRegistrationMutableFeatureGate.Enabled(ocmfeature.ClusterClaim) { - go spokeClusterInformerFactory.Start(ctx.Done()) + go o.SpokeClusterInformerFactory.Start(ctx.Done()) } go clientCertForHubController.Run(ctx, 1) @@ -491,7 +501,7 @@ func generateAgentName() string { // Normally, KubeconfigFile/TLSKeyFile/TLSCertFile will be created once the bootstrap process // completes. Changing the name of the cluster will make the existing hub kubeconfig invalid, // because certificate in TLSCertFile is issued to a specific cluster/agent. -func (o *SpokeAgentOptions) hasValidHubClientConfig() (bool, error) { +func (o *SpokeAgentOptions) hasValidHubClientConfig(ctx context.Context) (bool, error) { kubeconfigPath := path.Join(o.HubKubeconfigDir, clientcert.KubeconfigFile) if _, err := os.Stat(kubeconfigPath); os.IsNotExist(err) { klog.V(4).Infof("Kubeconfig file %q not found", kubeconfigPath)