From 667cd8f5a6808c6ddf9ef03a43c5fb191329a82d Mon Sep 17 00:00:00 2001
From: Wei Liu
Date: Mon, 12 Jun 2023 17:47:47 +0800
Subject: [PATCH] expose the spoke informers

Signed-off-by: Wei Liu
---
 pkg/common/options/options.go             |  5 +-
 pkg/registration/spoke/spokeagent.go      | 83 ++++++++++++++---------
 pkg/registration/spoke/spokeagent_test.go |  2 +-
 pkg/work/spoke/spokeagent.go              |  2 +-
 4 files changed, 55 insertions(+), 37 deletions(-)

diff --git a/pkg/common/options/options.go b/pkg/common/options/options.go
index 6dce3a13b..cd8dece03 100644
--- a/pkg/common/options/options.go
+++ b/pkg/common/options/options.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"strings"
 
-	"github.com/openshift/library-go/pkg/controller/controllercmd"
 	"github.com/spf13/pflag"
 	apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation"
 	"k8s.io/client-go/rest"
@@ -39,9 +38,9 @@ func (o *AgentOptions) AddFlags(flags *pflag.FlagSet) {
 }
 
 // spokeKubeConfig builds kubeconfig for the spoke/managed cluster
-func (o *AgentOptions) SpokeKubeConfig(controllerContext *controllercmd.ControllerContext) (*rest.Config, error) {
+func (o *AgentOptions) SpokeKubeConfig(managedRestConfig *rest.Config) (*rest.Config, error) {
 	if o.SpokeKubeconfigFile == "" {
-		return controllerContext.KubeConfig, nil
+		return managedRestConfig, nil
 	}
 
 	spokeRestConfig, err := clientcmd.BuildConfigFromFlags("" /* leave masterurl as empty */, o.SpokeKubeconfigFile)
diff --git a/pkg/registration/spoke/spokeagent.go b/pkg/registration/spoke/spokeagent.go
index fecdea05b..feeecc7a8 100644
--- a/pkg/registration/spoke/spokeagent.go
+++ b/pkg/registration/spoke/spokeagent.go
@@ -108,15 +108,11 @@ func NewSpokeAgentOptions() *SpokeAgentOptions {
 // create a valid hub kubeconfig. Once the hub kubeconfig is valid, the
 // temporary controller is stopped and the main controllers are started.
 func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
-	// create management kube client
-	managementKubeClient, err := kubernetes.NewForConfig(controllerContext.KubeConfig)
-	if err != nil {
-		return err
-	}
+	kubeConfig := controllerContext.KubeConfig
 
 	// load spoke client config and create spoke clients,
 	// the registration agent may not running in the spoke/managed cluster.
-	spokeClientConfig, err := o.AgentOptions.SpokeKubeConfig(controllerContext)
+	spokeClientConfig, err := o.AgentOptions.SpokeKubeConfig(kubeConfig)
 	if err != nil {
 		return err
 	}
@@ -126,8 +122,38 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
 		return err
 	}
 
+	spokeClusterClient, err := clusterv1client.NewForConfig(spokeClientConfig)
+	if err != nil {
+		return err
+	}
+
+	return o.RunSpokeAgentWithSpokeInformers(
+		ctx,
+		kubeConfig,
+		spokeClientConfig,
+		spokeKubeClient,
+		informers.NewSharedInformerFactory(spokeKubeClient, 10*time.Minute),
+		clusterv1informers.NewSharedInformerFactory(spokeClusterClient, 10*time.Minute),
+		controllerContext.EventRecorder,
+	)
+}
+
+func (o *SpokeAgentOptions) RunSpokeAgentWithSpokeInformers(ctx context.Context,
+	kubeConfig, spokeClientConfig *rest.Config,
+	spokeKubeClient kubernetes.Interface,
+	spokeKubeInformerFactory informers.SharedInformerFactory,
+	spokeClusterInformerFactory clusterv1informers.SharedInformerFactory,
+	recorder events.Recorder) error {
+	klog.Infof("Cluster name is %q and agent name is %q", o.AgentOptions.SpokeClusterName, o.AgentName)
+
+	// create management kube client
+	managementKubeClient, err := kubernetes.NewForConfig(kubeConfig)
+	if err != nil {
+		return err
+	}
+
 	// the hub kubeconfig secret stored in the cluster where the agent pod runs
-	if err := o.Complete(managementKubeClient.CoreV1(), ctx, controllerContext.EventRecorder); err != nil {
+	if err := o.Complete(managementKubeClient.CoreV1(), ctx, recorder); err != nil {
 		klog.Fatal(err)
 	}
 
@@ -135,11 +161,6 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
 		klog.Fatal(err)
 	}
 
-	klog.Infof("Cluster name is %q and agent name is %q", o.AgentOptions.SpokeClusterName, o.AgentName)
-
-	// create shared informer factory for spoke cluster
-	spokeKubeInformerFactory := informers.NewSharedInformerFactory(spokeKubeClient, 10*time.Minute)
-
 	// get spoke cluster CA bundle
 	spokeClusterCABundle, err := o.getSpokeClusterCABundle(spokeClientConfig)
 	if err != nil {
@@ -168,7 +189,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
 		o.AgentOptions.SpokeClusterName, o.SpokeExternalServerURLs,
 		spokeClusterCABundle,
 		bootstrapClusterClient,
-		controllerContext.EventRecorder,
+		recorder,
 	)
 	go spokeClusterCreatingController.Run(ctx, 1)
 
@@ -177,13 +198,13 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
 		// the hub kubeconfig secret stored in the cluster where the agent pod runs
 		managementKubeClient.CoreV1(),
 		namespacedManagementKubeInformerFactory.Core().V1().Secrets(),
-		controllerContext.EventRecorder,
+		recorder,
 	)
 	go hubKubeconfigSecretController.Run(ctx, 1)
 	go namespacedManagementKubeInformerFactory.Start(ctx.Done())
 
 	// check if there already exists a valid client config for hub
-	ok, err := o.hasValidHubClientConfig()
+	ok, err := o.hasValidHubClientConfig(ctx)
 	if err != nil {
 		return err
 	}
@@ -221,7 +242,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
 		o.ClientCertExpirationSeconds,
 		managementKubeClient,
 		registration.GenerateBootstrapStatusUpdater(),
-		controllerContext.EventRecorder,
+		recorder,
 		controllerName,
 	)
 
@@ -234,7 +255,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
 	// wait for the hub client config is ready.
 	klog.Info("Waiting for hub client config and managed cluster to be ready")
-	if err := wait.PollImmediateInfinite(1*time.Second, o.hasValidHubClientConfig); err != nil {
+	if err := wait.PollUntilContextCancel(bootstrapCtx, 1*time.Second, true, o.hasValidHubClientConfig); err != nil {
 		// TODO need run the bootstrap CSR forever to re-establish the client-cert if it is ever lost.
 		stopBootstrap()
 		return err
 	}
@@ -273,7 +294,10 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
 		}),
 	)
 	addOnInformerFactory := addoninformers.NewSharedInformerFactoryWithOptions(
-		addOnClient, 10*time.Minute, addoninformers.WithNamespace(o.AgentOptions.SpokeClusterName))
+		addOnClient,
+		10*time.Minute,
+		addoninformers.WithNamespace(o.AgentOptions.SpokeClusterName),
+	)
 	// create a cluster informer factory with name field selector because we just need to handle the current spoke cluster
 	hubClusterInformerFactory := clusterv1informers.NewSharedInformerFactoryWithOptions(
 		hubClusterClient,
@@ -283,7 +307,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
 		}),
 	)
 
-	controllerContext.EventRecorder.Event("HubClientConfigReady", "Client config for hub is ready.")
+	recorder.Event("HubClientConfigReady", "Client config for hub is ready.")
 
 	// create a kubeconfig with references to the key/cert files in the same secret
 	kubeconfig := clientcert.BuildKubeconfig(hubClientConfig, clientcert.TLSCertFile, clientcert.TLSKeyFile)
@@ -310,7 +334,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
 			hubClusterClient,
 			hubClusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
 			o.AgentOptions.SpokeClusterName),
-		controllerContext.EventRecorder,
+		recorder,
 		controllerName,
 	)
 	if err != nil {
@@ -322,15 +346,9 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
 		o.AgentOptions.SpokeClusterName,
 		hubKubeClient,
 		hubClusterInformerFactory.Cluster().V1().ManagedClusters(),
-		controllerContext.EventRecorder,
+		recorder,
 	)
 
-	spokeClusterClient, err := clusterv1client.NewForConfig(spokeClientConfig)
-	if err != nil {
-		return err
-	}
-	spokeClusterInformerFactory := clusterv1informers.NewSharedInformerFactory(spokeClusterClient, 10*time.Minute)
-
 	// create NewManagedClusterStatusController to update the spoke cluster status
 	managedClusterHealthCheckController := managedcluster.NewManagedClusterStatusController(
 		o.AgentOptions.SpokeClusterName,
@@ -341,7 +359,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
 		spokeKubeInformerFactory.Core().V1().Nodes(),
 		o.MaxCustomClusterClaims,
 		o.ClusterHealthCheckPeriod,
-		controllerContext.EventRecorder,
+		recorder,
 	)
 
 	var addOnLeaseController factory.Controller
@@ -355,7 +373,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
 			managementKubeClient.CoordinationV1(),
 			spokeKubeClient.CoordinationV1(),
 			AddOnLeaseControllerSyncInterval, //TODO: this interval time should be allowed to change from outside
-			controllerContext.EventRecorder,
+			recorder,
 		)
 
 		addOnRegistrationController = addon.NewAddOnRegistrationController(
@@ -367,15 +385,16 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
 			spokeKubeClient,
 			csrControl,
 			addOnInformerFactory.Addon().V1alpha1().ManagedClusterAddOns(),
-			controllerContext.EventRecorder,
+			recorder,
 		)
 	}
 
 	go hubKubeInformerFactory.Start(ctx.Done())
 	go hubClusterInformerFactory.Start(ctx.Done())
-	go spokeKubeInformerFactory.Start(ctx.Done())
 	go namespacedManagementKubeInformerFactory.Start(ctx.Done())
 	go addOnInformerFactory.Start(ctx.Done())
+
+	go spokeKubeInformerFactory.Start(ctx.Done())
 	if features.DefaultSpokeRegistrationMutableFeatureGate.Enabled(ocmfeature.ClusterClaim) {
 		go spokeClusterInformerFactory.Start(ctx.Done())
 	}
@@ -489,7 +508,7 @@ func generateAgentName() string {
 // Normally, KubeconfigFile/TLSKeyFile/TLSCertFile will be created once the bootstrap process
 // completes. Changing the name of the cluster will make the existing hub kubeconfig invalid,
 // because certificate in TLSCertFile is issued to a specific cluster/agent.
-func (o *SpokeAgentOptions) hasValidHubClientConfig() (bool, error) {
+func (o *SpokeAgentOptions) hasValidHubClientConfig(ctx context.Context) (bool, error) {
 	kubeconfigPath := path.Join(o.HubKubeconfigDir, clientcert.KubeconfigFile)
 	if _, err := os.Stat(kubeconfigPath); os.IsNotExist(err) {
 		klog.V(4).Infof("Kubeconfig file %q not found", kubeconfigPath)
diff --git a/pkg/registration/spoke/spokeagent_test.go b/pkg/registration/spoke/spokeagent_test.go
index bb501a452..61f1b2754 100644
--- a/pkg/registration/spoke/spokeagent_test.go
+++ b/pkg/registration/spoke/spokeagent_test.go
@@ -308,7 +308,7 @@ func TestHasValidHubClientConfig(t *testing.T) {
 			AgentName:        c.agentName,
 			HubKubeconfigDir: tempDir,
 		}
-		valid, err := options.hasValidHubClientConfig()
+		valid, err := options.hasValidHubClientConfig(context.TODO())
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
diff --git a/pkg/work/spoke/spokeagent.go b/pkg/work/spoke/spokeagent.go
index f936a4ea8..e78342e64 100644
--- a/pkg/work/spoke/spokeagent.go
+++ b/pkg/work/spoke/spokeagent.go
@@ -92,7 +92,7 @@ func (o *WorkloadAgentOptions) RunWorkloadAgent(ctx context.Context, controllerC
 
 	// load spoke client config and create spoke clients,
 	// the work agent may not running in the spoke/managed cluster.
-	spokeRestConfig, err := o.AgentOptions.SpokeKubeConfig(controllerContext)
+	spokeRestConfig, err := o.AgentOptions.SpokeKubeConfig(controllerContext.KubeConfig)
 	if err != nil {
 		return err
 	}
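
Note on using the exposed entry point: the patch moves construction of the spoke kube and
cluster informer factories out of RunSpokeAgent and into its callers, with the new exported
RunSpokeAgentWithSpokeInformers as the entry point, so an embedding process can build and
share those factories itself. Below is a minimal sketch of such a caller, not part of this
patch: the module path open-cluster-management.io/ocm, the kubeconfig file path, and the
use of library-go's in-memory recorder are assumptions for illustration only.

package main

import (
	"context"
	"time"

	"github.com/openshift/library-go/pkg/operator/events"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"

	clusterv1client "open-cluster-management.io/api/client/cluster/clientset/versioned"
	clusterv1informers "open-cluster-management.io/api/client/cluster/informers/externalversions"
	registrationspoke "open-cluster-management.io/ocm/pkg/registration/spoke" // assumed module path
)

func main() {
	ctx := context.Background()

	// The management config and the spoke config are the same here; they differ when the
	// agent runs outside the cluster it manages. The path below is a placeholder.
	kubeConfig, err := clientcmd.BuildConfigFromFlags("", "/var/run/agent/kubeconfig")
	if err != nil {
		panic(err)
	}
	spokeClientConfig := kubeConfig

	spokeKubeClient, err := kubernetes.NewForConfig(spokeClientConfig)
	if err != nil {
		panic(err)
	}
	spokeClusterClient, err := clusterv1client.NewForConfig(spokeClientConfig)
	if err != nil {
		panic(err)
	}

	// The caller now owns the spoke informer factories and can share them with any other
	// controllers running in the same process.
	spokeKubeInformers := informers.NewSharedInformerFactory(spokeKubeClient, 10*time.Minute)
	spokeClusterInformers := clusterv1informers.NewSharedInformerFactory(spokeClusterClient, 10*time.Minute)

	// In a real agent the options (cluster name, hub kubeconfig dir, bootstrap kubeconfig,
	// feature gates) would be populated from command-line flags before this call.
	options := registrationspoke.NewSpokeAgentOptions()
	if err := options.RunSpokeAgentWithSpokeInformers(
		ctx,
		kubeConfig,
		spokeClientConfig,
		spokeKubeClient,
		spokeKubeInformers,
		spokeClusterInformers,
		events.NewInMemoryRecorder("registration-agent"),
	); err != nil {
		panic(err)
	}
}

The design point is that the factories are passed in rather than created inside
RunSpokeAgent, so one process can back the registration agent and its own controllers with
a single shared informer cache instead of duplicating watches against the spoke cluster.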