Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 expose the spoke informers #179

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/common/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"strings"

"github.com/openshift/library-go/pkg/controller/controllercmd"
"github.com/spf13/pflag"
apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -39,9 +38,9 @@ func (o *AgentOptions) AddFlags(flags *pflag.FlagSet) {
}

// spokeKubeConfig builds kubeconfig for the spoke/managed cluster
func (o *AgentOptions) SpokeKubeConfig(controllerContext *controllercmd.ControllerContext) (*rest.Config, error) {
func (o *AgentOptions) SpokeKubeConfig(managedRestConfig *rest.Config) (*rest.Config, error) {
if o.SpokeKubeconfigFile == "" {
return controllerContext.KubeConfig, nil
return managedRestConfig, nil
}

spokeRestConfig, err := clientcmd.BuildConfigFromFlags("" /* leave masterurl as empty */, o.SpokeKubeconfigFile)
Expand Down
83 changes: 51 additions & 32 deletions pkg/registration/spoke/spokeagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,11 @@ func NewSpokeAgentOptions() *SpokeAgentOptions {
// create a valid hub kubeconfig. Once the hub kubeconfig is valid, the
// temporary controller is stopped and the main controllers are started.
func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
// create management kube client
managementKubeClient, err := kubernetes.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}
kubeConfig := controllerContext.KubeConfig

// load spoke client config and create spoke clients,
// the registration agent may not running in the spoke/managed cluster.
spokeClientConfig, err := o.AgentOptions.SpokeKubeConfig(controllerContext)
spokeClientConfig, err := o.AgentOptions.SpokeKubeConfig(kubeConfig)
if err != nil {
return err
}
Expand All @@ -126,20 +122,45 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
return err
}

spokeClusterClient, err := clusterv1client.NewForConfig(spokeClientConfig)
if err != nil {
return err
}

return o.RunSpokeAgentWithSpokeInformers(
ctx,
kubeConfig,
spokeClientConfig,
spokeKubeClient,
informers.NewSharedInformerFactory(spokeKubeClient, 10*time.Minute),
clusterv1informers.NewSharedInformerFactory(spokeClusterClient, 10*time.Minute),
controllerContext.EventRecorder,
)
}

func (o *SpokeAgentOptions) RunSpokeAgentWithSpokeInformers(ctx context.Context,
kubeConfig, spokeClientConfig *rest.Config,
spokeKubeClient kubernetes.Interface,
spokeKubeInformerFactory informers.SharedInformerFactory,
spokeClusterInformerFactory clusterv1informers.SharedInformerFactory,
recorder events.Recorder) error {
klog.Infof("Cluster name is %q and agent name is %q", o.AgentOptions.SpokeClusterName, o.AgentName)

// create management kube client
managementKubeClient, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return err
}

// the hub kubeconfig secret stored in the cluster where the agent pod runs
if err := o.Complete(managementKubeClient.CoreV1(), ctx, controllerContext.EventRecorder); err != nil {
if err := o.Complete(managementKubeClient.CoreV1(), ctx, recorder); err != nil {
klog.Fatal(err)
}

if err := o.Validate(); err != nil {
klog.Fatal(err)
}

klog.Infof("Cluster name is %q and agent name is %q", o.AgentOptions.SpokeClusterName, o.AgentName)

// create shared informer factory for spoke cluster
spokeKubeInformerFactory := informers.NewSharedInformerFactory(spokeKubeClient, 10*time.Minute)

// get spoke cluster CA bundle
spokeClusterCABundle, err := o.getSpokeClusterCABundle(spokeClientConfig)
if err != nil {
Expand Down Expand Up @@ -168,7 +189,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
o.AgentOptions.SpokeClusterName, o.SpokeExternalServerURLs,
spokeClusterCABundle,
bootstrapClusterClient,
controllerContext.EventRecorder,
recorder,
)
go spokeClusterCreatingController.Run(ctx, 1)

Expand All @@ -177,13 +198,13 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
// the hub kubeconfig secret stored in the cluster where the agent pod runs
managementKubeClient.CoreV1(),
namespacedManagementKubeInformerFactory.Core().V1().Secrets(),
controllerContext.EventRecorder,
recorder,
)
go hubKubeconfigSecretController.Run(ctx, 1)
go namespacedManagementKubeInformerFactory.Start(ctx.Done())

// check if there already exists a valid client config for hub
ok, err := o.hasValidHubClientConfig()
ok, err := o.hasValidHubClientConfig(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -221,7 +242,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
o.ClientCertExpirationSeconds,
managementKubeClient,
registration.GenerateBootstrapStatusUpdater(),
controllerContext.EventRecorder,
recorder,
controllerName,
)

Expand All @@ -234,7 +255,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext

// wait for the hub client config is ready.
klog.Info("Waiting for hub client config and managed cluster to be ready")
if err := wait.PollImmediateInfinite(1*time.Second, o.hasValidHubClientConfig); err != nil {
if err := wait.PollUntilContextCancel(bootstrapCtx, 1*time.Second, true, o.hasValidHubClientConfig); err != nil {
// TODO need run the bootstrap CSR forever to re-establish the client-cert if it is ever lost.
stopBootstrap()
return err
Expand Down Expand Up @@ -273,7 +294,10 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
}),
)
addOnInformerFactory := addoninformers.NewSharedInformerFactoryWithOptions(
addOnClient, 10*time.Minute, addoninformers.WithNamespace(o.AgentOptions.SpokeClusterName))
addOnClient,
10*time.Minute,
addoninformers.WithNamespace(o.AgentOptions.SpokeClusterName),
)
// create a cluster informer factory with name field selector because we just need to handle the current spoke cluster
hubClusterInformerFactory := clusterv1informers.NewSharedInformerFactoryWithOptions(
hubClusterClient,
Expand All @@ -283,7 +307,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
}),
)

controllerContext.EventRecorder.Event("HubClientConfigReady", "Client config for hub is ready.")
recorder.Event("HubClientConfigReady", "Client config for hub is ready.")

// create a kubeconfig with references to the key/cert files in the same secret
kubeconfig := clientcert.BuildKubeconfig(hubClientConfig, clientcert.TLSCertFile, clientcert.TLSKeyFile)
Expand All @@ -310,7 +334,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
hubClusterClient,
hubClusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
o.AgentOptions.SpokeClusterName),
controllerContext.EventRecorder,
recorder,
controllerName,
)
if err != nil {
Expand All @@ -322,15 +346,9 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
o.AgentOptions.SpokeClusterName,
hubKubeClient,
hubClusterInformerFactory.Cluster().V1().ManagedClusters(),
controllerContext.EventRecorder,
recorder,
)

spokeClusterClient, err := clusterv1client.NewForConfig(spokeClientConfig)
if err != nil {
return err
}
spokeClusterInformerFactory := clusterv1informers.NewSharedInformerFactory(spokeClusterClient, 10*time.Minute)

// create NewManagedClusterStatusController to update the spoke cluster status
managedClusterHealthCheckController := managedcluster.NewManagedClusterStatusController(
o.AgentOptions.SpokeClusterName,
Expand All @@ -341,7 +359,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
spokeKubeInformerFactory.Core().V1().Nodes(),
o.MaxCustomClusterClaims,
o.ClusterHealthCheckPeriod,
controllerContext.EventRecorder,
recorder,
)

var addOnLeaseController factory.Controller
Expand All @@ -355,7 +373,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
managementKubeClient.CoordinationV1(),
spokeKubeClient.CoordinationV1(),
AddOnLeaseControllerSyncInterval, //TODO: this interval time should be allowed to change from outside
controllerContext.EventRecorder,
recorder,
)

addOnRegistrationController = addon.NewAddOnRegistrationController(
Expand All @@ -367,15 +385,16 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
spokeKubeClient,
csrControl,
addOnInformerFactory.Addon().V1alpha1().ManagedClusterAddOns(),
controllerContext.EventRecorder,
recorder,
)
}

go hubKubeInformerFactory.Start(ctx.Done())
go hubClusterInformerFactory.Start(ctx.Done())
go spokeKubeInformerFactory.Start(ctx.Done())
go namespacedManagementKubeInformerFactory.Start(ctx.Done())
go addOnInformerFactory.Start(ctx.Done())

go spokeKubeInformerFactory.Start(ctx.Done())
if features.DefaultSpokeRegistrationMutableFeatureGate.Enabled(ocmfeature.ClusterClaim) {
go spokeClusterInformerFactory.Start(ctx.Done())
}
Expand Down Expand Up @@ -489,7 +508,7 @@ func generateAgentName() string {
// Normally, KubeconfigFile/TLSKeyFile/TLSCertFile will be created once the bootstrap process
// completes. Changing the name of the cluster will make the existing hub kubeconfig invalid,
// because certificate in TLSCertFile is issued to a specific cluster/agent.
func (o *SpokeAgentOptions) hasValidHubClientConfig() (bool, error) {
func (o *SpokeAgentOptions) hasValidHubClientConfig(ctx context.Context) (bool, error) {
kubeconfigPath := path.Join(o.HubKubeconfigDir, clientcert.KubeconfigFile)
if _, err := os.Stat(kubeconfigPath); os.IsNotExist(err) {
klog.V(4).Infof("Kubeconfig file %q not found", kubeconfigPath)
Expand Down
2 changes: 1 addition & 1 deletion pkg/registration/spoke/spokeagent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func TestHasValidHubClientConfig(t *testing.T) {
AgentName: c.agentName,
HubKubeconfigDir: tempDir,
}
valid, err := options.hasValidHubClientConfig()
valid, err := options.hasValidHubClientConfig(context.TODO())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/work/spoke/spokeagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (o *WorkloadAgentOptions) RunWorkloadAgent(ctx context.Context, controllerC

// load spoke client config and create spoke clients,
// the work agent may not running in the spoke/managed cluster.
spokeRestConfig, err := o.AgentOptions.SpokeKubeConfig(controllerContext)
spokeRestConfig, err := o.AgentOptions.SpokeKubeConfig(controllerContext.KubeConfig)
if err != nil {
return err
}
Expand Down