From d43a045228c06ea9df5be8db8deffe68abfe3785 Mon Sep 17 00:00:00 2001 From: zhy76 <958474674@qq.com> Date: Tue, 9 Jul 2024 11:41:18 +0800 Subject: [PATCH] fix: update scheduler estimator cache when cluster changed Signed-off-by: zhy76 <958474674@qq.com> --- pkg/descheduler/descheduler_test.go | 2 +- pkg/estimator/client/cache.go | 24 +++++------------------- 2 files changed, 6 insertions(+), 20 deletions(-) diff --git a/pkg/descheduler/descheduler_test.go b/pkg/descheduler/descheduler_test.go index 2b4e74822a05..3d523d39c906 100644 --- a/pkg/descheduler/descheduler_test.go +++ b/pkg/descheduler/descheduler_test.go @@ -505,7 +505,7 @@ func TestDescheduler_worker(t *testing.T) { mock.MatchedBy(func(context.Context) bool { return true }), mock.MatchedBy(func(in *pb.UnschedulableReplicasRequest) bool { return in.Cluster == cluster.Name }), ).Return(mockResultFn, nil) - desched.schedulerEstimatorCache.AddCluster(cluster.Name, nil, mockClient) + desched.schedulerEstimatorCache.AddOrUpdateCluster(cluster.Name, nil, mockClient) } desched.informerFactory.Start(ctx.Done()) diff --git a/pkg/estimator/client/cache.go b/pkg/estimator/client/cache.go index cc7ad0f6a9c9..33da2daca5c0 100644 --- a/pkg/estimator/client/cache.go +++ b/pkg/estimator/client/cache.go @@ -49,23 +49,13 @@ type clientWrapper struct { client estimatorservice.EstimatorClient } -// IsEstimatorExist checks whether the cluster estimator exists in the cache. -func (c *SchedulerEstimatorCache) IsEstimatorExist(name string) bool { - c.lock.RLock() - defer c.lock.RUnlock() - _, exist := c.estimator[name] - return exist -} - -// AddCluster adds a grpc connection and associated client into the cache. -func (c *SchedulerEstimatorCache) AddCluster(name string, connection *grpc.ClientConn, client estimatorservice.EstimatorClient) { +// AddOrUpdateCluster adds or updates a grpc connection and associated client into the cache. +func (c *SchedulerEstimatorCache) AddOrUpdateCluster(name string, connection *grpc.ClientConn, client estimatorservice.EstimatorClient) { c.lock.Lock() defer c.lock.Unlock() - // If more than one worker have established connections at the same time, - // only the first one will be used. The ladder ones would be abandoned. + if _, exist := c.estimator[name]; exist { - _ = connection.Close() - return + _ = c.estimator[name].connection.Close() } c.estimator[name] = &clientWrapper{ connection: connection, @@ -98,10 +88,6 @@ func (c *SchedulerEstimatorCache) GetClient(name string) (estimatorservice.Estim // EstablishConnection establishes a new gRPC connection with the specified cluster scheduler estimator. func EstablishConnection(kubeClient kubernetes.Interface, name string, estimatorCache *SchedulerEstimatorCache, estimatorServicePrefix string, grpcConfig *grpcconnection.ClientConfig) error { - if estimatorCache.IsEstimatorExist(name) { - return nil - } - serverAddr, err := resolveCluster(kubeClient, util.NamespaceKarmadaSystem, names.GenerateEstimatorServiceName(estimatorServicePrefix, name), int32(grpcConfig.TargetPort)) if err != nil { @@ -115,7 +101,7 @@ func EstablishConnection(kubeClient kubernetes.Interface, name string, estimator return err } c := estimatorservice.NewEstimatorClient(cc) - estimatorCache.AddCluster(name, cc, c) + estimatorCache.AddOrUpdateCluster(name, cc, c) klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", serverAddr, name) return nil }