From 6a93c8bf2d0e8be136b517ced58bb95d677149b8 Mon Sep 17 00:00:00 2001 From: wcvb13 <497272486@qq.com> Date: Mon, 4 Jul 2022 16:43:59 +0800 Subject: [PATCH] fix: fix ConfigClient data race --- clients/config_client/config_client.go | 37 ++++++++++++++------------ 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/clients/config_client/config_client.go b/clients/config_client/config_client.go index e394701a..1e17f2e7 100644 --- a/clients/config_client/config_client.go +++ b/clients/config_client/config_client.go @@ -25,9 +25,11 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/aliyun/alibaba-cloud-sdk-go/services/kms" + "github.com/nacos-group/nacos-sdk-go/clients/cache" "github.com/nacos-group/nacos-sdk-go/clients/nacos_client" "github.com/nacos-group/nacos-sdk-go/common/constant" @@ -46,7 +48,7 @@ type ConfigClient struct { mutex sync.Mutex configProxy ConfigProxy configCacheDir string - currentTaskCount int + currentTaskCount int32 cacheMap cache.ConcurrentMap schedulerMap cache.ConcurrentMap } @@ -252,7 +254,7 @@ func (client *ConfigClient) DeleteConfig(param vo.ConfigParam) (deleted bool, er return client.configProxy.DeleteConfigProxy(param, clientConfig.NamespaceId, clientConfig.AccessKey, clientConfig.SecretKey) } -//Cancel Listen Config +// Cancel Listen Config func (client *ConfigClient) CancelListenConfig(param vo.ConfigParam) (err error) { clientConfig, err := client.GetClientConfig() if err != nil { @@ -262,13 +264,14 @@ func (client *ConfigClient) CancelListenConfig(param vo.ConfigParam) (err error) client.cacheMap.Remove(util.GetConfigCacheKey(param.DataId, param.Group, clientConfig.NamespaceId)) logger.Infof("Cancel listen config DataId:%s Group:%s", param.DataId, param.Group) remakeId := int(math.Ceil(float64(client.cacheMap.Count()) / float64(perTaskConfigSize))) - if remakeId < client.currentTaskCount { + currentTaskCount := int(atomic.LoadInt32(&client.currentTaskCount)) + if remakeId < currentTaskCount { client.remakeCacheDataTaskId(remakeId) } return err } -//Remake cache data taskId +// Remake cache data taskId func (client *ConfigClient) remakeCacheDataTaskId(remakeId int) { for i := 0; i < remakeId; i++ { count := 0 @@ -334,9 +337,9 @@ func (client *ConfigClient) ListenConfig(param vo.ConfigParam) (err error) { return } -//Delay Scheduler -//initialDelay the time to delay first execution -//delay the delay between the termination of one execution and the commencement of the next +// Delay Scheduler +// initialDelay the time to delay first execution +// delay the delay between the termination of one execution and the commencement of the next func (client *ConfigClient) delayScheduler(t *time.Timer, delay time.Duration, taskId string, execute func() error) { for { if v, ok := client.schedulerMap.Get(taskId); ok { @@ -353,31 +356,31 @@ func (client *ConfigClient) delayScheduler(t *time.Timer, delay time.Duration, t } } -//Listen for the configuration executor +// Listen for the configuration executor func (client *ConfigClient) listenConfigExecutor() func() error { return func() error { listenerSize := client.cacheMap.Count() taskCount := int(math.Ceil(float64(listenerSize) / float64(perTaskConfigSize))) - - if taskCount > client.currentTaskCount { - for i := client.currentTaskCount; i < taskCount; i++ { + currentTaskCount := int(atomic.LoadInt32(&client.currentTaskCount)) + if taskCount > currentTaskCount { + for i := currentTaskCount; i < taskCount; i++ { client.schedulerMap.Set(strconv.Itoa(i), true) go client.delayScheduler(time.NewTimer(1*time.Millisecond), 10*time.Millisecond, strconv.Itoa(i), client.longPulling(i)) } - client.currentTaskCount = taskCount - } else if taskCount < client.currentTaskCount { - for i := taskCount; i < client.currentTaskCount; i++ { + atomic.StoreInt32(&client.currentTaskCount, int32(taskCount)) + } else if taskCount < currentTaskCount { + for i := taskCount; i < currentTaskCount; i++ { if _, ok := client.schedulerMap.Get(strconv.Itoa(i)); ok { client.schedulerMap.Set(strconv.Itoa(i), false) } } - client.currentTaskCount = taskCount + atomic.StoreInt32(&client.currentTaskCount, int32(taskCount)) } return nil } } -//Long polling listening configuration +// Long polling listening configuration func (client *ConfigClient) longPulling(taskId int) func() error { return func() error { var listeningConfigs string @@ -437,7 +440,7 @@ func (client *ConfigClient) longPulling(taskId int) func() error { } -//Execute the Listener callback func() +// Execute the Listener callback func() func (client *ConfigClient) callListener(changed, tenant string) { changedDecoded, _ := url.QueryUnescape(changed) changedConfigs := strings.Split(changedDecoded, "\u0001")