fix: update scheduler estimator cache when cluster changed #5151

Closed
wants to merge 1 commit into from
2 changes: 1 addition & 1 deletion pkg/descheduler/descheduler_test.go
@@ -505,7 +505,7 @@ func TestDescheduler_worker(t *testing.T) {
 		mock.MatchedBy(func(context.Context) bool { return true }),
 		mock.MatchedBy(func(in *pb.UnschedulableReplicasRequest) bool { return in.Cluster == cluster.Name }),
 	).Return(mockResultFn, nil)
-	desched.schedulerEstimatorCache.AddCluster(cluster.Name, nil, mockClient)
+	desched.schedulerEstimatorCache.AddOrUpdateCluster(cluster.Name, nil, mockClient)
 }
 
 desched.informerFactory.Start(ctx.Done())
16 changes: 5 additions & 11 deletions pkg/estimator/client/cache.go
@@ -57,15 +57,13 @@ func (c *SchedulerEstimatorCache) IsEstimatorExist(name string) bool {
 	return exist
 }
 
-// AddCluster adds a grpc connection and associated client into the cache.
-func (c *SchedulerEstimatorCache) AddCluster(name string, connection *grpc.ClientConn, client estimatorservice.EstimatorClient) {
+// AddOrUpdateCluster adds or updates a grpc connection and associated client into the cache.
+func (c *SchedulerEstimatorCache) AddOrUpdateCluster(name string, connection *grpc.ClientConn, client estimatorservice.EstimatorClient) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	// If more than one worker have established connections at the same time,
-	// only the first one will be used. The ladder ones would be abandoned.
+
 	if _, exist := c.estimator[name]; exist {
Member

suggest:

if oldConn, exist := c.estimator[name]; exist {
    oldConn.Close()
}

-		_ = connection.Close()
-		return
+		_ = c.estimator[name].connection.Close()
 	}
 	c.estimator[name] = &clientWrapper{
 		connection: connection,
@@ -98,10 +96,6 @@ func (c *SchedulerEstimatorCache) GetClient(name string) (estimatorservice.Estim

 // EstablishConnection establishes a new gRPC connection with the specified cluster scheduler estimator.
 func EstablishConnection(kubeClient kubernetes.Interface, name string, estimatorCache *SchedulerEstimatorCache, estimatorServicePrefix string, grpcConfig *grpcconnection.ClientConfig) error {
-	if estimatorCache.IsEstimatorExist(name) {
-		return nil
-	}
-
Member

If the cluster state changes frequently, I am worried that this modification will cause the connection to be re-established too often, which would be bad.

Contributor

Agreed. Updating too often is also a problem, unless the update is actually required.
@zhy76 Can you remind me what goes wrong if the scheduler estimator cache is not updated when the cluster changes? Does it have to be updated every time, or could only part of it be updated? For example, does the connection itself need to be updated?

Member Author

I've reconsidered, and it seems there is no impact. It looks like the author watches for cluster updates only to ensure that every cluster is added to the cache. The code uses the cluster name only to find the corresponding estimator server's service and has no other use for it. This PR may no longer be necessary. Thank you all for your help. @Garrybest, could you check and make the final decision?

Member

Exactly, the cluster name is used to connect to the corresponding estimator service in the host cluster. I don't think there is any circumstance in which this service would need to be updated, right?

Meanwhile, a cluster's status can change frequently, so establishing that many unnecessary connections could be expensive.

Member Author

Thanks for your reply~
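
For readers following the thread, the trade-off between the two cache policies can be sketched in a few lines of Go. This is only an illustration under stated assumptions, not the Karmada implementation: `fakeConn`, `Cache`, `NewCache`, `AddKeepFirst`, and `AddOrReplace` are hypothetical names, and `fakeConn` stands in for `*grpc.ClientConn` so the example runs without a network. `AddKeepFirst` approximates the behavior of the removed `AddCluster` lines (the existing entry wins and the newcomer is closed), while `AddOrReplace` approximates the proposed `AddOrUpdateCluster` (the old connection is closed and replaced).

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// fakeConn is a hypothetical stand-in for *grpc.ClientConn.
// It only records whether Close was called.
type fakeConn struct {
	closed bool
}

func (f *fakeConn) Close() error {
	f.closed = true
	return nil
}

// Cache is a simplified, hypothetical analogue of SchedulerEstimatorCache.
type Cache struct {
	mu    sync.Mutex
	conns map[string]io.Closer
}

func NewCache() *Cache {
	return &Cache{conns: make(map[string]io.Closer)}
}

// AddKeepFirst keeps an existing entry and closes the newly offered
// connection. It reports whether the new connection was stored.
func (c *Cache) AddKeepFirst(name string, conn io.Closer) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.conns[name]; ok {
		_ = conn.Close() // abandon the newcomer
		return false
	}
	c.conns[name] = conn
	return true
}

// AddOrReplace closes any existing connection and stores the new one.
func (c *Cache) AddOrReplace(name string, conn io.Closer) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if old, ok := c.conns[name]; ok {
		_ = old.Close() // drop the previous connection
	}
	c.conns[name] = conn
}

func main() {
	first, second := &fakeConn{}, &fakeConn{}
	keep := NewCache()
	keep.AddKeepFirst("member1", first)
	keep.AddKeepFirst("member1", second)
	fmt.Println(first.closed, second.closed) // false true

	oldConn, newConn := &fakeConn{}, &fakeConn{}
	replace := NewCache()
	replace.AddOrReplace("member1", oldConn)
	replace.AddOrReplace("member1", newConn)
	fmt.Println(oldConn.closed, newConn.closed) // true false
}
```

Under the replace policy, every call costs a fresh connection plus a teardown of the old one, which is the expense the reviewers raise when cluster status events are frequent. Under the keep-first policy, repeated calls for the same name are cheap once a caller checks the cache before dialing.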

 	serverAddr, err := resolveCluster(kubeClient, util.NamespaceKarmadaSystem,
 		names.GenerateEstimatorServiceName(estimatorServicePrefix, name), int32(grpcConfig.TargetPort))
 	if err != nil {
@@ -115,7 +109,7 @@ func EstablishConnection(kubeClient kubernetes.Interface, name string, estimator
 		return err
 	}
 	c := estimatorservice.NewEstimatorClient(cc)
-	estimatorCache.AddCluster(name, cc, c)
+	estimatorCache.AddOrUpdateCluster(name, cc, c)
 	klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", serverAddr, name)
 	return nil
 }