Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ configurable controller replicas and master node selector #468

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/cmd/hub/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ func NewHubOperatorCmd() *cobra.Command {

flags := cmd.Flags()
flags.BoolVar(&cmOptions.SkipRemoveCRDs, "skip-remove-crds", false, "Skip removing CRDs while ClusterManager is deleting.")
flags.StringVar(&cmOptions.ControlPlaneNodeLabelSelector, "control-plane-node-label-selector",
"node-role.kubernetes.io/master=", "control plane node labels, "+
"e.g. 'environment=production', 'tier notin (frontend,backend)'")
flags.Int32Var(&cmOptions.DeploymentReplicas, "deployment-replicas", 0,
"Number of deployment replicas, operator will automatically determine replicas if not set")
opts.AddFlags(flags)
return cmd
}
5 changes: 5 additions & 0 deletions pkg/cmd/spoke/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ func NewKlusterletOperatorCmd() *cobra.Command {
cmd.Flags().BoolVar(&klOptions.SkipPlaceholderHubSecret, "skip-placeholder-hub-secret", false,
"If set, will skip ensuring a placeholder hub secret which is originally intended for pulling "+
"work image before approved")
cmd.Flags().StringVar(&klOptions.ControlPlaneNodeLabelSelector, "control-plane-node-label-selector",
"node-role.kubernetes.io/master=", "control plane node labels, "+
"e.g. 'environment=production', 'tier notin (frontend,backend)'")
cmd.Flags().Int32Var(&klOptions.DeploymentReplicas, "deployment-replicas", 0,
"Number of deployment replicas, operator will automatically determine replicas if not set")
opts.AddFlags(flags)

return cmd
Expand Down
9 changes: 5 additions & 4 deletions pkg/operator/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,8 @@ func LoadClientConfigFromSecret(secret *corev1.Secret) (*rest.Config, error) {
// - kube version: if the kube version is less than v1.14 return 1
// - node: list master nodes in the cluster and return 1 if the
// number of master nodes is equal or less than 1. Return 3 otherwise.
func DetermineReplica(ctx context.Context, kubeClient kubernetes.Interface, mode operatorapiv1.InstallMode, kubeVersion *version.Version) int32 {
func DetermineReplica(ctx context.Context, kubeClient kubernetes.Interface, mode operatorapiv1.InstallMode, kubeVersion *version.Version,
controlPlaneNodeLabelSelector string) int32 {
// For hosted mode, there may be many cluster-manager/klusterlet running on the management cluster,
// set the replica to 1 to reduce the footprint of the management cluster.
if IsHosted(mode) {
Expand All @@ -451,14 +452,14 @@ func DetermineReplica(ctx context.Context, kubeClient kubernetes.Interface, mode
}
}

return DetermineReplicaByNodes(ctx, kubeClient)
return DetermineReplicaByNodes(ctx, kubeClient, controlPlaneNodeLabelSelector)
}

// DetermineReplicaByNodes determines the replica of deployment based on:
// list master nodes in the cluster and return 1 if
// the number of master nodes is equal or less than 1. Return 3 otherwise.
func DetermineReplicaByNodes(ctx context.Context, kubeClient kubernetes.Interface) int32 {
nodes, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/master="})
func DetermineReplicaByNodes(ctx context.Context, kubeClient kubernetes.Interface, controlPlaneNodeLabelSelector string) int32 {
nodes, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: controlPlaneNodeLabelSelector})
if err != nil {
return defaultReplica
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/helpers/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func TestDeterminReplica(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
fakeKubeClient := fakekube.NewSimpleClientset(c.existingNodes...)
replica := DetermineReplica(context.Background(), fakeKubeClient, c.mode, c.kubeVersion)
replica := DetermineReplica(context.Background(), fakeKubeClient, c.mode, c.kubeVersion, "node-role.kubernetes.io/master=")
if replica != c.expectedReplica {
t.Errorf("Unexpected replica, actual: %d, expected: %d", replica, c.expectedReplica)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ type clusterManagerController struct {
mwctrEnabled, addonManagerEnabled bool) error
generateHubClusterClients func(hubConfig *rest.Config) (kubernetes.Interface, apiextensionsclient.Interface,
migrationclient.StorageVersionMigrationsGetter, error)
skipRemoveCRDs bool
operatorNamespace string
skipRemoveCRDs bool
controlPlaneNodeLabelSelector string
deploymentReplicas int32
operatorNamespace string
}

type clusterManagerReconcile interface {
Expand All @@ -84,6 +86,8 @@ func NewClusterManagerController(
configMapInformer corev1informers.ConfigMapInformer,
recorder events.Recorder,
skipRemoveCRDs bool,
controlPlaneNodeLabelSelector string,
deploymentReplicas int32,
operatorNamespace string,
) factory.Controller {
controller := &clusterManagerController{
Expand All @@ -92,14 +96,16 @@ func NewClusterManagerController(
patcher: patcher.NewPatcher[
*operatorapiv1.ClusterManager, operatorapiv1.ClusterManagerSpec, operatorapiv1.ClusterManagerStatus](
clusterManagerClient),
clusterManagerLister: clusterManagerInformer.Lister(),
configMapLister: configMapInformer.Lister(),
recorder: recorder,
generateHubClusterClients: generateHubClients,
ensureSAKubeconfigs: ensureSAKubeconfigs,
cache: resourceapply.NewResourceCache(),
skipRemoveCRDs: skipRemoveCRDs,
operatorNamespace: operatorNamespace,
clusterManagerLister: clusterManagerInformer.Lister(),
configMapLister: configMapInformer.Lister(),
recorder: recorder,
generateHubClusterClients: generateHubClients,
ensureSAKubeconfigs: ensureSAKubeconfigs,
cache: resourceapply.NewResourceCache(),
skipRemoveCRDs: skipRemoveCRDs,
controlPlaneNodeLabelSelector: controlPlaneNodeLabelSelector,
deploymentReplicas: deploymentReplicas,
operatorNamespace: operatorNamespace,
}

return factory.New().WithSync(controller.sync).
Expand Down Expand Up @@ -141,6 +147,11 @@ func (n *clusterManagerController) sync(ctx context.Context, controllerContext f
workDriver = clusterManager.Spec.WorkConfiguration.WorkDriver
}

replica := n.deploymentReplicas
if replica <= 0 {
replica = helpers.DetermineReplica(ctx, n.operatorKubeClient, clusterManager.Spec.DeployOption.Mode, nil, n.controlPlaneNodeLabelSelector)
}

// This config is used to render template of manifests.
config := manifests.HubConfig{
ClusterManagerName: clusterManager.Name,
Expand All @@ -149,7 +160,7 @@ func (n *clusterManagerController) sync(ctx context.Context, controllerContext f
WorkImage: clusterManager.Spec.WorkImagePullSpec,
PlacementImage: clusterManager.Spec.PlacementImagePullSpec,
AddOnManagerImage: clusterManager.Spec.AddOnManagerImagePullSpec,
Replica: helpers.DetermineReplica(ctx, n.operatorKubeClient, clusterManager.Spec.DeployOption.Mode, nil),
Replica: replica,
HostedMode: clusterManager.Spec.DeployOption.Mode == operatorapiv1.InstallModeHosted,
RegistrationWebhook: manifests.Webhook{
Port: defaultWebhookPort,
Expand Down
6 changes: 5 additions & 1 deletion pkg/operator/operators/clustermanager/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
)

type Options struct {
SkipRemoveCRDs bool
SkipRemoveCRDs bool
ControlPlaneNodeLabelSelector string
DeploymentReplicas int32
}

// RunClusterManagerOperator starts a new cluster manager operator
Expand Down Expand Up @@ -75,6 +77,8 @@ func (o *Options) RunClusterManagerOperator(ctx context.Context, controllerConte
kubeInformer.Core().V1().ConfigMaps(),
controllerContext.EventRecorder,
o.SkipRemoveCRDs,
o.ControlPlaneNodeLabelSelector,
o.DeploymentReplicas,
controllerContext.OperatorNamespace,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ import (
)

type klusterletCleanupController struct {
patcher patcher.Patcher[*operatorapiv1.Klusterlet, operatorapiv1.KlusterletSpec, operatorapiv1.KlusterletStatus]
klusterletLister operatorlister.KlusterletLister
kubeClient kubernetes.Interface
kubeVersion *version.Version
operatorNamespace string
managedClusterClientsBuilder managedClusterClientsBuilderInterface
patcher patcher.Patcher[*operatorapiv1.Klusterlet, operatorapiv1.KlusterletSpec, operatorapiv1.KlusterletStatus]
klusterletLister operatorlister.KlusterletLister
kubeClient kubernetes.Interface
kubeVersion *version.Version
operatorNamespace string
managedClusterClientsBuilder managedClusterClientsBuilderInterface
controlPlaneNodeLabelSelector string
deploymentReplicas int32
}

// NewKlusterletCleanupController construct klusterlet cleanup controller
Expand All @@ -51,16 +53,20 @@ func NewKlusterletCleanupController(
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface,
kubeVersion *version.Version,
operatorNamespace string,
controlPlaneNodeLabelSelector string,
deploymentReplicas int32,
recorder events.Recorder) factory.Controller {
controller := &klusterletCleanupController{
kubeClient: kubeClient,
patcher: patcher.NewPatcher[
*operatorapiv1.Klusterlet, operatorapiv1.KlusterletSpec, operatorapiv1.KlusterletStatus](klusterletClient).
WithOptions(patcher.PatchOptions{IgnoreResourceVersion: true}),
klusterletLister: klusterletInformer.Lister(),
kubeVersion: kubeVersion,
operatorNamespace: operatorNamespace,
managedClusterClientsBuilder: newManagedClusterClientsBuilder(kubeClient, apiExtensionClient, appliedManifestWorkClient, recorder),
klusterletLister: klusterletInformer.Lister(),
kubeVersion: kubeVersion,
operatorNamespace: operatorNamespace,
managedClusterClientsBuilder: newManagedClusterClientsBuilder(kubeClient, apiExtensionClient, appliedManifestWorkClient, recorder),
controlPlaneNodeLabelSelector: controlPlaneNodeLabelSelector,
deploymentReplicas: deploymentReplicas,
}

return factory.New().WithSync(controller.sync).
Expand Down Expand Up @@ -93,6 +99,10 @@ func (n *klusterletCleanupController) sync(ctx context.Context, controllerContex
_, err := n.patcher.AddFinalizer(ctx, klusterlet, desiredFinalizers...)
return err
}
replica := n.deploymentReplicas
if replica <= 0 {
replica = helpers.DetermineReplica(ctx, n.kubeClient, klusterlet.Spec.DeployOption.Mode, n.kubeVersion, n.controlPlaneNodeLabelSelector)
}
// Klusterlet is deleting, we remove its related resources on managed and management cluster
config := klusterletConfig{
KlusterletName: klusterlet.Name,
Expand All @@ -105,7 +115,7 @@ func (n *klusterletCleanupController) sync(ctx context.Context, controllerContex
HubKubeConfigSecret: helpers.HubKubeConfig,
ExternalServerURL: getServersFromKlusterlet(klusterlet),
OperatorNamespace: n.operatorNamespace,
Replica: helpers.DetermineReplica(ctx, n.kubeClient, klusterlet.Spec.DeployOption.Mode, n.kubeVersion),
Replica: replica,

ExternalManagedKubeConfigSecret: helpers.ExternalManagedKubeConfig,
ExternalManagedKubeConfigRegistrationSecret: helpers.ExternalManagedKubeConfigRegistration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,16 @@ const (
)

type klusterletController struct {
patcher patcher.Patcher[*operatorapiv1.Klusterlet, operatorapiv1.KlusterletSpec, operatorapiv1.KlusterletStatus]
klusterletLister operatorlister.KlusterletLister
kubeClient kubernetes.Interface
kubeVersion *version.Version
operatorNamespace string
skipHubSecretPlaceholder bool
cache resourceapply.ResourceCache
managedClusterClientsBuilder managedClusterClientsBuilderInterface
patcher patcher.Patcher[*operatorapiv1.Klusterlet, operatorapiv1.KlusterletSpec, operatorapiv1.KlusterletStatus]
klusterletLister operatorlister.KlusterletLister
kubeClient kubernetes.Interface
kubeVersion *version.Version
operatorNamespace string
skipHubSecretPlaceholder bool
cache resourceapply.ResourceCache
managedClusterClientsBuilder managedClusterClientsBuilderInterface
controlPlaneNodeLabelSelector string
deploymentReplicas int32
}

type klusterletReconcile interface {
Expand All @@ -78,18 +80,22 @@ func NewKlusterletController(
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface,
kubeVersion *version.Version,
operatorNamespace string,
controlPlaneNodeLabelSelector string,
deploymentReplicas int32,
recorder events.Recorder,
skipHubSecretPlaceholder bool) factory.Controller {
controller := &klusterletController{
kubeClient: kubeClient,
patcher: patcher.NewPatcher[
*operatorapiv1.Klusterlet, operatorapiv1.KlusterletSpec, operatorapiv1.KlusterletStatus](klusterletClient),
klusterletLister: klusterletInformer.Lister(),
kubeVersion: kubeVersion,
operatorNamespace: operatorNamespace,
skipHubSecretPlaceholder: skipHubSecretPlaceholder,
cache: resourceapply.NewResourceCache(),
managedClusterClientsBuilder: newManagedClusterClientsBuilder(kubeClient, apiExtensionClient, appliedManifestWorkClient, recorder),
klusterletLister: klusterletInformer.Lister(),
kubeVersion: kubeVersion,
operatorNamespace: operatorNamespace,
skipHubSecretPlaceholder: skipHubSecretPlaceholder,
cache: resourceapply.NewResourceCache(),
managedClusterClientsBuilder: newManagedClusterClientsBuilder(kubeClient, apiExtensionClient, appliedManifestWorkClient, recorder),
controlPlaneNodeLabelSelector: controlPlaneNodeLabelSelector,
deploymentReplicas: deploymentReplicas,
}

return factory.New().WithSync(controller.sync).
Expand Down Expand Up @@ -179,6 +185,11 @@ func (n *klusterletController) sync(ctx context.Context, controllerContext facto
return err
}

replica := n.deploymentReplicas
if replica <= 0 {
replica = helpers.DetermineReplica(ctx, n.kubeClient, klusterlet.Spec.DeployOption.Mode, n.kubeVersion, n.controlPlaneNodeLabelSelector)
}

config := klusterletConfig{
KlusterletName: klusterlet.Name,
KlusterletNamespace: helpers.KlusterletNamespace(klusterlet),
Expand All @@ -192,7 +203,7 @@ func (n *klusterletController) sync(ctx context.Context, controllerContext facto
HubKubeConfigSecret: helpers.HubKubeConfig,
ExternalServerURL: getServersFromKlusterlet(klusterlet),
OperatorNamespace: n.operatorNamespace,
Replica: helpers.DetermineReplica(ctx, n.kubeClient, klusterlet.Spec.DeployOption.Mode, n.kubeVersion),
Replica: replica,
PriorityClassName: helpers.AgentPriorityClassName(klusterlet, n.kubeVersion),
AppliedManifestWorkEvictionGracePeriod: getAppliedManifestWorkEvictionGracePeriod(klusterlet),

Expand Down
8 changes: 7 additions & 1 deletion pkg/operator/operators/klusterlet/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
)

type Options struct {
SkipPlaceholderHubSecret bool
SkipPlaceholderHubSecret bool
ControlPlaneNodeLabelSelector string
DeploymentReplicas int32
}

// RunKlusterletOperator starts a new klusterlet operator
Expand Down Expand Up @@ -96,6 +98,8 @@ func (o *Options) RunKlusterletOperator(ctx context.Context, controllerContext *
workClient.WorkV1().AppliedManifestWorks(),
kubeVersion,
helpers.GetOperatorNamespace(),
o.ControlPlaneNodeLabelSelector,
o.DeploymentReplicas,
controllerContext.EventRecorder,
o.SkipPlaceholderHubSecret)

Expand All @@ -109,6 +113,8 @@ func (o *Options) RunKlusterletOperator(ctx context.Context, controllerContext *
workClient.WorkV1().AppliedManifestWorks(),
kubeVersion,
helpers.GetOperatorNamespace(),
o.ControlPlaneNodeLabelSelector,
o.DeploymentReplicas,
controllerContext.EventRecorder)

ssarController := ssarcontroller.NewKlusterletSSARController(
Expand Down
Loading