Skip to content

Commit

Permalink
Merge pull request #10880 from sbueringer/pr-cct-qps-burst
Browse files Browse the repository at this point in the history
✨ Add QPS & burst options & flags for ClusterCacheTracker
  • Loading branch information
k8s-ci-robot authored and jimmidyson committed Jul 17, 2024
1 parent 193d481 commit 85a7e57
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 90 deletions.
52 changes: 31 additions & 21 deletions bootstrap/kubeadm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,27 @@ var (
controllerName = "cluster-api-kubeadm-bootstrap-manager"

// flags.
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
clusterCacheTrackerClientQPS float32
clusterCacheTrackerClientBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
// CABPK specific flags.
clusterConcurrency int
clusterCacheTrackerConcurrency int
Expand Down Expand Up @@ -139,10 +141,16 @@ func InitFlags(fs *pflag.FlagSet) {
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
"Maximum queries per second from the controller client to the Kubernetes API server.")

fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")

fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20,
"Maximum queries per second from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")

fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30,
"Maximum number of queries that should be allowed in one burst from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")

fs.DurationVar(&tokenTTL, "bootstrap-token-ttl", kubeadmbootstrapcontrollers.DefaultTokenTTL,
"The amount of time the bootstrap token will be valid")
Expand Down Expand Up @@ -312,6 +320,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
SecretCachingClient: secretCachingClient,
ControllerName: controllerName,
Log: &ctrl.Log,
ClientQPS: clusterCacheTrackerClientQPS,
ClientBurst: clusterCacheTrackerClientBurst,
},
)
if err != nil {
Expand Down
29 changes: 27 additions & 2 deletions controllers/remote/cluster_cache_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ var ErrClusterLocked = errors.New("cluster is locked already")

// ClusterCacheTracker manages client caches for workload clusters.
type ClusterCacheTracker struct {
log logr.Logger
log logr.Logger

clientUncachedObjects []client.Object
clientQPS float32
clientBurst int

client client.Client

Expand Down Expand Up @@ -116,7 +119,18 @@ type ClusterCacheTrackerOptions struct {
// it'll instead query the API server directly.
// Defaults to never caching ConfigMap and Secret if not set.
ClientUncachedObjects []client.Object
Indexes []Index

// ClientQPS is the maximum queries per second from the controller client
// to the Kubernetes API server of workload clusters.
// Defaults to 20.
ClientQPS float32

// ClientBurst is the maximum number of queries that should be allowed in
// one burst from the controller client to the Kubernetes API server of workload clusters.
// Default 30.
ClientBurst int

Indexes []Index

// ControllerName is the name of the controller.
// This is used to calculate the user agent string.
Expand All @@ -139,6 +153,13 @@ func setDefaultOptions(opts *ClusterCacheTrackerOptions) {
&corev1.Secret{},
}
}

if opts.ClientQPS == 0 {
opts.ClientQPS = 20
}
if opts.ClientBurst == 0 {
opts.ClientBurst = 30
}
}

// NewClusterCacheTracker creates a new ClusterCacheTracker.
Expand Down Expand Up @@ -170,6 +191,8 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt
controllerPodMetadata: controllerPodMetadata,
log: *options.Log,
clientUncachedObjects: options.ClientUncachedObjects,
clientQPS: options.ClientQPS,
clientBurst: options.ClientBurst,
client: manager.GetClient(),
secretCachingClient: options.SecretCachingClient,
scheme: manager.GetScheme(),
Expand Down Expand Up @@ -303,6 +326,8 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
if err != nil {
return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String())
}
config.QPS = t.clientQPS
config.Burst = t.clientBurst

// Create a http client and a mapper for the cluster.
httpClient, mapper, err := t.createHTTPClientAndMapper(config, cluster)
Expand Down
52 changes: 31 additions & 21 deletions controlplane/kubeadm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,27 @@ var (
controllerName = "cluster-api-kubeadm-control-plane-manager"

// flags.
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
clusterCacheTrackerClientQPS float32
clusterCacheTrackerClientBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
// KCP specific flags.
kubeadmControlPlaneConcurrency int
clusterCacheTrackerConcurrency int
Expand Down Expand Up @@ -142,10 +144,16 @@ func InitFlags(fs *pflag.FlagSet) {
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
"Maximum queries per second from the controller client to the Kubernetes API server.")

fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")

fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20,
"Maximum queries per second from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")

fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30,
"Maximum number of queries that should be allowed in one burst from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")

fs.IntVar(&webhookPort, "webhook-port", 9443,
"Webhook Server port")
Expand Down Expand Up @@ -332,6 +340,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
&appsv1.Deployment{},
&appsv1.DaemonSet{},
},
ClientQPS: clusterCacheTrackerClientQPS,
ClientBurst: clusterCacheTrackerClientBurst,
})
if err != nil {
setupLog.Error(err, "unable to create cluster cache tracker")
Expand Down
52 changes: 31 additions & 21 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,25 +83,27 @@ var (
controllerName = "cluster-api-controller-manager"

// flags.
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
clusterCacheTrackerClientQPS float32
clusterCacheTrackerClientBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
// core Cluster API specific flags.
clusterTopologyConcurrency int
clusterCacheTrackerConcurrency int
Expand Down Expand Up @@ -207,10 +209,16 @@ func InitFlags(fs *pflag.FlagSet) {
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
"Maximum queries per second from the controller client to the Kubernetes API server.")

fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")

fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20,
"Maximum queries per second from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")

fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30,
"Maximum number of queries that should be allowed in one burst from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")

fs.DurationVar(&nodeDrainClientTimeout, "node-drain-client-timeout-duration", time.Second*10,
"The timeout of the client used for draining nodes. Defaults to 10s")
Expand Down Expand Up @@ -408,6 +416,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) webhooks.ClusterCac
ControllerName: controllerName,
Log: &ctrl.Log,
Indexes: []remote.Index{remote.NodeProviderIDIndex},
ClientQPS: clusterCacheTrackerClientQPS,
ClientBurst: clusterCacheTrackerClientBurst,
},
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions test/extension/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ func InitFlags(fs *pflag.FlagSet) {
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
"Maximum queries per second from the controller client to the Kubernetes API server.")

fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")

fs.IntVar(&webhookPort, "webhook-port", 9443,
"Webhook Server port")
Expand Down
52 changes: 31 additions & 21 deletions test/infrastructure/docker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,25 +69,27 @@ var (
controllerName = "cluster-api-docker-controller-manager"

// flags.
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
clusterCacheTrackerClientQPS float32
clusterCacheTrackerClientBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
// CAPD specific flags.
concurrency int
clusterCacheTrackerConcurrency int
Expand Down Expand Up @@ -143,10 +145,16 @@ func InitFlags(fs *pflag.FlagSet) {
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
"Maximum queries per second from the controller client to the Kubernetes API server.")

fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")

fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20,
"Maximum queries per second from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")

fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30,
"Maximum number of queries that should be allowed in one burst from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")

fs.IntVar(&webhookPort, "webhook-port", 9443,
"Webhook Server port")
Expand Down Expand Up @@ -323,6 +331,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
SecretCachingClient: secretCachingClient,
ControllerName: controllerName,
Log: &ctrl.Log,
ClientQPS: clusterCacheTrackerClientQPS,
ClientBurst: clusterCacheTrackerClientBurst,
},
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions test/infrastructure/inmemory/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ func InitFlags(fs *pflag.FlagSet) {
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
"Maximum queries per second from the controller client to the Kubernetes API server.")

fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")

fs.IntVar(&webhookPort, "webhook-port", 9443,
"Webhook Server port")
Expand Down

0 comments on commit 85a7e57

Please sign in to comment.