Skip to content

Commit

Permalink
fix: fix ConfigClient data race (#483)
Browse files Browse the repository at this point in the history
  • Loading branch information
wcvb13 authored Jul 21, 2022
1 parent 5cb0711 commit dec3976
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions clients/config_client/config_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/aliyun/alibaba-cloud-sdk-go/services/kms"

"github.com/nacos-group/nacos-sdk-go/clients/cache"
"github.com/nacos-group/nacos-sdk-go/clients/nacos_client"
"github.com/nacos-group/nacos-sdk-go/common/constant"
Expand All @@ -46,7 +48,7 @@ type ConfigClient struct {
mutex sync.Mutex
configProxy ConfigProxy
configCacheDir string
currentTaskCount int
currentTaskCount int32
cacheMap cache.ConcurrentMap
schedulerMap cache.ConcurrentMap
}
Expand Down Expand Up @@ -253,7 +255,7 @@ func (client *ConfigClient) DeleteConfig(param vo.ConfigParam) (deleted bool, er
return client.configProxy.DeleteConfigProxy(param, clientConfig.NamespaceId, clientConfig.AccessKey, clientConfig.SecretKey)
}

//Cancel Listen Config
// Cancel Listen Config
func (client *ConfigClient) CancelListenConfig(param vo.ConfigParam) (err error) {
clientConfig, err := client.GetClientConfig()
if err != nil {
Expand All @@ -263,13 +265,14 @@ func (client *ConfigClient) CancelListenConfig(param vo.ConfigParam) (err error)
client.cacheMap.Remove(util.GetConfigCacheKey(param.DataId, param.Group, clientConfig.NamespaceId))
logger.Infof("Cancel listen config DataId:%s Group:%s", param.DataId, param.Group)
remakeId := int(math.Ceil(float64(client.cacheMap.Count()) / float64(perTaskConfigSize)))
if remakeId < client.currentTaskCount {
currentTaskCount := int(atomic.LoadInt32(&client.currentTaskCount))
if remakeId < currentTaskCount {
client.remakeCacheDataTaskId(remakeId)
}
return err
}

//Remake cache data taskId
// Remake cache data taskId
func (client *ConfigClient) remakeCacheDataTaskId(remakeId int) {
for i := 0; i < remakeId; i++ {
count := 0
Expand Down Expand Up @@ -335,9 +338,9 @@ func (client *ConfigClient) ListenConfig(param vo.ConfigParam) (err error) {
return
}

//Delay Scheduler
//initialDelay the time to delay first execution
//delay the delay between the termination of one execution and the commencement of the next
// Delay Scheduler
// initialDelay the time to delay first execution
// delay the delay between the termination of one execution and the commencement of the next
func (client *ConfigClient) delayScheduler(t *time.Timer, delay time.Duration, taskId string, execute func() error) {
for {
if v, ok := client.schedulerMap.Get(taskId); ok {
Expand All @@ -354,31 +357,31 @@ func (client *ConfigClient) delayScheduler(t *time.Timer, delay time.Duration, t
}
}

//Listen for the configuration executor
// Listen for the configuration executor
func (client *ConfigClient) listenConfigExecutor() func() error {
return func() error {
listenerSize := client.cacheMap.Count()
taskCount := int(math.Ceil(float64(listenerSize) / float64(perTaskConfigSize)))

if taskCount > client.currentTaskCount {
for i := client.currentTaskCount; i < taskCount; i++ {
currentTaskCount := int(atomic.LoadInt32(&client.currentTaskCount))
if taskCount > currentTaskCount {
for i := currentTaskCount; i < taskCount; i++ {
client.schedulerMap.Set(strconv.Itoa(i), true)
go client.delayScheduler(time.NewTimer(1*time.Millisecond), 10*time.Millisecond, strconv.Itoa(i), client.longPulling(i))
}
client.currentTaskCount = taskCount
} else if taskCount < client.currentTaskCount {
for i := taskCount; i < client.currentTaskCount; i++ {
atomic.StoreInt32(&client.currentTaskCount, int32(taskCount))
} else if taskCount < currentTaskCount {
for i := taskCount; i < currentTaskCount; i++ {
if _, ok := client.schedulerMap.Get(strconv.Itoa(i)); ok {
client.schedulerMap.Set(strconv.Itoa(i), false)
}
}
client.currentTaskCount = taskCount
atomic.StoreInt32(&client.currentTaskCount, int32(taskCount))
}
return nil
}
}

//Long polling listening configuration
// Long polling listening configuration
func (client *ConfigClient) longPulling(taskId int) func() error {
return func() error {
var listeningConfigs string
Expand Down Expand Up @@ -438,7 +441,7 @@ func (client *ConfigClient) longPulling(taskId int) func() error {

}

//Execute the Listener callback func()
// Execute the Listener callback func()
func (client *ConfigClient) callListener(changed, tenant string) {
changedDecoded, _ := url.QueryUnescape(changed)
changedConfigs := strings.Split(changedDecoded, "\u0001")
Expand Down

0 comments on commit dec3976

Please sign in to comment.