fix: update scheduler estimator cache when cluster changed #5151
Conversation
Codecov Report

Attention: Patch coverage is

@@            Coverage Diff             @@
##           master    #5151      +/-   ##
==========================================
+ Coverage   28.21%   28.22%   +0.01%
==========================================
  Files         632      632
  Lines       43568    43542      -26
==========================================
- Hits        12291    12289       -2
+ Misses      30381    30355      -26
- Partials      896      898       +2

Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
/assign
pkg/estimator/client/cache.go
Outdated
-// AddCluster adds a grpc connection and associated client into the cache.
-func (c *SchedulerEstimatorCache) AddCluster(name string, connection *grpc.ClientConn, client estimatorservice.EstimatorClient) {
+// AddUpdateCluster adds or updates a grpc connection and associated client into the cache.
+func (c *SchedulerEstimatorCache) AddUpdateCluster(name string, connection *grpc.ClientConn, client estimatorservice.EstimatorClient) {
Suggested change:

-func (c *SchedulerEstimatorCache) AddUpdateCluster(name string, connection *grpc.ClientConn, client estimatorservice.EstimatorClient) {
+func (c *SchedulerEstimatorCache) AddOrUpdateCluster(name string, connection *grpc.ClientConn, client estimatorservice.EstimatorClient) {
Fixed, thanks.
pkg/estimator/client/cache.go
Outdated
// close it and establish a new connection
err := estimatorCache.estimator[name].connection.Close()
if err != nil {
	klog.Errorf("Failed to close connection of cluster(%s): %v.", name, err)
	return err
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There may be concurrency issues here. If more than one worker establishes a connection at the same time, a connection may be closed multiple times.
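For illustration, a minimal deterministic sketch of the interleaving described above (hypothetical names, not the Karmada code): worker B's stale read leads to a double close, and worker A's new connection is replaced without ever being closed.

package main

import "fmt"

type conn struct{ id int }

func (c *conn) Close() { fmt.Printf("close conn %d\n", c.id) }

func main() {
	cache := map[string]*conn{"member1": {id: 0}}

	// Worker A and worker B both observe the old connection...
	oldA := cache["member1"]
	oldB := cache["member1"]

	// ...worker A closes it and installs its new connection...
	oldA.Close()
	cache["member1"] = &conn{id: 1}

	// ...then worker B closes the same (already closed) connection
	// and overwrites the entry. Worker A's conn 1 is now unreachable
	// and will never be closed.
	oldB.Close()
	cache["member1"] = &conn{id: 2}

	fmt.Printf("cached: conn %d (conn 1 leaked)\n", cache["member1"].id)
}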
Fixed.
Force-pushed from e0352b1 to 1ea0af9.
Force-pushed from cb7dd1c to 347fe3f.
pkg/estimator/client/cache.go
Outdated
@@ -47,26 +47,13 @@ func NewSchedulerEstimatorCache() *SchedulerEstimatorCache {
type clientWrapper struct {
	connection *grpc.ClientConn
	client     estimatorservice.EstimatorClient
	closeLock  sync.Mutex
Why add this lock? It seems that all operations are already protected by the write lock c.lock.
Agree, continuing to use the lock in estimatorCache is sufficient.
Would it be better to modify it like this?
// EstablishConnection establishes a new gRPC connection with the specified cluster scheduler estimator.
func EstablishConnection(kubeClient kubernetes.Interface, name string, estimatorCache *SchedulerEstimatorCache, estimatorServicePrefix string, grpcConfig *grpcconnection.ClientConfig) error {
serverAddr, err := resolveCluster(kubeClient, util.NamespaceKarmadaSystem,
names.GenerateEstimatorServiceName(estimatorServicePrefix, name), int32(grpcConfig.TargetPort))
if err != nil {
return err
}
klog.Infof("Start dialing estimator server(%s) of cluster(%s).", serverAddr, name)
cc, err := grpcConfig.DialWithTimeOut(serverAddr, 5*time.Second)
if err != nil {
klog.Errorf("Failed to dial cluster(%s): %v.", name, err)
return err
}
c := estimatorservice.NewEstimatorClient(cc)
estimatorCache.AddOrUpdateCluster(name, cc, c)
klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", serverAddr, name)
return nil
}
// AddOrUpdateCluster adds or updates a grpc connection and associated client into the cache.
func (c *SchedulerEstimatorCache) AddOrUpdateCluster(name string, connection *grpc.ClientConn, client estimatorservice.EstimatorClient) {
c.lock.Lock()
defer c.lock.Unlock()
if _, exist := c.estimator[name]; exist {
_ = c.estimator[name].connection.Close()
}
c.estimator[name] = &clientWrapper{
connection: connection,
client: client,
}
}
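For illustration, a simplified, runnable model of this pattern (a plain fakeConn stands in for *grpc.ClientConn; all names are hypothetical, not the Karmada code). Because closing the stale connection and inserting the new one happen under the same lock, concurrent callers serialize: exactly one connection survives and every other one is closed.

package main

import (
	"fmt"
	"sync"
)

type fakeConn struct{ id int }

func (f *fakeConn) Close() error {
	fmt.Printf("closed conn %d\n", f.id)
	return nil
}

type cache struct {
	lock      sync.Mutex
	estimator map[string]*fakeConn
}

// addOrUpdate mirrors AddOrUpdateCluster: check, close, and replace
// are all performed while holding the lock.
func (c *cache) addOrUpdate(name string, conn *fakeConn) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if old, exist := c.estimator[name]; exist {
		_ = old.Close()
	}
	c.estimator[name] = conn
}

func main() {
	c := &cache{estimator: map[string]*fakeConn{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			c.addOrUpdate("member1", &fakeConn{id: id})
		}(i)
	}
	wg.Wait()
	fmt.Printf("surviving conn: %d\n", c.estimator["member1"].id)
}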
Makes sense, thanks.
pkg/estimator/client/cache.go
Outdated
es, exist := estimatorCache.estimator[name]
if exist {
	// Use a lock to prevent a connection from being closed multiple
	// times when more than one worker closes it at the same time.
Is it possible that multiple workers can acquire this lock estimatorCache.lock at the same time?
BTW, I think this close function es.connection.Close() is thread-safe.
> I think this close function es.connection.Close() is thread safe.

It is indeed thread-safe, but it may cause some unused connections to not be closed properly.
> Is it possible that multiple workers can acquire this lock estimatorCache.lock at the same time?

EstablishConnection is used as part of the ReconcileFunc of scheduler/descheduler's schedulerEstimatorWorker, and it can run concurrently.
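An illustrative sketch of that shape (hypothetical names, not the actual Karmada worker): the reconcile callback can be invoked from several worker goroutines at once, so anything it calls, such as EstablishConnection, must tolerate concurrent execution for the same cluster key.

package main

import (
	"fmt"
	"sync"
)

type ReconcileFunc func(key string) error

// runWorkers invokes fn from n goroutines with the same key,
// mimicking duplicate add/update events for one cluster.
func runWorkers(n int, key string, fn ReconcileFunc) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			if err := fn(key); err != nil {
				fmt.Printf("worker %d: %v\n", worker, err)
			}
		}(i)
	}
	wg.Wait()
}

func main() {
	establish := func(cluster string) error {
		// Stand-in for EstablishConnection(kubeClient, cluster, cache, ...).
		fmt.Printf("re-dialing estimator for cluster %q\n", cluster)
		return nil
	}
	runWorkers(2, "member1", establish)
}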
pkg/estimator/client/cache.go
Outdated
func (c *SchedulerEstimatorCache) IsEstimatorExist(name string) bool {
	c.lock.RLock()
	defer c.lock.RUnlock()
	_, exist := c.estimator[name]
	return exist
}
There is no need to remove this method. It is an exported function and may be used elsewhere.
Done, thanks~
Signed-off-by: zhy76 <958474674@qq.com>
if estimatorCache.IsEstimatorExist(name) {
	return nil
}
If the cluster state changes frequently, I am worried that this modification will cause the connection to be re-established too often. That would be bad.
Agree, updating too often is also a problem, unless it really has to be updated.
@zhy76 Can you remind me what the problem is if the scheduler estimator cache is not updated when the cluster changes? Does it have to be updated every time? Or is it possible to update only part of it? For example, does the connection need to be updated?
I've reconsidered, and it seems there is no impact. It looks like the author watches for updates only to ensure that all clusters are added to the cache. The cache only uses the cluster name to find the corresponding estimator server's service and has no other usage, so this PR may no longer be necessary. Thank you all for your help. Asking @Garrybest for a check and the final decision.
Exactly, the cluster name is used to connect to the corresponding estimator service in the host cluster. I think there is no circumstance in which this service needs to be updated, right?
Meanwhile, a cluster's status can change frequently, so too much unnecessary connection establishment could be expensive.
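For illustration only, one hypothetical way to avoid needless re-establishment would be to record the address each cached connection was dialed to and re-dial only when the resolved address changes. A sketch against the types discussed above; the addr field and NeedsRedial are invented here, not part of this PR:

type clientWrapper struct {
	addr       string // hypothetical: address this connection was dialed to
	connection *grpc.ClientConn
	client     estimatorservice.EstimatorClient
}

// NeedsRedial (hypothetical) reports whether a fresh connection is
// required: either nothing is cached yet, or the estimator service
// now resolves to a different address.
func (c *SchedulerEstimatorCache) NeedsRedial(name, resolvedAddr string) bool {
	c.lock.RLock()
	defer c.lock.RUnlock()
	es, exist := c.estimator[name]
	return !exist || es.addr != resolvedAddr
}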
Thanks for your reply~
c.lock.Lock()
defer c.lock.Unlock()
// If more than one worker has established a connection at the same time,
// only the first one will be used. The later ones will be abandoned.

if _, exist := c.estimator[name]; exist {
Suggest:

if oldConn, exist := c.estimator[name]; exist {
	// the cached value is a *clientWrapper, so close its connection field
	_ = oldConn.connection.Close()
}
Invite @Garrybest to take a look.
What type of PR is this?
/kind bug
What this PR does / why we need it:
fix: update scheduler estimator cache when cluster changed
Currently, when a cluster update occurs, if the cache already contains a clientWrapper for the corresponding cluster, the cache data will not be updated. This issue needs to be fixed.
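A toy illustration of the behavior described above (simplified, hypothetical names; a string stands in for the gRPC connection):

package main

import "fmt"

func main() {
	cache := map[string]string{}

	add := func(name, conn string) {
		if _, exist := cache[name]; exist {
			return // early return: the update is silently dropped
		}
		cache[name] = conn
	}

	add("member1", "conn-v1")
	// The cluster changes and the scheduler dials a fresh connection...
	add("member1", "conn-v2")
	// ...but the cache still serves the stale one.
	fmt.Println(cache["member1"]) // prints "conn-v1"
}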
Which issue(s) this PR fixes:
Fixes #5152
Special notes for your reviewer:
Does this PR introduce a user-facing change?: