Skip to content

Commit

Permalink
Adjust Discovery throttling (kptdev#3368)
Browse files Browse the repository at this point in the history
* Disable client-side throttling for resource discovery if server-side throttling is enabled
  • Loading branch information
jashandeep-sohi authored and chunglu-chou committed Aug 20, 2022
1 parent e3e4029 commit cb9d735
Showing 1 changed file with 34 additions and 23 deletions.
57 changes: 34 additions & 23 deletions commands/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import (
func newFactory(cmd *cobra.Command, version string) cluster.Factory {
flags := cmd.PersistentFlags()
kubeConfigFlags := genericclioptions.NewConfigFlags(true).
WithDeprecatedPasswordFlag().
WithWrapConfigFn(UpdateQPS)
WithDeprecatedPasswordFlag()
kubeConfigFlags.AddFlags(flags)
UpdateQPS(kubeConfigFlags)
userAgentKubeConfigFlags := &cfgflags.UserAgentKubeConfigFlags{
Delegate: kubeConfigFlags,
UserAgent: fmt.Sprintf("kpt/%s", version),
Expand All @@ -43,8 +43,8 @@ func newFactory(cmd *cobra.Command, version string) cluster.Factory {
return cluster.NewFactory(userAgentKubeConfigFlags)
}

// UpdateQPS modifies a rest.Config to update the client-side throttling QPS and
// Burst QPS.
// UpdateQPS modifies a genericclioptions.ConfigFlags to update the client-side
// throttling QPS and Burst QPS (including for discovery).
//
// If Flow Control is enabled on the apiserver, client-side throttling is
// disabled!
Expand All @@ -54,26 +54,37 @@ func newFactory(cmd *cobra.Command, version string) cluster.Factory {
//
// Flow Control is enabled by default on Kubernetes v1.20+.
// https://kubernetes.io/docs/concepts/cluster-administration/flow-control/
func UpdateQPS(config *rest.Config) *rest.Config {
// Timeout if the query takes too long, defaulting to the lower QPS limits.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
func UpdateQPS(flags *genericclioptions.ConfigFlags) {
flags.
WithWrapConfigFn(func(c *rest.Config) *rest.Config {
// Timeout if the query takes too long, defaulting to the lower QPS limits.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

enabled, err := flowcontrol.IsEnabled(ctx, config)
if err != nil {
klog.Warningf("Failed to query apiserver to check for flow control enablement: %v", err)
// Default to the lower QPS limits.
}
if enabled {
config.QPS = -1
config.Burst = -1
klog.V(1).Infof("Flow control enabled on apiserver: client-side throttling QPS set to %.0f (burst: %d)", config.QPS, config.Burst)
} else {
config.QPS = maxIfNotNegative(config.QPS, 30)
config.Burst = int(maxIfNotNegative(float32(config.Burst), 60))
klog.V(1).Infof("Flow control disabled on apiserver: client-side throttling QPS set to %.0f (burst: %d)", config.QPS, config.Burst)
}
return config
enabled, err := flowcontrol.IsEnabled(ctx, c)
if err != nil {
klog.Warning("Failed to query apiserver to check for flow control enablement: %v", err)
// Default to the lower QPS limits.
}

qps := float32(-1)
burst := -1
if enabled {
klog.V(1).Infof("Flow control enabled on apiserver: client-side throttling QPS set to %.0f (burst: %d)", qps, burst)
} else {
qps = maxIfNotNegative(c.QPS, 30)
burst = int(maxIfNotNegative(float32(c.Burst), 60))
klog.V(1).Infof("Flow control disabled on apiserver: client-side throttling QPS set to %.0f (burst: %d)", qps, burst)
}

c.QPS = qps
c.Burst = burst
flags.
WithDiscoveryQPS(qps).
WithDiscoveryBurst(burst)

return c
})
}

func maxIfNotNegative(a, b float32) float32 {
Expand Down

0 comments on commit cb9d735

Please sign in to comment.