Skip to content

Commit

Permalink
🌱 increase clusterctl client throttle burst
Browse files Browse the repository at this point in the history
  • Loading branch information
nojnhuh committed May 2, 2023
1 parent 197e7f9 commit 8e9ce8e
Show file tree
Hide file tree
Showing 28 changed files with 359 additions and 36 deletions.
3 changes: 3 additions & 0 deletions cmd/clusterctl/client/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type CertManagerUpgradePlan cluster.CertManagerUpgradePlan
// Kubeconfig is a type that specifies inputs related to the actual kubeconfig.
type Kubeconfig cluster.Kubeconfig

// RESTThrottle is a type that specifies inputs related to the throttle on the rest.Config.
type RESTThrottle cluster.RESTThrottle

// Processor defines the methods necessary for creating a specific yaml
// processor.
type Processor yaml.Processor
18 changes: 16 additions & 2 deletions cmd/clusterctl/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ type RepositoryClientFactory func(RepositoryClientFactoryInput) (repository.Clie

// ClusterClientFactoryInput represents the inputs required by the factory.
type ClusterClientFactoryInput struct {
Kubeconfig Kubeconfig
Processor Processor
Kubeconfig Kubeconfig
Processor Processor
RESTThrottle RESTThrottle
}

// ClusterClientFactory is a factory of cluster.Client from a given input.
Expand Down Expand Up @@ -207,11 +208,24 @@ func defaultRepositoryFactory(configClient config.Client) RepositoryClientFactor
// defaultClusterFactory is a ClusterClientFactory func the uses the default client provided by the cluster low level library.
func defaultClusterFactory(configClient config.Client) ClusterClientFactory {
return func(input ClusterClientFactoryInput) (cluster.Client, error) {
var proxyOpts []cluster.ProxyOption
if input.RESTThrottle.QPS > 0 {
proxyOpts = append(proxyOpts, cluster.InjectRESTConfigQPS(input.RESTThrottle.QPS))
}
if input.RESTThrottle.Burst > 0 {
proxyOpts = append(proxyOpts, cluster.InjectRESTConfigBurst(input.RESTThrottle.Burst))
}
return cluster.New(
// Kubeconfig is a type alias to cluster.Kubeconfig
cluster.Kubeconfig(input.Kubeconfig),
configClient,
cluster.InjectYamlProcessor(input.Processor),
cluster.InjectProxy(
cluster.NewProxy(
cluster.Kubeconfig(input.Kubeconfig),
proxyOpts...,
),
),
), nil
}
}
2 changes: 1 addition & 1 deletion cmd/clusterctl/client/cluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func newClusterClient(kubeconfig Kubeconfig, configClient config.Client, options

// if there is an injected proxy, use it, otherwise use a default one
if client.proxy == nil {
client.proxy = newProxy(client.kubeconfig)
client.proxy = NewProxy(client.kubeconfig)
}

// if there is an injected repositoryClientFactory, use it, otherwise use the default one
Expand Down
2 changes: 1 addition & 1 deletion cmd/clusterctl/client/cluster/ownergraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type OwnerGraphNode struct {
// own owner references; there is no guarantee about the stability of this API. Using this test with providers may require
// a custom implementation of this function, or the OwnerGraph it returns.
func GetOwnerGraph(namespace, kubeconfigPath string) (OwnerGraph, error) {
p := newProxy(Kubeconfig{Path: kubeconfigPath, Context: ""})
p := NewProxy(Kubeconfig{Path: kubeconfigPath, Context: ""})
invClient := newInventoryClient(p, nil)

graph := newObjectGraph(p, invClient)
Expand Down
41 changes: 37 additions & 4 deletions cmd/clusterctl/client/cluster/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ import (
"sigs.k8s.io/cluster-api/version"
)

const (
// DefaultRESTConfigQPS is the default maximum queries per second for the Kubernetes client.
DefaultRESTConfigQPS float32 = 20
// DefaultRESTConfigBurst is the default maximum request burst for the Kubernetes client.
DefaultRESTConfigBurst = 300
)

var (
localScheme = scheme.Scheme
)
Expand Down Expand Up @@ -77,10 +84,17 @@ type Proxy interface {
GetResourceNames(groupVersion, kind string, options []client.ListOption, prefix string) ([]string, error)
}

// RESTThrottle is a type that specifies inputs related to the throttle on the rest.Config.
type RESTThrottle struct {
QPS float32
Burst int
}

type proxy struct {
kubeconfig Kubeconfig
timeout time.Duration
configLoadingRules *clientcmd.ClientConfigLoadingRules
throttle RESTThrottle
}

var _ Proxy = &proxy{}
Expand Down Expand Up @@ -148,9 +162,8 @@ func (k *proxy) GetConfig() (*rest.Config, error) {
}
restConfig.UserAgent = fmt.Sprintf("clusterctl/%s (%s)", version.Get().GitVersion, version.Get().Platform)

// Set QPS and Burst to a threshold that ensures the controller runtime client/client go doesn't generate throttling log messages
restConfig.QPS = 20
restConfig.Burst = 100
restConfig.QPS = k.throttle.QPS
restConfig.Burst = k.throttle.Burst

return restConfig, nil
}
Expand Down Expand Up @@ -373,7 +386,22 @@ func InjectKubeconfigPaths(paths []string) ProxyOption {
}
}

func newProxy(kubeconfig Kubeconfig, opts ...ProxyOption) Proxy {
// InjectRESTConfigQPS sets the QPS for the REST config.
func InjectRESTConfigQPS(qps float32) ProxyOption {
return func(p *proxy) {
p.throttle.QPS = qps
}
}

// InjectRESTConfigBurst sets the burst for the REST config.
func InjectRESTConfigBurst(burst int) ProxyOption {
return func(p *proxy) {
p.throttle.Burst = burst
}
}

// NewProxy creates a new proxy.
func NewProxy(kubeconfig Kubeconfig, opts ...ProxyOption) Proxy {
// If a kubeconfig file isn't provided, find one in the standard locations.
rules := clientcmd.NewDefaultClientConfigLoadingRules()
if kubeconfig.Path != "" {
Expand All @@ -383,6 +411,11 @@ func newProxy(kubeconfig Kubeconfig, opts ...ProxyOption) Proxy {
kubeconfig: kubeconfig,
timeout: 30 * time.Second,
configLoadingRules: rules,
// Set QPS and Burst to a threshold that ensures the controller runtime client/client go doesn't get throttled. This is especially important in environments with many CustomResourceDefinitions.
throttle: RESTThrottle{
QPS: DefaultRESTConfigQPS,
Burst: DefaultRESTConfigBurst,
},
}

for _, o := range opts {
Expand Down
31 changes: 25 additions & 6 deletions cmd/clusterctl/client/cluster/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestProxyGetConfig(t *testing.T) {
configFile := filepath.Join(dir, ".test-kubeconfig.yaml")
g.Expect(os.WriteFile(configFile, []byte(tt.kubeconfigContents), 0600)).To(Succeed())

proxy := newProxy(Kubeconfig{Path: configFile, Context: tt.context})
proxy := NewProxy(Kubeconfig{Path: configFile, Context: tt.context})
conf, err := proxy.GetConfig()
if tt.expectErr {
g.Expect(err).To(HaveOccurred())
Expand All @@ -82,7 +82,7 @@ func TestProxyGetConfig(t *testing.T) {
g.Expect(conf.Host).To(Equal(tt.expectedHost))
g.Expect(conf.UserAgent).To(Equal(fmt.Sprintf("clusterctl/%s (%s)", version.Get().GitVersion, version.Get().Platform)))
g.Expect(conf.QPS).To(BeEquivalentTo(20))
g.Expect(conf.Burst).To(BeEquivalentTo(100))
g.Expect(conf.Burst).To(BeEquivalentTo(300))
g.Expect(conf.Timeout.String()).To(Equal("30s"))
})
}
Expand All @@ -96,11 +96,30 @@ func TestProxyGetConfig(t *testing.T) {
configFile := filepath.Join(dir, ".test-kubeconfig.yaml")
g.Expect(os.WriteFile(configFile, []byte(kubeconfig("management", "default")), 0600)).To(Succeed())

proxy := newProxy(Kubeconfig{Path: configFile, Context: "management"}, InjectProxyTimeout(23*time.Second))
proxy := NewProxy(Kubeconfig{Path: configFile, Context: "management"}, InjectProxyTimeout(23*time.Second))
conf, err := proxy.GetConfig()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(conf.Timeout.String()).To(Equal("23s"))
})

t.Run("configure throttle", func(t *testing.T) {
g := NewWithT(t)
dir, err := os.MkdirTemp("", "clusterctl")
g.Expect(err).NotTo(HaveOccurred())
defer os.RemoveAll(dir)
configFile := filepath.Join(dir, ".test-kubeconfig.yaml")
g.Expect(os.WriteFile(configFile, []byte(kubeconfig("management", "default")), 0600)).To(Succeed())

proxy := NewProxy(
Kubeconfig{Path: configFile, Context: "management"},
InjectRESTConfigQPS(30),
InjectRESTConfigBurst(400),
)
conf, err := proxy.GetConfig()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(conf.QPS).To(Equal(float32(30)))
g.Expect(conf.Burst).To(Equal(400))
})
}

// These tests are emulating the files passed in via KUBECONFIG env var by
Expand All @@ -123,7 +142,7 @@ func TestKUBECONFIGEnvVar(t *testing.T) {
configFile := filepath.Join(dir, ".test-kubeconfig.yaml")
g.Expect(os.WriteFile(configFile, []byte(kubeconfigContents), 0600)).To(Succeed())

proxy := newProxy(
proxy := NewProxy(
// dont't give an explicit path but rather define the file in the
// configLoadingRules precedence chain.
Kubeconfig{Path: "", Context: context},
Expand Down Expand Up @@ -151,7 +170,7 @@ func TestKUBECONFIGEnvVar(t *testing.T) {
configFile := filepath.Join(dir, ".test-kubeconfig.yaml")
g.Expect(os.WriteFile(configFile, []byte(kubeconfigContents), 0600)).To(Succeed())

proxy := newProxy(
proxy := NewProxy(
// dont't give an explicit path but rather define the file in the
// configLoadingRules precedence chain.
Kubeconfig{Path: "", Context: context},
Expand Down Expand Up @@ -229,7 +248,7 @@ func TestProxyCurrentNamespace(t *testing.T) {
g.Expect(os.WriteFile(configFile, []byte(tt.kubeconfigContents), 0600)).To(Succeed())
}

proxy := newProxy(Kubeconfig{Path: configFile, Context: tt.kubeconfigContext})
proxy := NewProxy(Kubeconfig{Path: configFile, Context: tt.kubeconfigContext})
ns, err := proxy.CurrentNamespace()
if tt.expectErr {
g.Expect(err).To(HaveOccurred())
Expand Down
9 changes: 8 additions & 1 deletion cmd/clusterctl/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ type GetClusterTemplateOptions struct {
// YamlProcessor defines the yaml processor to use for the cluster
// template processing. If not defined, SimpleProcessor will be used.
YamlProcessor Processor

// RESTThrottle defines parameters for the rest.Config's throttle.
RESTThrottle RESTThrottle
}

// numSources return the number of template sources currently set on a GetClusterTemplateOptions.
Expand Down Expand Up @@ -217,7 +220,11 @@ func (c *clusterctlClient) GetClusterTemplate(options GetClusterTemplateOptions)
}

// Gets the client for the current management cluster
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{options.Kubeconfig, options.YamlProcessor})
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{
Kubeconfig: options.Kubeconfig,
Processor: options.YamlProcessor,
RESTThrottle: options.RESTThrottle,
})
if err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion cmd/clusterctl/client/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,16 @@ type DeleteOptions struct {

// SkipInventory forces the deletion of the inventory items used by clusterctl to track providers.
SkipInventory bool

// RESTThrottle defines parameters for the rest.Config's throttle.
RESTThrottle RESTThrottle
}

func (c *clusterctlClient) Delete(options DeleteOptions) error {
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{Kubeconfig: options.Kubeconfig})
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{
Kubeconfig: options.Kubeconfig,
RESTThrottle: options.RESTThrottle,
})
if err != nil {
return err
}
Expand Down
8 changes: 7 additions & 1 deletion cmd/clusterctl/client/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,18 @@ type DescribeClusterOptions struct {
// Grouping groups machines objects in case the ready conditions
// have the same Status, Severity and Reason.
Grouping bool

// RESTThrottle defines parameters for the rest.Config's throttle.
RESTThrottle RESTThrottle
}

// DescribeCluster returns the object tree representing the status of a Cluster API cluster.
func (c *clusterctlClient) DescribeCluster(options DescribeClusterOptions) (*tree.ObjectTree, error) {
// gets access to the management cluster
cluster, err := c.clusterClientFactory(ClusterClientFactoryInput{Kubeconfig: options.Kubeconfig})
cluster, err := c.clusterClientFactory(ClusterClientFactoryInput{
Kubeconfig: options.Kubeconfig,
RESTThrottle: options.RESTThrottle,
})
if err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion cmd/clusterctl/client/get_kubeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,17 @@ type GetKubeconfigOptions struct {

// WorkloadClusterName is the name of the workload cluster.
WorkloadClusterName string

// RESTThrottle defines parameters for the rest.Config's throttle.
RESTThrottle RESTThrottle
}

func (c *clusterctlClient) GetKubeconfig(options GetKubeconfigOptions) (string, error) {
// gets access to the management cluster
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{Kubeconfig: options.Kubeconfig})
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{
Kubeconfig: options.Kubeconfig,
RESTThrottle: options.RESTThrottle,
})
if err != nil {
return "", err
}
Expand Down
13 changes: 11 additions & 2 deletions cmd/clusterctl/client/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,20 @@ type InitOptions struct {
// allowMissingProviderCRD is used to allow for a missing provider CRD when listing images.
// It is set to false to enforce that provider CRD is available when performing the standard init operation.
allowMissingProviderCRD bool

// RESTThrottle defines parameters for the rest.Config's throttle.
RESTThrottle RESTThrottle
}

// Init initializes a management cluster by adding the requested list of providers.
func (c *clusterctlClient) Init(options InitOptions) ([]Components, error) {
log := logf.Log

// gets access to the management cluster
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{Kubeconfig: options.Kubeconfig})
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{
Kubeconfig: options.Kubeconfig,
RESTThrottle: options.RESTThrottle,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -167,7 +173,10 @@ func (c *clusterctlClient) Init(options InitOptions) ([]Components, error) {
// InitImages returns the list of images required for init.
func (c *clusterctlClient) InitImages(options InitOptions) ([]string, error) {
// gets access to the management cluster
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{Kubeconfig: options.Kubeconfig})
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{
Kubeconfig: options.Kubeconfig,
RESTThrottle: options.RESTThrottle,
})
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 8e9ce8e

Please sign in to comment.