Skip to content

Commit

Permalink
expose the spoke informers (#179)
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <liuweixa@redhat.com>
  • Loading branch information
skeeey authored Jun 13, 2023
1 parent 7332a58 commit b55881d
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 37 deletions.
5 changes: 2 additions & 3 deletions pkg/common/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"strings"

"github.com/openshift/library-go/pkg/controller/controllercmd"
"github.com/spf13/pflag"
apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -39,9 +38,9 @@ func (o *AgentOptions) AddFlags(flags *pflag.FlagSet) {
}

// spokeKubeConfig builds kubeconfig for the spoke/managed cluster
func (o *AgentOptions) SpokeKubeConfig(controllerContext *controllercmd.ControllerContext) (*rest.Config, error) {
func (o *AgentOptions) SpokeKubeConfig(managedRestConfig *rest.Config) (*rest.Config, error) {
if o.SpokeKubeconfigFile == "" {
return controllerContext.KubeConfig, nil
return managedRestConfig, nil
}

spokeRestConfig, err := clientcmd.BuildConfigFromFlags("" /* leave masterurl as empty */, o.SpokeKubeconfigFile)
Expand Down
83 changes: 51 additions & 32 deletions pkg/registration/spoke/spokeagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,11 @@ func NewSpokeAgentOptions() *SpokeAgentOptions {
// create a valid hub kubeconfig. Once the hub kubeconfig is valid, the
// temporary controller is stopped and the main controllers are started.
func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
// create management kube client
managementKubeClient, err := kubernetes.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}
kubeConfig := controllerContext.KubeConfig

// load spoke client config and create spoke clients,
// the registration agent may not running in the spoke/managed cluster.
spokeClientConfig, err := o.AgentOptions.SpokeKubeConfig(controllerContext)
spokeClientConfig, err := o.AgentOptions.SpokeKubeConfig(kubeConfig)
if err != nil {
return err
}
Expand All @@ -126,20 +122,45 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
return err
}

spokeClusterClient, err := clusterv1client.NewForConfig(spokeClientConfig)
if err != nil {
return err
}

return o.RunSpokeAgentWithSpokeInformers(
ctx,
kubeConfig,
spokeClientConfig,
spokeKubeClient,
informers.NewSharedInformerFactory(spokeKubeClient, 10*time.Minute),
clusterv1informers.NewSharedInformerFactory(spokeClusterClient, 10*time.Minute),
controllerContext.EventRecorder,
)
}

func (o *SpokeAgentOptions) RunSpokeAgentWithSpokeInformers(ctx context.Context,
kubeConfig, spokeClientConfig *rest.Config,
spokeKubeClient kubernetes.Interface,
spokeKubeInformerFactory informers.SharedInformerFactory,
spokeClusterInformerFactory clusterv1informers.SharedInformerFactory,
recorder events.Recorder) error {
klog.Infof("Cluster name is %q and agent name is %q", o.AgentOptions.SpokeClusterName, o.AgentName)

// create management kube client
managementKubeClient, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return err
}

// the hub kubeconfig secret stored in the cluster where the agent pod runs
if err := o.Complete(managementKubeClient.CoreV1(), ctx, controllerContext.EventRecorder); err != nil {
if err := o.Complete(managementKubeClient.CoreV1(), ctx, recorder); err != nil {
klog.Fatal(err)
}

if err := o.Validate(); err != nil {
klog.Fatal(err)
}

klog.Infof("Cluster name is %q and agent name is %q", o.AgentOptions.SpokeClusterName, o.AgentName)

// create shared informer factory for spoke cluster
spokeKubeInformerFactory := informers.NewSharedInformerFactory(spokeKubeClient, 10*time.Minute)

// get spoke cluster CA bundle
spokeClusterCABundle, err := o.getSpokeClusterCABundle(spokeClientConfig)
if err != nil {
Expand Down Expand Up @@ -168,7 +189,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
o.AgentOptions.SpokeClusterName, o.SpokeExternalServerURLs,
spokeClusterCABundle,
bootstrapClusterClient,
controllerContext.EventRecorder,
recorder,
)
go spokeClusterCreatingController.Run(ctx, 1)

Expand All @@ -177,13 +198,13 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
// the hub kubeconfig secret stored in the cluster where the agent pod runs
managementKubeClient.CoreV1(),
namespacedManagementKubeInformerFactory.Core().V1().Secrets(),
controllerContext.EventRecorder,
recorder,
)
go hubKubeconfigSecretController.Run(ctx, 1)
go namespacedManagementKubeInformerFactory.Start(ctx.Done())

// check if there already exists a valid client config for hub
ok, err := o.hasValidHubClientConfig()
ok, err := o.hasValidHubClientConfig(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -221,7 +242,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
o.ClientCertExpirationSeconds,
managementKubeClient,
registration.GenerateBootstrapStatusUpdater(),
controllerContext.EventRecorder,
recorder,
controllerName,
)

Expand All @@ -234,7 +255,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext

// wait for the hub client config is ready.
klog.Info("Waiting for hub client config and managed cluster to be ready")
if err := wait.PollImmediateInfinite(1*time.Second, o.hasValidHubClientConfig); err != nil {
if err := wait.PollUntilContextCancel(bootstrapCtx, 1*time.Second, true, o.hasValidHubClientConfig); err != nil {
// TODO need run the bootstrap CSR forever to re-establish the client-cert if it is ever lost.
stopBootstrap()
return err
Expand Down Expand Up @@ -273,7 +294,10 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
}),
)
addOnInformerFactory := addoninformers.NewSharedInformerFactoryWithOptions(
addOnClient, 10*time.Minute, addoninformers.WithNamespace(o.AgentOptions.SpokeClusterName))
addOnClient,
10*time.Minute,
addoninformers.WithNamespace(o.AgentOptions.SpokeClusterName),
)
// create a cluster informer factory with name field selector because we just need to handle the current spoke cluster
hubClusterInformerFactory := clusterv1informers.NewSharedInformerFactoryWithOptions(
hubClusterClient,
Expand All @@ -283,7 +307,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
}),
)

controllerContext.EventRecorder.Event("HubClientConfigReady", "Client config for hub is ready.")
recorder.Event("HubClientConfigReady", "Client config for hub is ready.")

// create a kubeconfig with references to the key/cert files in the same secret
kubeconfig := clientcert.BuildKubeconfig(hubClientConfig, clientcert.TLSCertFile, clientcert.TLSKeyFile)
Expand All @@ -310,7 +334,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
hubClusterClient,
hubClusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
o.AgentOptions.SpokeClusterName),
controllerContext.EventRecorder,
recorder,
controllerName,
)
if err != nil {
Expand All @@ -322,15 +346,9 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
o.AgentOptions.SpokeClusterName,
hubKubeClient,
hubClusterInformerFactory.Cluster().V1().ManagedClusters(),
controllerContext.EventRecorder,
recorder,
)

spokeClusterClient, err := clusterv1client.NewForConfig(spokeClientConfig)
if err != nil {
return err
}
spokeClusterInformerFactory := clusterv1informers.NewSharedInformerFactory(spokeClusterClient, 10*time.Minute)

// create NewManagedClusterStatusController to update the spoke cluster status
managedClusterHealthCheckController := managedcluster.NewManagedClusterStatusController(
o.AgentOptions.SpokeClusterName,
Expand All @@ -341,7 +359,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
spokeKubeInformerFactory.Core().V1().Nodes(),
o.MaxCustomClusterClaims,
o.ClusterHealthCheckPeriod,
controllerContext.EventRecorder,
recorder,
)

var addOnLeaseController factory.Controller
Expand All @@ -355,7 +373,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
managementKubeClient.CoordinationV1(),
spokeKubeClient.CoordinationV1(),
AddOnLeaseControllerSyncInterval, //TODO: this interval time should be allowed to change from outside
controllerContext.EventRecorder,
recorder,
)

addOnRegistrationController = addon.NewAddOnRegistrationController(
Expand All @@ -367,15 +385,16 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
spokeKubeClient,
csrControl,
addOnInformerFactory.Addon().V1alpha1().ManagedClusterAddOns(),
controllerContext.EventRecorder,
recorder,
)
}

go hubKubeInformerFactory.Start(ctx.Done())
go hubClusterInformerFactory.Start(ctx.Done())
go spokeKubeInformerFactory.Start(ctx.Done())
go namespacedManagementKubeInformerFactory.Start(ctx.Done())
go addOnInformerFactory.Start(ctx.Done())

go spokeKubeInformerFactory.Start(ctx.Done())
if features.DefaultSpokeRegistrationMutableFeatureGate.Enabled(ocmfeature.ClusterClaim) {
go spokeClusterInformerFactory.Start(ctx.Done())
}
Expand Down Expand Up @@ -489,7 +508,7 @@ func generateAgentName() string {
// Normally, KubeconfigFile/TLSKeyFile/TLSCertFile will be created once the bootstrap process
// completes. Changing the name of the cluster will make the existing hub kubeconfig invalid,
// because certificate in TLSCertFile is issued to a specific cluster/agent.
func (o *SpokeAgentOptions) hasValidHubClientConfig() (bool, error) {
func (o *SpokeAgentOptions) hasValidHubClientConfig(ctx context.Context) (bool, error) {
kubeconfigPath := path.Join(o.HubKubeconfigDir, clientcert.KubeconfigFile)
if _, err := os.Stat(kubeconfigPath); os.IsNotExist(err) {
klog.V(4).Infof("Kubeconfig file %q not found", kubeconfigPath)
Expand Down
2 changes: 1 addition & 1 deletion pkg/registration/spoke/spokeagent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func TestHasValidHubClientConfig(t *testing.T) {
AgentName: c.agentName,
HubKubeconfigDir: tempDir,
}
valid, err := options.hasValidHubClientConfig()
valid, err := options.hasValidHubClientConfig(context.TODO())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/work/spoke/spokeagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (o *WorkloadAgentOptions) RunWorkloadAgent(ctx context.Context, controllerC

// load spoke client config and create spoke clients,
// the work agent may not running in the spoke/managed cluster.
spokeRestConfig, err := o.AgentOptions.SpokeKubeConfig(controllerContext)
spokeRestConfig, err := o.AgentOptions.SpokeKubeConfig(controllerContext.KubeConfig)
if err != nil {
return err
}
Expand Down

0 comments on commit b55881d

Please sign in to comment.