Skip to content

Commit

Permalink
Merge pull request #5478 from jabellard/estimator_service_namespace
Browse files Browse the repository at this point in the history
Add the Ability to Specify the Namespace Used for Discovering Scheduler Estimator Services
  • Loading branch information
karmada-bot authored Sep 10, 2024
2 parents ca1b2ce + b6fbcb4 commit 07e2976
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 20 deletions.
3 changes: 3 additions & 0 deletions cmd/descheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type Options struct {

// SchedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service.
SchedulerEstimatorTimeout metav1.Duration
// SchedulerEstimatorServiceNamespace specifies the namespace to be used for discovering scheduler estimator services.
SchedulerEstimatorServiceNamespace string
// SchedulerEstimatorServicePrefix presents the prefix of the accurate scheduler estimator service name.
SchedulerEstimatorServicePrefix string
// SchedulerEstimatorPort is the port that the accurate scheduler estimator server serves at.
Expand Down Expand Up @@ -129,6 +131,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.SchedulerEstimatorKeyFile, "scheduler-estimator-key-file", "", "SSL key file used to secure scheduler estimator communication.")
fs.StringVar(&o.SchedulerEstimatorCaFile, "scheduler-estimator-ca-file", "", "SSL Certificate Authority file used to secure scheduler estimator communication.")
fs.BoolVar(&o.InsecureSkipEstimatorVerify, "insecure-skip-estimator-verify", false, "Controls whether verifies the scheduler estimator's certificate chain and host name.")
fs.StringVar(&o.SchedulerEstimatorServiceNamespace, "scheduler-estimator-service-namespace", util.NamespaceKarmadaSystem, "The namespace to be used for discovering scheduler estimator services.")
fs.StringVar(&o.SchedulerEstimatorServicePrefix, "scheduler-estimator-service-prefix", "karmada-scheduler-estimator", "The prefix of scheduler estimator service name")
fs.DurationVar(&o.DeschedulingInterval.Duration, "descheduling-interval", defaultDeschedulingInterval, "Time interval between two consecutive descheduler executions. Setting this value instructs the descheduler to run in a continuous loop at the interval specified.")
fs.DurationVar(&o.UnschedulableThreshold.Duration, "unschedulable-threshold", defaultUnschedulableThreshold, "The period of pod unschedulable condition. This value is considered as a classification standard of unschedulable replicas.")
Expand Down
3 changes: 3 additions & 0 deletions cmd/scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type Options struct {
DisableSchedulerEstimatorInPullMode bool
// SchedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service.
SchedulerEstimatorTimeout metav1.Duration
// SchedulerEstimatorServiceNamespace specifies the namespace to be used for discovering scheduler estimator services.
SchedulerEstimatorServiceNamespace string
// SchedulerEstimatorServicePrefix presents the prefix of the accurate scheduler estimator service name.
SchedulerEstimatorServicePrefix string
// SchedulerEstimatorPort is the port that the accurate scheduler estimator server serves at.
Expand Down Expand Up @@ -164,6 +166,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&o.EnableSchedulerEstimator, "enable-scheduler-estimator", false, "Enable calling cluster scheduler estimator for adjusting replicas.")
fs.BoolVar(&o.DisableSchedulerEstimatorInPullMode, "disable-scheduler-estimator-in-pull-mode", false, "Disable the scheduler estimator for clusters in pull mode, which takes effect only when enable-scheduler-estimator is true.")
fs.DurationVar(&o.SchedulerEstimatorTimeout.Duration, "scheduler-estimator-timeout", 3*time.Second, "Specifies the timeout period of calling the scheduler estimator service.")
fs.StringVar(&o.SchedulerEstimatorServiceNamespace, "scheduler-estimator-service-namespace", util.NamespaceKarmadaSystem, "The namespace to be used for discovering scheduler estimator services.")
fs.StringVar(&o.SchedulerEstimatorServicePrefix, "scheduler-estimator-service-prefix", "karmada-scheduler-estimator", "The prefix of scheduler estimator service name")
fs.IntVar(&o.SchedulerEstimatorPort, "scheduler-estimator-port", defaultEstimatorPort, "The secure port on which to connect the accurate scheduler estimator.")
fs.StringVar(&o.SchedulerEstimatorCertFile, "scheduler-estimator-cert-file", "", "SSL certification file used to secure scheduler estimator communication.")
Expand Down
1 change: 1 addition & 0 deletions cmd/scheduler/app/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func run(opts *options.Options, stopChan <-chan struct{}, registryOptions ...Opt
scheduler.WithOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithEnableSchedulerEstimator(opts.EnableSchedulerEstimator),
scheduler.WithDisableSchedulerEstimatorInPullMode(opts.DisableSchedulerEstimatorInPullMode),
scheduler.WithSchedulerEstimatorServiceNamespace(opts.SchedulerEstimatorServiceNamespace),
scheduler.WithSchedulerEstimatorServicePrefix(opts.SchedulerEstimatorServicePrefix),
scheduler.WithSchedulerEstimatorConnection(opts.SchedulerEstimatorPort, opts.SchedulerEstimatorCertFile, opts.SchedulerEstimatorKeyFile, opts.SchedulerEstimatorCaFile, opts.InsecureSkipEstimatorVerify),
scheduler.WithSchedulerEstimatorTimeout(opts.SchedulerEstimatorTimeout),
Expand Down
30 changes: 21 additions & 9 deletions pkg/descheduler/descheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ type Descheduler struct {

eventRecorder record.EventRecorder

schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache
schedulerEstimatorServicePrefix string
schedulerEstimatorClientConfig *grpcconnection.ClientConfig
schedulerEstimatorWorker util.AsyncWorker
schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache
schedulerEstimatorServiceNamespace string
schedulerEstimatorServicePrefix string
schedulerEstimatorClientConfig *grpcconnection.ClientConfig
schedulerEstimatorWorker util.AsyncWorker

unschedulableThreshold time.Duration
deschedulingInterval time.Duration
Expand All @@ -93,9 +94,10 @@ func NewDescheduler(karmadaClient karmadaclientset.Interface, kubeClient kuberne
KeyFile: opts.SchedulerEstimatorKeyFile,
TargetPort: opts.SchedulerEstimatorPort,
},
schedulerEstimatorServicePrefix: opts.SchedulerEstimatorServicePrefix,
unschedulableThreshold: opts.UnschedulableThreshold.Duration,
deschedulingInterval: opts.DeschedulingInterval.Duration,
schedulerEstimatorServiceNamespace: opts.SchedulerEstimatorServiceNamespace,
schedulerEstimatorServicePrefix: opts.SchedulerEstimatorServicePrefix,
unschedulableThreshold: opts.UnschedulableThreshold.Duration,
deschedulingInterval: opts.DeschedulingInterval.Duration,
}
// ignore the error here because the informers haven't been started
_ = desched.bindingInformer.SetTransform(fedinformer.StripUnusedFields)
Expand Down Expand Up @@ -284,7 +286,12 @@ func (d *Descheduler) establishEstimatorConnections() {
return
}
for i := range clusterList.Items {
if err = estimatorclient.EstablishConnection(d.KubeClient, clusterList.Items[i].Name, d.schedulerEstimatorCache, d.schedulerEstimatorServicePrefix, d.schedulerEstimatorClientConfig); err != nil {
serviceInfo := estimatorclient.SchedulerEstimatorServiceInfo{
Name: clusterList.Items[i].Name,
Namespace: d.schedulerEstimatorServiceNamespace,
NamePrefix: d.schedulerEstimatorServicePrefix,
}
if err = estimatorclient.EstablishConnection(d.KubeClient, serviceInfo, d.schedulerEstimatorCache, d.schedulerEstimatorClientConfig); err != nil {
klog.Error(err)
}
}
Expand All @@ -304,7 +311,12 @@ func (d *Descheduler) reconcileEstimatorConnection(key util.QueueKey) error {
}
return err
}
return estimatorclient.EstablishConnection(d.KubeClient, name, d.schedulerEstimatorCache, d.schedulerEstimatorServicePrefix, d.schedulerEstimatorClientConfig)
serviceInfo := estimatorclient.SchedulerEstimatorServiceInfo{
Name: name,
Namespace: d.schedulerEstimatorServiceNamespace,
NamePrefix: d.schedulerEstimatorServicePrefix,
}
return estimatorclient.EstablishConnection(d.KubeClient, serviceInfo, d.schedulerEstimatorCache, d.schedulerEstimatorClientConfig)
}

func (d *Descheduler) recordDescheduleResultEventForResourceBinding(rb *workv1alpha2.ResourceBinding, message string, err error) {
Expand Down
24 changes: 15 additions & 9 deletions pkg/estimator/client/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"k8s.io/klog/v2"

estimatorservice "github.com/karmada-io/karmada/pkg/estimator/service"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/grpcconnection"
"github.com/karmada-io/karmada/pkg/util/names"
)
Expand All @@ -37,6 +36,13 @@ type SchedulerEstimatorCache struct {
estimator map[string]*clientWrapper
}

// SchedulerEstimatorServiceInfo contains information needed to discover and connect to a scheduler estimator service.
type SchedulerEstimatorServiceInfo struct {
Name string
NamePrefix string
Namespace string
}

// NewSchedulerEstimatorCache returns an accurate scheduler estimator cache.
func NewSchedulerEstimatorCache() *SchedulerEstimatorCache {
return &SchedulerEstimatorCache{
Expand Down Expand Up @@ -97,25 +103,25 @@ func (c *SchedulerEstimatorCache) GetClient(name string) (estimatorservice.Estim
}

// EstablishConnection establishes a new gRPC connection with the specified cluster scheduler estimator.
func EstablishConnection(kubeClient kubernetes.Interface, name string, estimatorCache *SchedulerEstimatorCache, estimatorServicePrefix string, grpcConfig *grpcconnection.ClientConfig) error {
if estimatorCache.IsEstimatorExist(name) {
func EstablishConnection(kubeClient kubernetes.Interface, serviceInfo SchedulerEstimatorServiceInfo, estimatorCache *SchedulerEstimatorCache, grpcConfig *grpcconnection.ClientConfig) error {
if estimatorCache.IsEstimatorExist(serviceInfo.Name) {
return nil
}

serverAddr, err := resolveCluster(kubeClient, util.NamespaceKarmadaSystem,
names.GenerateEstimatorServiceName(estimatorServicePrefix, name), int32(grpcConfig.TargetPort))
serverAddr, err := resolveCluster(kubeClient, serviceInfo.Namespace,
names.GenerateEstimatorServiceName(serviceInfo.NamePrefix, serviceInfo.Name), int32(grpcConfig.TargetPort))
if err != nil {
return err
}

klog.Infof("Start dialing estimator server(%s) of cluster(%s).", serverAddr, name)
klog.Infof("Start dialing estimator server(%s) of cluster(%s).", serverAddr, serviceInfo.Name)
cc, err := grpcConfig.DialWithTimeOut(serverAddr, 5*time.Second)
if err != nil {
klog.Errorf("Failed to dial cluster(%s): %v.", name, err)
klog.Errorf("Failed to dial cluster(%s): %v.", serviceInfo.Name, err)
return err
}
c := estimatorservice.NewEstimatorClient(cc)
estimatorCache.AddCluster(name, cc, c)
klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", serverAddr, name)
estimatorCache.AddCluster(serviceInfo.Name, cc, c)
klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", serverAddr, serviceInfo.Name)
return nil
}
25 changes: 23 additions & 2 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type Scheduler struct {
enableSchedulerEstimator bool
disableSchedulerEstimatorInPullMode bool
schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache
schedulerEstimatorServiceNamespace string
schedulerEstimatorServicePrefix string
schedulerEstimatorWorker util.AsyncWorker
schedulerEstimatorClientConfig *grpcconnection.ClientConfig
Expand All @@ -121,6 +122,8 @@ type schedulerOptions struct {
disableSchedulerEstimatorInPullMode bool
// schedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service.
schedulerEstimatorTimeout metav1.Duration
// schedulerEstimatorServiceNamespace specifies the namespace to be used for discovering scheduler estimator services.
schedulerEstimatorServiceNamespace string
// SchedulerEstimatorServicePrefix presents the prefix of the accurate scheduler estimator service name.
schedulerEstimatorServicePrefix string
// schedulerName is the name of the scheduler. Default is "default-scheduler".
Expand Down Expand Up @@ -174,6 +177,13 @@ func WithSchedulerEstimatorTimeout(schedulerEstimatorTimeout metav1.Duration) Op
}
}

// WithSchedulerEstimatorServiceNamespace sets the schedulerEstimatorServiceNamespace for the scheduler
func WithSchedulerEstimatorServiceNamespace(schedulerEstimatorServiceNamespace string) Option {
return func(o *schedulerOptions) {
o.schedulerEstimatorServiceNamespace = schedulerEstimatorServiceNamespace
}
}

// WithSchedulerEstimatorServicePrefix sets the schedulerEstimatorServicePrefix for scheduler
func WithSchedulerEstimatorServicePrefix(schedulerEstimatorServicePrefix string) Option {
return func(o *schedulerOptions) {
Expand Down Expand Up @@ -262,6 +272,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
sched.enableSchedulerEstimator = options.enableSchedulerEstimator
sched.disableSchedulerEstimatorInPullMode = options.disableSchedulerEstimatorInPullMode
sched.schedulerEstimatorServicePrefix = options.schedulerEstimatorServicePrefix
sched.schedulerEstimatorServiceNamespace = options.schedulerEstimatorServiceNamespace
sched.schedulerEstimatorClientConfig = options.schedulerEstimatorClientConfig
sched.schedulerEstimatorCache = estimatorclient.NewSchedulerEstimatorCache()
schedulerEstimatorWorkerOptions := util.Options{
Expand Down Expand Up @@ -776,7 +787,12 @@ func (s *Scheduler) reconcileEstimatorConnection(key util.QueueKey) error {
return nil
}

return estimatorclient.EstablishConnection(s.KubeClient, name, s.schedulerEstimatorCache, s.schedulerEstimatorServicePrefix, s.schedulerEstimatorClientConfig)
serviceInfo := estimatorclient.SchedulerEstimatorServiceInfo{
Name: name,
Namespace: s.schedulerEstimatorServiceNamespace,
NamePrefix: s.schedulerEstimatorServicePrefix,
}
return estimatorclient.EstablishConnection(s.KubeClient, serviceInfo, s.schedulerEstimatorCache, s.schedulerEstimatorClientConfig)
}

func (s *Scheduler) establishEstimatorConnections() {
Expand All @@ -789,7 +805,12 @@ func (s *Scheduler) establishEstimatorConnections() {
if clusterList.Items[i].Spec.SyncMode == clusterv1alpha1.Pull && s.disableSchedulerEstimatorInPullMode {
continue
}
if err = estimatorclient.EstablishConnection(s.KubeClient, clusterList.Items[i].Name, s.schedulerEstimatorCache, s.schedulerEstimatorServicePrefix, s.schedulerEstimatorClientConfig); err != nil {
serviceInfo := estimatorclient.SchedulerEstimatorServiceInfo{
Name: clusterList.Items[i].Name,
Namespace: s.schedulerEstimatorServiceNamespace,
NamePrefix: s.schedulerEstimatorServicePrefix,
}
if err = estimatorclient.EstablishConnection(s.KubeClient, serviceInfo, s.schedulerEstimatorCache, s.schedulerEstimatorClientConfig); err != nil {
klog.Error(err)
}
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestCreateScheduler(t *testing.T) {
karmadaClient := karmadafake.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset()
port := 10025
serviceNamespace := "tenant1"
servicePrefix := "test-service-prefix"
schedulerName := "test-scheduler"
timeout := metav1.Duration{Duration: 5 * time.Second}
Expand All @@ -51,6 +52,7 @@ func TestCreateScheduler(t *testing.T) {
schedulerEstimatorPort int
disableSchedulerEstimatorInPullMode bool
schedulerEstimatorTimeout metav1.Duration
schedulerEstimatorServiceNamespace string
schedulerEstimatorServicePrefix string
schedulerName string
schedulerEstimatorClientConfig *grpcconnection.ClientConfig
Expand Down Expand Up @@ -101,6 +103,17 @@ func TestCreateScheduler(t *testing.T) {
schedulerEstimatorPort: port,
schedulerEstimatorServicePrefix: servicePrefix,
},
{
name: "scheduler with custom SchedulerEstimatorServiceNamespace set",
opts: []Option{
WithEnableSchedulerEstimator(true),
WithSchedulerEstimatorConnection(port, "", "", "", false),
WithSchedulerEstimatorServiceNamespace(serviceNamespace),
},
enableSchedulerEstimator: true,
schedulerEstimatorPort: port,
schedulerEstimatorServiceNamespace: serviceNamespace,
},
{
name: "scheduler with SchedulerName enabled",
opts: []Option{
Expand Down Expand Up @@ -147,6 +160,10 @@ func TestCreateScheduler(t *testing.T) {
t.Errorf("unexpected disableSchedulerEstimatorInPullMode want %v, got %v", tc.disableSchedulerEstimatorInPullMode, sche.disableSchedulerEstimatorInPullMode)
}

if tc.schedulerEstimatorServiceNamespace != sche.schedulerEstimatorServiceNamespace {
t.Errorf("unexpected schedulerEstimatorServiceNamespace want %v, got %v", tc.schedulerEstimatorServiceNamespace, sche.schedulerEstimatorServiceNamespace)
}

if tc.schedulerEstimatorServicePrefix != sche.schedulerEstimatorServicePrefix {
t.Errorf("unexpected schedulerEstimatorServicePrefix want %v, got %v", tc.schedulerEstimatorServicePrefix, sche.schedulerEstimatorServicePrefix)
}
Expand Down

0 comments on commit 07e2976

Please sign in to comment.