Skip to content

Commit

Permalink
fix: update scheduler estimator cache when cluster changed
Browse files Browse the repository at this point in the history
Signed-off-by: zhy76 <958474674@qq.com>
  • Loading branch information
zhy76 committed Jul 9, 2024
1 parent c8acebc commit e0352b1
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pkg/descheduler/descheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func TestDescheduler_worker(t *testing.T) {
mock.MatchedBy(func(context.Context) bool { return true }),
mock.MatchedBy(func(in *pb.UnschedulableReplicasRequest) bool { return in.Cluster == cluster.Name }),
).Return(mockResultFn, nil)
desched.schedulerEstimatorCache.AddCluster(cluster.Name, nil, mockClient)
desched.schedulerEstimatorCache.AddOrUpdateCluster(cluster.Name, nil, mockClient)
}

desched.informerFactory.Start(ctx.Done())
Expand Down
21 changes: 11 additions & 10 deletions pkg/estimator/client/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,10 @@ func (c *SchedulerEstimatorCache) IsEstimatorExist(name string) bool {
return exist
}

// AddCluster adds a grpc connection and associated client into the cache.
func (c *SchedulerEstimatorCache) AddCluster(name string, connection *grpc.ClientConn, client estimatorservice.EstimatorClient) {
// AddOrUpdateCluster adds or updates a grpc connection and associated client into the cache.
func (c *SchedulerEstimatorCache) AddOrUpdateCluster(name string, connection *grpc.ClientConn, client estimatorservice.EstimatorClient) {
c.lock.Lock()
defer c.lock.Unlock()
// If more than one worker have established connections at the same time,
// only the first one will be used. The ladder ones would be abandoned.
if _, exist := c.estimator[name]; exist {
_ = connection.Close()
return
}
c.estimator[name] = &clientWrapper{
connection: connection,
client: client,
Expand Down Expand Up @@ -98,8 +92,14 @@ func (c *SchedulerEstimatorCache) GetClient(name string) (estimatorservice.Estim

// EstablishConnection establishes a new gRPC connection with the specified cluster scheduler estimator.
func EstablishConnection(kubeClient kubernetes.Interface, name string, estimatorCache *SchedulerEstimatorCache, estimatorServicePrefix string, grpcConfig *grpcconnection.ClientConfig) error {
// check if the connection already exists, if so, close it and establish a new connection
if estimatorCache.IsEstimatorExist(name) {
return nil
// close it and establish a new connection
err := estimatorCache.estimator[name].connection.Close()
if err != nil {
klog.Errorf("Failed to close connection of cluster(%s): %v.", name, err)
return err
}
}

serverAddr, err := resolveCluster(kubeClient, util.NamespaceKarmadaSystem,
Expand All @@ -115,7 +115,8 @@ func EstablishConnection(kubeClient kubernetes.Interface, name string, estimator
return err
}
c := estimatorservice.NewEstimatorClient(cc)
estimatorCache.AddCluster(name, cc, c)
// add or update the cache
estimatorCache.AddOrUpdateCluster(name, cc, c)
klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", serverAddr, name)
return nil
}

0 comments on commit e0352b1

Please sign in to comment.