Skip to content

Commit

Permalink
Improve access to member-cluster resources that use pb/json protocol …
Browse files Browse the repository at this point in the history
…and allow users to set qps, burst values.

Signed-off-by: wangxiaofei67 <wangxiaofei67@jd.com>
  • Loading branch information
wangxf1987 committed May 13, 2024
1 parent 3232c52 commit 7541be5
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 9 deletions.
2 changes: 2 additions & 0 deletions cmd/karmada-search/app/karmada-search.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ func config(o *options.Options, outOfTreeRegistryOptions ...Option) (*search.Con
KarmadaFactory: factory,
MinRequestTimeout: time.Second * time.Duration(serverConfig.Config.MinRequestTimeout),
OutOfTreeRegistry: outOfTreeRegistry,
MemberClientQPS: o.MemberClusterKubeAPIQPS,
MemberClientBurst: o.MemberClusterKubeAPIBurst,
})

if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions cmd/karmada-search/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type Options struct {
KubeAPIQPS float32
// KubeAPIBurst is the burst to allow while talking with karmada-search.
KubeAPIBurst int
// MemberClusterKubeAPIQPS is the QPS to use while talking to member cluster.
MemberClusterKubeAPIQPS float32
// MemberClusterKubeAPIBurst is the burst to allow while talking to member cluster.
MemberClusterKubeAPIBurst int

ProfileOpts profileflag.Options

Expand Down Expand Up @@ -66,6 +70,8 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {

flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.")
flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.")
flags.Float32Var(&o.MemberClusterKubeAPIQPS, "member-client-kube-api-qps", 40.0, "QPS to use while talking to member client.")
flags.IntVar(&o.MemberClusterKubeAPIBurst, "member-client-kube-api-burst", 60, "Burst to use while talking to member client.")
flags.BoolVar(&o.DisableSearch, "disable-search", false, "Disable search feature that would save memory usage significantly.")
flags.BoolVar(&o.DisableProxy, "disable-proxy", false, "Disable proxy feature that would save memory usage significantly.")

Expand Down
2 changes: 1 addition & 1 deletion pkg/metricsadapter/multiclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (m *MultiClusterDiscovery) Set(clusterName string) error {
secretGetter := func(namespace string, name string) (*corev1.Secret, error) {
return m.secretLister.Secrets(namespace).Get(name)
}
clusterConfig, err := util.BuildClusterConfig(clusterName, clusterGetter, secretGetter)
clusterConfig, err := util.BuildClusterConfig(clusterName, clusterGetter, secretGetter, nil, nil)
if err != nil {
return err
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/search/proxy/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,17 @@ type NewControllerOption struct {
MinRequestTimeout time.Duration

OutOfTreeRegistry pluginruntime.Registry

MemberClientQPS float32
MemberClientBurst int
}

// NewController create a controller for proxy
func NewController(option NewControllerOption) (*Controller, error) {
secretLister := option.KubeFactory.Core().V1().Secrets().Lister()
clusterLister := option.KarmadaFactory.Cluster().V1alpha1().Clusters().Lister()

clientFactory := dynamicClientForClusterFunc(clusterLister, secretLister)
clientFactory := dynamicClientForClusterFunc(clusterLister, secretLister, option.MemberClientQPS, option.MemberClientBurst)
multiClusterStore := store.NewMultiClusterCache(clientFactory, option.RestMapper)

allPlugins, err := newPlugins(option, multiClusterStore)
Expand Down Expand Up @@ -296,7 +299,7 @@ func (ctl *Controller) Connect(ctx context.Context, proxyPath string, responder
}

func dynamicClientForClusterFunc(clusterLister clusterlisters.ClusterLister,
secretLister listcorev1.SecretLister) func(string) (dynamic.Interface, error) {
secretLister listcorev1.SecretLister, qps float32, burst int) func(string) (dynamic.Interface, error) {
clusterGetter := func(cluster string) (*clusterv1alpha1.Cluster, error) {
return clusterLister.Get(cluster)
}
Expand All @@ -305,7 +308,7 @@ func dynamicClientForClusterFunc(clusterLister clusterlisters.ClusterLister,
}

return func(clusterName string) (dynamic.Interface, error) {
clusterConfig, err := util.BuildClusterConfig(clusterName, clusterGetter, secretGetter)
clusterConfig, err := util.BuildClusterConfig(clusterName, clusterGetter, secretGetter, &qps, &burst)
if err != nil {
return nil, err
}
Expand Down
29 changes: 25 additions & 4 deletions pkg/util/membercluster_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package util
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"net/http"
"net/url"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -71,7 +73,7 @@ type ClientOption struct {

// NewClusterScaleClientSet returns a ClusterScaleClient for the given member cluster.
func NewClusterScaleClientSet(clusterName string, client client.Client) (*ClusterScaleClient, error) {
clusterConfig, err := BuildClusterConfig(clusterName, clusterGetter(client), secretGetter(client))
clusterConfig, err := BuildClusterConfig(clusterName, clusterGetter(client), secretGetter(client), nil, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -104,7 +106,7 @@ func NewClusterScaleClientSet(clusterName string, client client.Client) (*Cluste

// NewClusterClientSet returns a ClusterClient for the given member cluster.
func NewClusterClientSet(clusterName string, client client.Client, clientOption *ClientOption) (*ClusterClient, error) {
clusterConfig, err := BuildClusterConfig(clusterName, clusterGetter(client), secretGetter(client))
clusterConfig, err := BuildClusterConfig(clusterName, clusterGetter(client), secretGetter(client), nil, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -142,7 +144,7 @@ func NewClusterClientSetForAgent(clusterName string, _ client.Client, clientOpti

// NewClusterDynamicClientSet returns a dynamic client for the given member cluster.
func NewClusterDynamicClientSet(clusterName string, client client.Client) (*DynamicClusterClient, error) {
clusterConfig, err := BuildClusterConfig(clusterName, clusterGetter(client), secretGetter(client))
clusterConfig, err := BuildClusterConfig(clusterName, clusterGetter(client), secretGetter(client), nil, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -171,7 +173,7 @@ func NewClusterDynamicClientSetForAgent(clusterName string, _ client.Client) (*D
// BuildClusterConfig return rest config for member cluster.
func BuildClusterConfig(clusterName string,
clusterGetter func(string) (*clusterv1alpha1.Cluster, error),
secretGetter func(string, string) (*corev1.Secret, error)) (*rest.Config, error) {
secretGetter func(string, string) (*corev1.Secret, error), qps *float32, burst *int) (*rest.Config, error) {
cluster, err := clusterGetter(clusterName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -227,6 +229,25 @@ func BuildClusterConfig(clusterName string,
}
}

if clusterConfig.ContentType != "" {
clusterConfig.ContentType = strings.Join([]string{clusterConfig.ContentType, runtime.ContentTypeProtobuf, runtime.ContentTypeJSON}, ",")
} else {
clusterConfig.ContentType = strings.Join([]string{runtime.ContentTypeProtobuf, runtime.ContentTypeJSON}, ",")
}

if clusterConfig.AcceptContentTypes != "" {
clusterConfig.AcceptContentTypes = strings.Join([]string{clusterConfig.AcceptContentTypes, runtime.ContentTypeProtobuf, runtime.ContentTypeJSON}, ",")
} else {
clusterConfig.AcceptContentTypes = strings.Join([]string{runtime.ContentTypeProtobuf, runtime.ContentTypeJSON}, ",")
}

if qps != nil {
clusterConfig.QPS = *qps
}
if burst != nil {
clusterConfig.Burst = *burst
}

return clusterConfig, nil
}

Expand Down
2 changes: 1 addition & 1 deletion test/e2e/framework/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func WaitPodMetricsReady(kubeClient kubernetes.Interface, karmadaClient karmada.
secretGetter := func(namespace string, name string) (*corev1.Secret, error) {
return kubeClient.CoreV1().Secrets(namespace).Get(context.Background(), name, metav1.GetOptions{})
}
config, err := util.BuildClusterConfig(cluster, clusterGetter, secretGetter)
config, err := util.BuildClusterConfig(cluster, clusterGetter, secretGetter, nil, nil)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
metricsClient, err := metricsclientset.NewForConfig(config)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
Expand Down

0 comments on commit 7541be5

Please sign in to comment.