Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 make clusterctl REST client throttling configurable #8411

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/clusterctl/client/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type CertManagerUpgradePlan cluster.CertManagerUpgradePlan
// Kubeconfig is a type that specifies inputs related to the actual kubeconfig.
type Kubeconfig cluster.Kubeconfig

// RESTThrottle is a type that specifies inputs related to the throttle on the rest.Config.
type RESTThrottle cluster.RESTThrottle

// Processor defines the methods necessary for creating a specific yaml
// processor.
type Processor yaml.Processor
18 changes: 16 additions & 2 deletions cmd/clusterctl/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ type RepositoryClientFactory func(RepositoryClientFactoryInput) (repository.Clie

// ClusterClientFactoryInput represents the inputs required by the factory.
type ClusterClientFactoryInput struct {
Kubeconfig Kubeconfig
Processor Processor
Kubeconfig Kubeconfig
Processor Processor
RESTThrottle RESTThrottle
}

// ClusterClientFactory is a factory of cluster.Client from a given input.
Expand Down Expand Up @@ -207,11 +208,24 @@ func defaultRepositoryFactory(configClient config.Client) RepositoryClientFactor
// defaultClusterFactory is a ClusterClientFactory func the uses the default client provided by the cluster low level library.
func defaultClusterFactory(configClient config.Client) ClusterClientFactory {
return func(input ClusterClientFactoryInput) (cluster.Client, error) {
var proxyOpts []cluster.ProxyOption
if input.RESTThrottle.QPS > 0 {
proxyOpts = append(proxyOpts, cluster.InjectRESTConfigQPS(input.RESTThrottle.QPS))
}
if input.RESTThrottle.Burst > 0 {
proxyOpts = append(proxyOpts, cluster.InjectRESTConfigBurst(input.RESTThrottle.Burst))
}
return cluster.New(
// Kubeconfig is a type alias to cluster.Kubeconfig
cluster.Kubeconfig(input.Kubeconfig),
configClient,
cluster.InjectYamlProcessor(input.Processor),
cluster.InjectProxy(
cluster.NewProxy(
cluster.Kubeconfig(input.Kubeconfig),
proxyOpts...,
),
),
), nil
}
}
2 changes: 1 addition & 1 deletion cmd/clusterctl/client/cluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func newClusterClient(kubeconfig Kubeconfig, configClient config.Client, options

// if there is an injected proxy, use it, otherwise use a default one
if client.proxy == nil {
client.proxy = newProxy(client.kubeconfig)
client.proxy = NewProxy(client.kubeconfig)
}

// if there is an injected repositoryClientFactory, use it, otherwise use the default one
Expand Down
2 changes: 1 addition & 1 deletion cmd/clusterctl/client/cluster/ownergraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type OwnerGraphNode struct {
// own owner references; there is no guarantee about the stability of this API. Using this test with providers may require
// a custom implementation of this function, or the OwnerGraph it returns.
func GetOwnerGraph(namespace, kubeconfigPath string) (OwnerGraph, error) {
p := newProxy(Kubeconfig{Path: kubeconfigPath, Context: ""})
p := NewProxy(Kubeconfig{Path: kubeconfigPath, Context: ""})
invClient := newInventoryClient(p, nil)

graph := newObjectGraph(p, invClient)
Expand Down
41 changes: 37 additions & 4 deletions cmd/clusterctl/client/cluster/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ import (
"sigs.k8s.io/cluster-api/version"
)

const (
// DefaultRESTConfigQPS is the default maximum queries per second for the Kubernetes client.
DefaultRESTConfigQPS float32 = 20
// DefaultRESTConfigBurst is the default maximum request burst for the Kubernetes client.
DefaultRESTConfigBurst = 300
)

var (
localScheme = scheme.Scheme
)
Expand Down Expand Up @@ -77,10 +84,17 @@ type Proxy interface {
GetResourceNames(groupVersion, kind string, options []client.ListOption, prefix string) ([]string, error)
}

// RESTThrottle is a type that specifies inputs related to the throttle on the rest.Config.
type RESTThrottle struct {
QPS float32
Burst int
}

type proxy struct {
kubeconfig Kubeconfig
timeout time.Duration
configLoadingRules *clientcmd.ClientConfigLoadingRules
throttle RESTThrottle
}

var _ Proxy = &proxy{}
Expand Down Expand Up @@ -148,9 +162,8 @@ func (k *proxy) GetConfig() (*rest.Config, error) {
}
restConfig.UserAgent = fmt.Sprintf("clusterctl/%s (%s)", version.Get().GitVersion, version.Get().Platform)

// Set QPS and Burst to a threshold that ensures the controller runtime client/client go doesn't generate throttling log messages
restConfig.QPS = 20
restConfig.Burst = 100
restConfig.QPS = k.throttle.QPS
restConfig.Burst = k.throttle.Burst

return restConfig, nil
}
Expand Down Expand Up @@ -373,7 +386,22 @@ func InjectKubeconfigPaths(paths []string) ProxyOption {
}
}

func newProxy(kubeconfig Kubeconfig, opts ...ProxyOption) Proxy {
// InjectRESTConfigQPS sets the QPS for the REST config.
func InjectRESTConfigQPS(qps float32) ProxyOption {
return func(p *proxy) {
p.throttle.QPS = qps
}
}

// InjectRESTConfigBurst sets the burst for the REST config.
func InjectRESTConfigBurst(burst int) ProxyOption {
return func(p *proxy) {
p.throttle.Burst = burst
}
}

// NewProxy creates a new proxy.
func NewProxy(kubeconfig Kubeconfig, opts ...ProxyOption) Proxy {
// If a kubeconfig file isn't provided, find one in the standard locations.
rules := clientcmd.NewDefaultClientConfigLoadingRules()
if kubeconfig.Path != "" {
Expand All @@ -383,6 +411,11 @@ func newProxy(kubeconfig Kubeconfig, opts ...ProxyOption) Proxy {
kubeconfig: kubeconfig,
timeout: 30 * time.Second,
configLoadingRules: rules,
// Set QPS and Burst to a threshold that ensures the controller runtime client/client go doesn't get throttled. This is especially important in environments with many CustomResourceDefinitions.
throttle: RESTThrottle{
QPS: DefaultRESTConfigQPS,
Burst: DefaultRESTConfigBurst,
},
}

for _, o := range opts {
Expand Down
31 changes: 25 additions & 6 deletions cmd/clusterctl/client/cluster/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestProxyGetConfig(t *testing.T) {
configFile := filepath.Join(dir, ".test-kubeconfig.yaml")
g.Expect(os.WriteFile(configFile, []byte(tt.kubeconfigContents), 0600)).To(Succeed())

proxy := newProxy(Kubeconfig{Path: configFile, Context: tt.context})
proxy := NewProxy(Kubeconfig{Path: configFile, Context: tt.context})
conf, err := proxy.GetConfig()
if tt.expectErr {
g.Expect(err).To(HaveOccurred())
Expand All @@ -82,7 +82,7 @@ func TestProxyGetConfig(t *testing.T) {
g.Expect(conf.Host).To(Equal(tt.expectedHost))
g.Expect(conf.UserAgent).To(Equal(fmt.Sprintf("clusterctl/%s (%s)", version.Get().GitVersion, version.Get().Platform)))
g.Expect(conf.QPS).To(BeEquivalentTo(20))
g.Expect(conf.Burst).To(BeEquivalentTo(100))
g.Expect(conf.Burst).To(BeEquivalentTo(300))
g.Expect(conf.Timeout.String()).To(Equal("30s"))
})
}
Expand All @@ -96,11 +96,30 @@ func TestProxyGetConfig(t *testing.T) {
configFile := filepath.Join(dir, ".test-kubeconfig.yaml")
g.Expect(os.WriteFile(configFile, []byte(kubeconfig("management", "default")), 0600)).To(Succeed())

proxy := newProxy(Kubeconfig{Path: configFile, Context: "management"}, InjectProxyTimeout(23*time.Second))
proxy := NewProxy(Kubeconfig{Path: configFile, Context: "management"}, InjectProxyTimeout(23*time.Second))
conf, err := proxy.GetConfig()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(conf.Timeout.String()).To(Equal("23s"))
})

t.Run("configure throttle", func(t *testing.T) {
g := NewWithT(t)
dir, err := os.MkdirTemp("", "clusterctl")
g.Expect(err).NotTo(HaveOccurred())
defer os.RemoveAll(dir)
configFile := filepath.Join(dir, ".test-kubeconfig.yaml")
g.Expect(os.WriteFile(configFile, []byte(kubeconfig("management", "default")), 0600)).To(Succeed())

proxy := NewProxy(
Kubeconfig{Path: configFile, Context: "management"},
InjectRESTConfigQPS(30),
InjectRESTConfigBurst(400),
)
conf, err := proxy.GetConfig()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(conf.QPS).To(Equal(float32(30)))
g.Expect(conf.Burst).To(Equal(400))
})
}

// These tests are emulating the files passed in via KUBECONFIG env var by
Expand All @@ -123,7 +142,7 @@ func TestKUBECONFIGEnvVar(t *testing.T) {
configFile := filepath.Join(dir, ".test-kubeconfig.yaml")
g.Expect(os.WriteFile(configFile, []byte(kubeconfigContents), 0600)).To(Succeed())

proxy := newProxy(
proxy := NewProxy(
// dont't give an explicit path but rather define the file in the
// configLoadingRules precedence chain.
Kubeconfig{Path: "", Context: context},
Expand Down Expand Up @@ -151,7 +170,7 @@ func TestKUBECONFIGEnvVar(t *testing.T) {
configFile := filepath.Join(dir, ".test-kubeconfig.yaml")
g.Expect(os.WriteFile(configFile, []byte(kubeconfigContents), 0600)).To(Succeed())

proxy := newProxy(
proxy := NewProxy(
// dont't give an explicit path but rather define the file in the
// configLoadingRules precedence chain.
Kubeconfig{Path: "", Context: context},
Expand Down Expand Up @@ -229,7 +248,7 @@ func TestProxyCurrentNamespace(t *testing.T) {
g.Expect(os.WriteFile(configFile, []byte(tt.kubeconfigContents), 0600)).To(Succeed())
}

proxy := newProxy(Kubeconfig{Path: configFile, Context: tt.kubeconfigContext})
proxy := NewProxy(Kubeconfig{Path: configFile, Context: tt.kubeconfigContext})
ns, err := proxy.CurrentNamespace()
if tt.expectErr {
g.Expect(err).To(HaveOccurred())
Expand Down
9 changes: 8 additions & 1 deletion cmd/clusterctl/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ type GetClusterTemplateOptions struct {
// YamlProcessor defines the yaml processor to use for the cluster
// template processing. If not defined, SimpleProcessor will be used.
YamlProcessor Processor

// RESTThrottle defines parameters for the rest.Config's throttle.
RESTThrottle RESTThrottle
}

// numSources return the number of template sources currently set on a GetClusterTemplateOptions.
Expand Down Expand Up @@ -217,7 +220,11 @@ func (c *clusterctlClient) GetClusterTemplate(options GetClusterTemplateOptions)
}

// Gets the client for the current management cluster
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{options.Kubeconfig, options.YamlProcessor})
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{
Kubeconfig: options.Kubeconfig,
Processor: options.YamlProcessor,
RESTThrottle: options.RESTThrottle,
})
if err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion cmd/clusterctl/client/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,16 @@ type DeleteOptions struct {

// SkipInventory forces the deletion of the inventory items used by clusterctl to track providers.
SkipInventory bool

// RESTThrottle defines parameters for the rest.Config's throttle.
RESTThrottle RESTThrottle
}

func (c *clusterctlClient) Delete(options DeleteOptions) error {
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{Kubeconfig: options.Kubeconfig})
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{
Kubeconfig: options.Kubeconfig,
RESTThrottle: options.RESTThrottle,
})
if err != nil {
return err
}
Expand Down
8 changes: 7 additions & 1 deletion cmd/clusterctl/client/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,18 @@ type DescribeClusterOptions struct {
// Grouping groups machines objects in case the ready conditions
// have the same Status, Severity and Reason.
Grouping bool

// RESTThrottle defines parameters for the rest.Config's throttle.
RESTThrottle RESTThrottle
}

// DescribeCluster returns the object tree representing the status of a Cluster API cluster.
func (c *clusterctlClient) DescribeCluster(options DescribeClusterOptions) (*tree.ObjectTree, error) {
// gets access to the management cluster
cluster, err := c.clusterClientFactory(ClusterClientFactoryInput{Kubeconfig: options.Kubeconfig})
cluster, err := c.clusterClientFactory(ClusterClientFactoryInput{
Kubeconfig: options.Kubeconfig,
RESTThrottle: options.RESTThrottle,
})
if err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion cmd/clusterctl/client/get_kubeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,17 @@ type GetKubeconfigOptions struct {

// WorkloadClusterName is the name of the workload cluster.
WorkloadClusterName string

// RESTThrottle defines parameters for the rest.Config's throttle.
RESTThrottle RESTThrottle
}

func (c *clusterctlClient) GetKubeconfig(options GetKubeconfigOptions) (string, error) {
// gets access to the management cluster
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{Kubeconfig: options.Kubeconfig})
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{
Kubeconfig: options.Kubeconfig,
RESTThrottle: options.RESTThrottle,
})
if err != nil {
return "", err
}
Expand Down
13 changes: 11 additions & 2 deletions cmd/clusterctl/client/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,20 @@ type InitOptions struct {
// allowMissingProviderCRD is used to allow for a missing provider CRD when listing images.
// It is set to false to enforce that provider CRD is available when performing the standard init operation.
allowMissingProviderCRD bool

// RESTThrottle defines parameters for the rest.Config's throttle.
RESTThrottle RESTThrottle
}

// Init initializes a management cluster by adding the requested list of providers.
func (c *clusterctlClient) Init(options InitOptions) ([]Components, error) {
log := logf.Log

// gets access to the management cluster
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{Kubeconfig: options.Kubeconfig})
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{
Kubeconfig: options.Kubeconfig,
RESTThrottle: options.RESTThrottle,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -167,7 +173,10 @@ func (c *clusterctlClient) Init(options InitOptions) ([]Components, error) {
// InitImages returns the list of images required for init.
func (c *clusterctlClient) InitImages(options InitOptions) ([]string, error) {
// gets access to the management cluster
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{Kubeconfig: options.Kubeconfig})
clusterClient, err := c.clusterClientFactory(ClusterClientFactoryInput{
Kubeconfig: options.Kubeconfig,
RESTThrottle: options.RESTThrottle,
})
if err != nil {
return nil, err
}
Expand Down
Loading