🌱 Add separate concurrency flag for cluster cache tracker #9116

Merged
47 changes: 26 additions & 21 deletions bootstrap/kubeadm/main.go
@@ -76,26 +76,27 @@ func init() {
}

var (
- metricsBindAddr string
- enableLeaderElection bool
- leaderElectionLeaseDuration time.Duration
- leaderElectionRenewDeadline time.Duration
- leaderElectionRetryPeriod time.Duration
- watchFilterValue string
- watchNamespace string
- profilerAddress string
- enableContentionProfiling bool
- clusterConcurrency int
- kubeadmConfigConcurrency int
- syncPeriod time.Duration
- restConfigQPS float32
- restConfigBurst int
- webhookPort int
- webhookCertDir string
- healthAddr string
- tokenTTL time.Duration
- tlsOptions = flags.TLSOptions{}
- logOptions = logs.NewOptions()
+ metricsBindAddr string
+ enableLeaderElection bool
+ leaderElectionLeaseDuration time.Duration
+ leaderElectionRenewDeadline time.Duration
+ leaderElectionRetryPeriod time.Duration
+ watchFilterValue string
+ watchNamespace string
+ profilerAddress string
+ enableContentionProfiling bool
+ clusterConcurrency int
+ clusterCacheTrackerConcurrency int
+ kubeadmConfigConcurrency int
+ syncPeriod time.Duration
+ restConfigQPS float32
+ restConfigBurst int
+ webhookPort int
+ webhookCertDir string
+ healthAddr string
+ tokenTTL time.Duration
+ tlsOptions = flags.TLSOptions{}
+ logOptions = logs.NewOptions()
)

// InitFlags initializes this manager's flags.
@@ -128,6 +129,10 @@ func InitFlags(fs *pflag.FlagSet) {

fs.IntVar(&clusterConcurrency, "cluster-concurrency", 10,
"Number of clusters to process simultaneously")
+ _ = fs.MarkDeprecated("cluster-concurrency", "This flag has no function anymore and is going to be removed in a next release. Use \"--clustercachetracker-concurrency\" instead.")
+
+ fs.IntVar(&clusterCacheTrackerConcurrency, "clustercachetracker-concurrency", 10,
+ "Number of clusters to process simultaneously")

fs.IntVar(&kubeadmConfigConcurrency, "kubeadmconfig-concurrency", 10,
"Number of kubeadm configs to process simultaneously")
@@ -307,7 +312,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
- }).SetupWithManager(ctx, mgr, concurrency(clusterConcurrency)); err != nil {
+ }).SetupWithManager(ctx, mgr, concurrency(clusterCacheTrackerConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler")
os.Exit(1)
}
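The bootstrap diff above keeps --cluster-concurrency around for compatibility, marks it deprecated via pflag, and introduces --clustercachetracker-concurrency as the flag that is actually used. A minimal standalone sketch of that pattern (the FlagSet name, main function, and parsed arguments are illustrative, not part of the PR):

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	fs := pflag.NewFlagSet("manager", pflag.ExitOnError)

	var oldConcurrency, cacheTrackerConcurrency int

	// Keep the old flag so existing manifests still parse, even though it no longer drives anything.
	fs.IntVar(&oldConcurrency, "cluster-concurrency", 10,
		"Number of clusters to process simultaneously")
	// MarkDeprecated hides the flag from --help and prints a warning whenever it is set.
	_ = fs.MarkDeprecated("cluster-concurrency",
		"This flag has no function anymore and is going to be removed in a next release. Use \"--clustercachetracker-concurrency\" instead.")

	// The new flag is the one wired to the ClusterCacheReconciler.
	fs.IntVar(&cacheTrackerConcurrency, "clustercachetracker-concurrency", 10,
		"Number of clusters to process simultaneously")

	_ = fs.Parse([]string{"--clustercachetracker-concurrency=20"})
	fmt.Println(cacheTrackerConcurrency) // 20
}

Passing the old flag still works but emits pflag's deprecation warning, which matches the migration path this diff sets up.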
6 changes: 5 additions & 1 deletion controlplane/kubeadm/main.go
@@ -91,6 +91,7 @@ var (
profilerAddress string
enableContentionProfiling bool
kubeadmControlPlaneConcurrency int
+ clusterCacheTrackerConcurrency int
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
@@ -134,6 +135,9 @@ func InitFlags(fs *pflag.FlagSet) {
fs.IntVar(&kubeadmControlPlaneConcurrency, "kubeadmcontrolplane-concurrency", 10,
"Number of kubeadm control planes to process simultaneously")

+ fs.IntVar(&clusterCacheTrackerConcurrency, "clustercachetracker-concurrency", 10,

Member: clustercachetracker is an implementation detail that's now being exposed to users, could we find a name like child-cluster-concurrency or workload-cluster-concurrency?

sbueringer (Member, Aug 8, 2023): I couldn't think of a better name that expresses what we do :). So I thought it's a good idea to use something that expresses ~ the name of the controller.
We can use something like workload-cluster-concurrency but it seems misleading to be honest. It's not actually the concurrency of how many workload clusters we can reconcile. Also all our clusters are workload clusters.
Is something like clustercache-concurrency better? It doesn't point directly to CCT but it still expresses somewhat that it's the concurrency of the cache we use for clusters.

Member: @vincepri wdyt about dropping the flag and just hard-coding to 10? 10 should be enough anyway so there is no need for a flag.
I just wanted to avoid that the cct is using a lot of workers if someone needs more workers for the regular cluster reconcilers.

+ "Number of clusters to process simultaneously")

fs.DurationVar(&syncPeriod, "sync-period", 10*time.Minute,
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

@@ -320,7 +324,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
- }).SetupWithManager(ctx, mgr, concurrency(kubeadmControlPlaneConcurrency)); err != nil {
+ }).SetupWithManager(ctx, mgr, concurrency(clusterCacheTrackerConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler")
os.Exit(1)
}
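Both managers feed the flag value through a small concurrency helper when calling SetupWithManager. That helper is existing code and not part of this diff; a plausible sketch, assuming controller-runtime's controller.Options type:

package main

import (
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/controller"
)

// concurrency wraps a worker count in controller.Options so it can be
// passed to a reconciler's SetupWithManager call.
func concurrency(c int) controller.Options {
	return controller.Options{MaxConcurrentReconciles: c}
}

func main() {
	clusterCacheTrackerConcurrency := 10 // value of --clustercachetracker-concurrency
	opts := concurrency(clusterCacheTrackerConcurrency)
	fmt.Println(opts.MaxConcurrentReconciles) // 10
}

With the new flag in place, raising the Cluster controller's --cluster-concurrency no longer inflates the ClusterCacheReconciler's worker pool; the two are tuned independently.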
62 changes: 33 additions & 29 deletions main.go
@@ -82,34 +82,35 @@ var (
controllerName = "cluster-api-controller-manager"

// flags.
- metricsBindAddr string
- enableLeaderElection bool
- leaderElectionLeaseDuration time.Duration
- leaderElectionRenewDeadline time.Duration
- leaderElectionRetryPeriod time.Duration
- watchNamespace string
- watchFilterValue string
- profilerAddress string
- enableContentionProfiling bool
- clusterTopologyConcurrency int
- clusterClassConcurrency int
- clusterConcurrency int
- extensionConfigConcurrency int
- machineConcurrency int
- machineSetConcurrency int
- machineDeploymentConcurrency int
- machinePoolConcurrency int
- clusterResourceSetConcurrency int
- machineHealthCheckConcurrency int
- syncPeriod time.Duration
- restConfigQPS float32
- restConfigBurst int
- nodeDrainClientTimeout time.Duration
- webhookPort int
- webhookCertDir string
- healthAddr string
- tlsOptions = flags.TLSOptions{}
- logOptions = logs.NewOptions()
+ metricsBindAddr string
+ enableLeaderElection bool
+ leaderElectionLeaseDuration time.Duration
+ leaderElectionRenewDeadline time.Duration
+ leaderElectionRetryPeriod time.Duration
+ watchNamespace string
+ watchFilterValue string
+ profilerAddress string
+ enableContentionProfiling bool
+ clusterTopologyConcurrency int
+ clusterCacheTrackerConcurrency int
+ clusterClassConcurrency int
+ clusterConcurrency int
+ extensionConfigConcurrency int
+ machineConcurrency int
+ machineSetConcurrency int
+ machineDeploymentConcurrency int
+ machinePoolConcurrency int
+ clusterResourceSetConcurrency int
+ machineHealthCheckConcurrency int
+ syncPeriod time.Duration
+ restConfigQPS float32
+ restConfigBurst int
+ nodeDrainClientTimeout time.Duration
+ webhookPort int
+ webhookCertDir string
+ healthAddr string
+ tlsOptions = flags.TLSOptions{}
+ logOptions = logs.NewOptions()
)

func init() {
@@ -177,6 +178,9 @@ func InitFlags(fs *pflag.FlagSet) {
fs.IntVar(&clusterConcurrency, "cluster-concurrency", 10,
"Number of clusters to process simultaneously")

+ fs.IntVar(&clusterCacheTrackerConcurrency, "clustercachetracker-concurrency", 10,
+ "Number of clusters to process simultaneously")

fs.IntVar(&extensionConfigConcurrency, "extensionconfig-concurrency", 10,
"Number of extension configs to process simultaneously")

@@ -394,7 +398,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
- }).SetupWithManager(ctx, mgr, concurrency(clusterConcurrency)); err != nil {
+ }).SetupWithManager(ctx, mgr, concurrency(clusterCacheTrackerConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler")
os.Exit(1)
}
42 changes: 23 additions & 19 deletions test/infrastructure/docker/main.go
@@ -68,24 +68,25 @@ var (
controllerName = "cluster-api-docker-controller-manager"

// flags.
- metricsBindAddr string
- enableLeaderElection bool
- leaderElectionLeaseDuration time.Duration
- leaderElectionRenewDeadline time.Duration
- leaderElectionRetryPeriod time.Duration
- watchNamespace string
- watchFilterValue string
- profilerAddress string
- enableContentionProfiling bool
- concurrency int
- syncPeriod time.Duration
- restConfigQPS float32
- restConfigBurst int
- webhookPort int
- webhookCertDir string
- healthAddr string
- tlsOptions = flags.TLSOptions{}
- logOptions = logs.NewOptions()
+ metricsBindAddr string
+ enableLeaderElection bool
+ leaderElectionLeaseDuration time.Duration
+ leaderElectionRenewDeadline time.Duration
+ leaderElectionRetryPeriod time.Duration
+ watchNamespace string
+ watchFilterValue string
+ profilerAddress string
+ enableContentionProfiling bool
+ concurrency int
+ clusterCacheTrackerConcurrency int
+ syncPeriod time.Duration
+ restConfigQPS float32
+ restConfigBurst int
+ webhookPort int
+ webhookCertDir string
+ healthAddr string
+ tlsOptions = flags.TLSOptions{}
+ logOptions = logs.NewOptions()
)

func init() {
@@ -135,6 +136,9 @@ func initFlags(fs *pflag.FlagSet) {
fs.IntVar(&concurrency, "concurrency", 10,
"The number of docker machines to process simultaneously")

+ fs.IntVar(&clusterCacheTrackerConcurrency, "clustercachetracker-concurrency", 10,

Member: If this isn't set, could we actually default to concurrency? It makes some sense to keep these in sync.

sbueringer (Member, Aug 8, 2023): I think usually you wouldn't use the same number of workers. Usually you need more workers for a controller when it otherwise can't keep up with the load. This mainly depends on reconcile duration.
Looking at the Reconcile func of ClusterCacheReconciler, it will always return almost instantly. So even with >10k Clusters I don't think we need more than 10 workers. (With 1 ms reconcile duration, 10 workers can reconcile 10k clusters in 1s.)
In fact the only reconciler that I had to run with more than 10 workers with 2k clusters was KubeadmControlPlane (because it had reconcile durations of ~1-2 seconds). In that case I used 50 workers.
So while we might have to increase the concurrency of the Cluster controllers above 10 at some point, the ClusterCacheReconciler would still be fine with 10.

+ "Number of clusters to process simultaneously")

fs.DurationVar(&syncPeriod, "sync-period", 10*time.Minute,
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

@@ -316,7 +320,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
Tracker: tracker,
WatchFilterValue: watchFilterValue,
}).SetupWithManager(ctx, mgr, controller.Options{
- MaxConcurrentReconciles: concurrency,
+ MaxConcurrentReconciles: clusterCacheTrackerConcurrency,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler")
os.Exit(1)
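The worker-count reasoning in the review thread above is simple throughput arithmetic: time to drain a backlog is roughly per-reconcile duration × items / workers. A small illustrative sketch using the numbers quoted in the comment (not measurements from this PR):

package main

import (
	"fmt"
	"time"
)

// drainTime estimates how long `workers` parallel workers need to process
// `items` reconciles when each reconcile takes `perItem`: perItem * ceil(items/workers).
func drainTime(items, workers int, perItem time.Duration) time.Duration {
	return perItem * time.Duration((items+workers-1)/workers)
}

func main() {
	// ClusterCacheReconciler: ~1ms per reconcile, 10 workers, 10k clusters -> ~1s.
	fmt.Println(drainTime(10000, 10, time.Millisecond))
	// KubeadmControlPlane: ~2s per reconcile, 50 workers, 2k clusters -> ~80s.
	fmt.Println(drainTime(2000, 50, 2*time.Second))
}

Since the ClusterCacheReconciler returns almost instantly, its default of 10 workers keeps drain times negligible even at large cluster counts, which is why the reviewers consider a fixed default sufficient.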