Skip to content

Commit

Permalink
fix: update scheduler estimator cache when cluster changed
Browse files Browse the repository at this point in the history
Signed-off-by: zhy76 <958474674@qq.com>
  • Loading branch information
zhy76 committed Jul 9, 2024
1 parent c8acebc commit 0aa66fa
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pkg/descheduler/descheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func TestDescheduler_worker(t *testing.T) {
mock.MatchedBy(func(context.Context) bool { return true }),
mock.MatchedBy(func(in *pb.UnschedulableReplicasRequest) bool { return in.Cluster == cluster.Name }),
).Return(mockResultFn, nil)
desched.schedulerEstimatorCache.AddCluster(cluster.Name, nil, mockClient)
desched.schedulerEstimatorCache.AddOrUpdateCluster(cluster.Name, nil, mockClient)
}

desched.informerFactory.Start(ctx.Done())
Expand Down
36 changes: 17 additions & 19 deletions pkg/estimator/client/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,10 @@ type clientWrapper struct {
client estimatorservice.EstimatorClient
}

// IsEstimatorExist checks whether the cluster estimator exists in the cache.
func (c *SchedulerEstimatorCache) IsEstimatorExist(name string) bool {
c.lock.RLock()
defer c.lock.RUnlock()
_, exist := c.estimator[name]
return exist
}

// AddCluster adds a grpc connection and associated client into the cache.
func (c *SchedulerEstimatorCache) AddCluster(name string, connection *grpc.ClientConn, client estimatorservice.EstimatorClient) {
// AddOrUpdateCluster adds or updates a grpc connection and associated client into the cache.
func (c *SchedulerEstimatorCache) AddOrUpdateCluster(name string, connection *grpc.ClientConn, client estimatorservice.EstimatorClient) {
c.lock.Lock()
defer c.lock.Unlock()
// If more than one worker have established connections at the same time,
// only the first one will be used. The ladder ones would be abandoned.
if _, exist := c.estimator[name]; exist {
_ = connection.Close()
return
}
c.estimator[name] = &clientWrapper{
connection: connection,
client: client,
Expand Down Expand Up @@ -98,9 +84,20 @@ func (c *SchedulerEstimatorCache) GetClient(name string) (estimatorservice.Estim

// EstablishConnection establishes a new gRPC connection with the specified cluster scheduler estimator.
func EstablishConnection(kubeClient kubernetes.Interface, name string, estimatorCache *SchedulerEstimatorCache, estimatorServicePrefix string, grpcConfig *grpcconnection.ClientConfig) error {
if estimatorCache.IsEstimatorExist(name) {
return nil
// check if the connection already exists, if so, close it and establish a new connection
estimatorCache.lock.Lock()
es, exist := estimatorCache.estimator[name]
if exist {
// Use lock to prevent if more than one worker close connection at the same time,
// a connection may be closed multiple times.
err := es.connection.Close()
if err != nil {
estimatorCache.lock.Unlock()
klog.Errorf("Failed to close connection of cluster(%s): %v.", name, err)
return err
}
}
estimatorCache.lock.Unlock()

serverAddr, err := resolveCluster(kubeClient, util.NamespaceKarmadaSystem,
names.GenerateEstimatorServiceName(estimatorServicePrefix, name), int32(grpcConfig.TargetPort))
Expand All @@ -115,7 +112,8 @@ func EstablishConnection(kubeClient kubernetes.Interface, name string, estimator
return err
}
c := estimatorservice.NewEstimatorClient(cc)
estimatorCache.AddCluster(name, cc, c)
// add or update the cache
estimatorCache.AddOrUpdateCluster(name, cc, c)
klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", serverAddr, name)
return nil
}

0 comments on commit 0aa66fa

Please sign in to comment.