Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1559]: adjust the startup process of the configuration center #1560

Merged
merged 2 commits into from
Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 10 additions & 15 deletions config/config_center_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,19 @@ func (c *CenterConfig) toURL() (*common.URL, error) {
// it will prepare the environment
func startConfigCenter(rc *RootConfig) error {
cc := rc.ConfigCenter
strConf, err := cc.prepareEnvironment()
dynamicConfig, err := cc.GetDynamicConfiguration()
if err != nil {
return errors.WithMessagef(err, "start config center error!")
logger.Errorf("Start dynamic configuration center error, error message is %v", err)
return err
}
envInstance := conf.GetEnvInstance()
envInstance.SetDynamicConfiguration(dynamicConfig)

strConf, err := dynamicConfig.GetProperties(cc.DataId, config_center.WithGroup(cc.Group))
if err != nil {
logger.Warnf("Dynamic onfig center has started, but config may not be initialized, because %s", err)
return nil
justxuewei marked this conversation as resolved.
Show resolved Hide resolved
}
koan := koanf.New(".")
if err = koan.Load(rawbytes.Provider([]byte(strConf)), yaml.Parser()); err != nil {
return err
Expand Down Expand Up @@ -167,25 +175,12 @@ func (c *CenterConfig) GetDynamicConfiguration() (config_center.DynamicConfigura
}
dynamicConfig, err := c.CreateDynamicConfiguration()
if err != nil {
logger.Warnf("Create dynamic configuration error , error message is %v", err)
return nil, errors.WithStack(err)
}
c.DynamicConfiguration = dynamicConfig
return dynamicConfig, nil
}

func (c *CenterConfig) prepareEnvironment() (string, error) {
dynamicConfig, err := c.GetDynamicConfiguration()
if err != nil {
logger.Errorf("Create dynamic configuration error , error message is %v", err)
return "", errors.WithStack(err)
}
envInstance := conf.GetEnvInstance()
envInstance.SetDynamicConfiguration(dynamicConfig)

return dynamicConfig.GetProperties(c.DataId, config_center.WithGroup(c.Group))
}

func NewConfigCenterConfigBuilder() *ConfigCenterConfigBuilder {
return &ConfigCenterConfigBuilder{configCenterConfig: newEmptyConfigCenterConfig()}
}
Expand Down
2 changes: 1 addition & 1 deletion config_center/apollo/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func newApolloConfiguration(url *common.URL) (*apolloConfiguration, error) {
c.appConf = &config.AppConfig{
AppID: url.GetParam(constant.CONFIG_APP_ID_KEY, ""),
Cluster: url.GetParam(constant.CONFIG_CLUSTER_KEY, ""),
NamespaceName: url.GetParam(constant.CONFIG_NAMESPACE_KEY, cc.DEFAULT_GROUP),
NamespaceName: url.GetParam(constant.CONFIG_NAMESPACE_KEY, cc.DefaultGroup),
IP: c.getAddressWithProtocolPrefix(url),
Secret: url.GetParam(constant.CONFIG_SECRET_KEY, ""),
IsBackupConfig: url.GetParamBool(constant.CONFIG_BACKUP_CONFIG_KEY, true),
Expand Down
8 changes: 4 additions & 4 deletions config_center/dynamic_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import (
// DynamicConfiguration
// ////////////////////////////////////////
const (
// DEFAULT_GROUP: default group
DEFAULT_GROUP = "dubbo"
// DEFAULT_CONFIG_TIMEOUT: default config timeout
DEFAULT_CONFIG_TIMEOUT = "10s"
// DefaultGroup default group
DefaultGroup = "dubbo"
// DefaultConfigTimeout default config timeout
DefaultConfigTimeout = "10s"
)

// DynamicConfiguration for modify listener and get properties file
Expand Down
2 changes: 1 addition & 1 deletion config_center/file/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (fsdc *FileSystemDynamicConfiguration) GetPath(key string, group string) st
}

if len(group) == 0 {
group = config_center.DEFAULT_GROUP
group = config_center.DefaultGroup
}

return filepath.Join(fsdc.rootPath, group, adapterKey(key))
Expand Down
2 changes: 1 addition & 1 deletion config_center/nacos/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type nacosDynamicConfiguration struct {

func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration, error) {
c := &nacosDynamicConfiguration{
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config",
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DefaultGroup) + "/config",
url: url,
done: make(chan struct{}),
}
Expand Down
6 changes: 4 additions & 2 deletions config_center/zookeeper/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type zookeeperDynamicConfiguration struct {
func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfiguration, error) {
c := &zookeeperDynamicConfiguration{
url: url,
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config",
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DefaultGroup) + "/config",
}
if v, ok := config.GetRootConfig().ConfigCenter.Params["base64"]; ok {
base64Enabled, err := strconv.ParseBool(v)
Expand Down Expand Up @@ -146,6 +146,8 @@ func (c *zookeeperDynamicConfiguration) PublishConfig(key string, group string,
if c.base64Enabled {
valueBytes = []byte(base64.StdEncoding.EncodeToString(valueBytes))
}
// FIXME this method need to be fixed, because it will recursively
// create every node in the path with given value which we may not expected.
err := c.client.CreateWithValue(path, valueBytes)
if err != nil {
return perrors.WithStack(err)
Expand Down Expand Up @@ -246,7 +248,7 @@ func (c *zookeeperDynamicConfiguration) getPath(key string, group string) string

func (c *zookeeperDynamicConfiguration) buildPath(group string) string {
if len(group) == 0 {
group = config_center.DEFAULT_GROUP
group = config_center.DefaultGroup
}
return c.rootPath + pathSeparator + group
}
12 changes: 7 additions & 5 deletions config_center/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/remoting"
)
Expand Down Expand Up @@ -65,10 +66,12 @@ func (l *CacheListener) DataChange(event remoting.Event) bool {
// meanings new node
return true
}
key := l.pathToKey(event.Path)
var key string
// TODO use common way
if strings.HasSuffix(key, constant.MeshRouteSuffix) {
key = key[:strings.Index(key, constant.MeshRouteSuffix)]
if strings.HasSuffix(event.Path, constant.MeshRouteSuffix) {
key = config.GetRootConfig().Application.Name
} else {
key = l.pathToKey(event.Path)
}
if key != "" {
if listeners, ok := l.keyListeners.Load(key); ok {
Expand All @@ -85,8 +88,7 @@ func (l *CacheListener) pathToKey(path string) string {
key := strings.Replace(strings.Replace(path, l.rootPath+"/", "", -1), "/", ".", -1)
if strings.HasSuffix(key, constant.ConfiguratorSuffix) ||
strings.HasSuffix(key, constant.TagRouterRuleSuffix) ||
strings.HasSuffix(key, constant.ConditionRouterRuleSuffix) ||
strings.HasSuffix(key, constant.MeshRouteSuffix) {
strings.HasSuffix(key, constant.ConditionRouterRuleSuffix) {
// governance config, so we remove the "dubbo." prefix
key = key[strings.Index(key, ".")+1:]
}
Expand Down
116 changes: 60 additions & 56 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,13 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
}
}

func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listener remoting.DataListener) {
func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
defer l.wg.Done()

var (
failTimes int
ttl time.Duration
event chan struct{}
zkEvent zk.Event
)
event = make(chan struct{}, 4)
ttl = defaultTTL
Expand All @@ -235,13 +234,13 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
defer close(event)
for {
// get current children for a zkPath
children, childEventCh, err := l.client.GetChildrenW(zkPath)
children, childEventCh, err := l.client.GetChildrenW(zkRootPath)
if err != nil {
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
logger.Debugf("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
logger.Debugf("listenDirEvent(path{%s}) = error{%v}", zkRootPath, err)
// clear the event channel
CLEAR:
for {
Expand All @@ -251,52 +250,51 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
break CLEAR
}
}
l.client.RegisterEvent(zkPath, &event)
l.client.RegisterEvent(zkRootPath, &event)
if err == errNilNode {
logger.Warnf("listenDirEvent(path{%s}) got errNilNode,so exit listen", zkPath)
l.client.UnregisterEvent(zkPath, &event)
logger.Warnf("listenDirEvent(path{%s}) got errNilNode,so exit listen", zkRootPath)
l.client.UnregisterEvent(zkRootPath, &event)
return
}

after := time.After(timeSecondDuration(failTimes * ConnDelay))
select {
case <-after:
l.client.UnregisterEvent(zkPath, &event)
l.client.UnregisterEvent(zkRootPath, &event)
continue
case <-l.exit:
l.client.UnregisterEvent(zkPath, &event)
logger.Debugf("listen(path{%s}) goroutine exit now...", zkPath)
l.client.UnregisterEvent(zkRootPath, &event)
logger.Debugf("listen(path{%s}) goroutine exit now...", zkRootPath)
return
case <-event:
logger.Debugf("get zk.EventNodeDataChange notify event")
l.client.UnregisterEvent(zkPath, &event)
l.handleZkNodeEvent(zkPath, nil, listener)
l.client.UnregisterEvent(zkRootPath, &event)
l.handleZkNodeEvent(zkRootPath, nil, listener)
continue
}
}
failTimes = 0
for _, c := range children {

// Only need to compare Path when subscribing to provider
if strings.LastIndex(zkPath, constant.PROVIDER_CATEGORY) != -1 {
if strings.LastIndex(zkRootPath, constant.PROVIDER_CATEGORY) != -1 {
provider, _ := common.NewURL(c)
if provider.ServiceKey() != conf.ServiceKey() {
continue
}
}

// listen l service node
dubboPath := path.Join(zkPath, c)
zkNodePath := path.Join(zkRootPath, c)

// Save the path to avoid listen repeatedly
l.pathMapLock.Lock()
_, ok := l.pathMap[dubboPath]
_, ok := l.pathMap[zkNodePath]
if !ok {
l.pathMap[dubboPath] = uatomic.NewInt32(0)
l.pathMap[zkNodePath] = uatomic.NewInt32(0)
}
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", dubboPath)
logger.Warnf("@zkPath %s has already been listened.", zkNodePath)
continue
}

Expand All @@ -306,16 +304,17 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
l.client.RUnlock()
break
}
content, _, err := l.client.Conn.Get(dubboPath)
content, _, err := l.client.Conn.Get(zkNodePath)

l.client.RUnlock()
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", zkNodePath, perrors.WithStack(err))
}
logger.Debugf("Get children!{%s}", dubboPath)
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
logger.Debugf("Get children!{%s}", zkNodePath)
if !listener.DataChange(remoting.Event{Path: zkNodePath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
logger.Infof("listen dubbo service key{%s}", zkNodePath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
Expand All @@ -326,54 +325,59 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
l.pathMapLock.Unlock()
}
logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)
}(zkNodePath, listener)

// listen sub path recursive
// if zkPath is end of "providers/ & consumers/" we do not listen children dir
if strings.LastIndex(zkPath, constant.PROVIDER_CATEGORY) == -1 &&
strings.LastIndex(zkPath, constant.CONSUMER_CATEGORY) == -1 {
if strings.LastIndex(zkRootPath, constant.PROVIDER_CATEGORY) == -1 &&
strings.LastIndex(zkRootPath, constant.CONSUMER_CATEGORY) == -1 {
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(conf, zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)
}(zkNodePath, listener)
}
}
// Periodically update provider information
tickerTTL := ttl
if tickerTTL > 20e9 {
tickerTTL = 20e9
if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) {
return
}
ticker := time.NewTicker(tickerTTL)
WATCH:
for {
select {
case <-ticker.C:
l.handleZkNodeEvent(zkPath, children, listener)
if tickerTTL < ttl {
tickerTTL *= 2
if tickerTTL > ttl {
tickerTTL = ttl
}
ticker.Stop()
ticker = time.NewTicker(tickerTTL)
}
}

func (l *ZkEventListener) startScheduleWatchTask(
zkRootPath string, children []string, ttl time.Duration,
listener remoting.DataListener, childEventCh <-chan zk.Event) bool {
// Periodically update provider information
tickerTTL := ttl
if tickerTTL > 20e9 {
tickerTTL = 20e9
}
ticker := time.NewTicker(tickerTTL)
for {
select {
case <-ticker.C:
l.handleZkNodeEvent(zkRootPath, children, listener)
if tickerTTL < ttl {
tickerTTL *= 2
if tickerTTL > ttl {
tickerTTL = ttl
}
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
if zkEvent.Type != zk.EventNodeChildrenChanged {
break WATCH
}
ticker = time.NewTicker(tickerTTL)
}
case zkEvent := <-childEventCh:
logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
if zkEvent.Type == zk.EventNodeChildrenChanged {
l.handleZkNodeEvent(zkEvent.Path, children, listener)
break WATCH
case <-l.exit:
logger.Warnf("listen(path{%s}) goroutine exit now...", zkPath)
ticker.Stop()
return
}
return false
case <-l.exit:
logger.Warnf("listen(path{%s}) goroutine exit now...", zkRootPath)
ticker.Stop()
return true
}

}
}

Expand Down