diff --git a/cluster/router/v3router/router_chain.go b/cluster/router/v3router/router_chain.go index 427a053feb..8662be2acc 100644 --- a/cluster/router/v3router/router_chain.go +++ b/cluster/router/v3router/router_chain.go @@ -29,6 +29,7 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/cluster/router" "dubbo.apache.org/dubbo-go/v3/common" + conf "dubbo.apache.org/dubbo-go/v3/common/config" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/logger" "dubbo.apache.org/dubbo-go/v3/config" @@ -48,11 +49,11 @@ func NewUniformRouterChain() (router.PriorityRouter, error) { // 1. add mesh route listener r := &RouterChain{} rootConfig := config.GetRootConfig() - if rootConfig.ConfigCenter.DynamicConfiguration == nil { + dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration() + if dynamicConfiguration == nil { logger.Infof("Config center does not start, please check if the configuration center has been properly configured in dubbogo.yml") return nil, nil } - dynamicConfiguration := rootConfig.ConfigCenter.DynamicConfiguration dynamicConfiguration.AddListener(rootConfig.Application.Name, r) // 2. try to get mesh route configuration, default key is "dubbo.io.MESHAPPRULE" with group "dubbo" diff --git a/config/config_center_config.go b/config/config_center_config.go index 8a647721ab..28a83cd909 100644 --- a/config/config_center_config.go +++ b/config/config_center_config.go @@ -61,8 +61,6 @@ type CenterConfig struct { AppID string `default:"dubbo" yaml:"app-id" json:"app-id,omitempty"` Timeout string `default:"10s" yaml:"timeout" json:"timeout,omitempty"` Params map[string]string `yaml:"params" json:"parameters,omitempty"` - - DynamicConfiguration config_center.DynamicConfiguration } // Prefix dubbo.config-center @@ -132,11 +130,17 @@ func (c *CenterConfig) toURL() (*common.URL, error) { // it will prepare the environment func startConfigCenter(rc *RootConfig) error { cc := rc.ConfigCenter - strConf, err := cc.prepareEnvironment() + dynamicConfig, err := cc.GetDynamicConfiguration() if err != nil { - return errors.WithMessagef(err, "start config center error!") + logger.Errorf("Start dynamic configuration center error, error message is %v", err) + return err } + strConf, err := dynamicConfig.GetProperties(cc.DataId, config_center.WithGroup(cc.Group)) + if err != nil { + logger.Warnf("Dynamic onfig center has started, but config may not be initialized, because %s", err) + return nil + } koan := koanf.New(".") if err = koan.Load(rawbytes.Provider([]byte(strConf)), yaml.Parser()); err != nil { return err @@ -162,28 +166,16 @@ func (c *CenterConfig) CreateDynamicConfiguration() (config_center.DynamicConfig } func (c *CenterConfig) GetDynamicConfiguration() (config_center.DynamicConfiguration, error) { - if c.DynamicConfiguration != nil { - return c.DynamicConfiguration, nil + envInstance := conf.GetEnvInstance() + if envInstance.GetDynamicConfiguration() != nil { + return envInstance.GetDynamicConfiguration(), nil } dynamicConfig, err := c.CreateDynamicConfiguration() if err != nil { - logger.Warnf("Create dynamic configuration error , error message is %v", err) return nil, errors.WithStack(err) } - c.DynamicConfiguration = dynamicConfig - return dynamicConfig, nil -} - -func (c *CenterConfig) prepareEnvironment() (string, error) { - dynamicConfig, err := c.GetDynamicConfiguration() - if err != nil { - logger.Errorf("Create dynamic configuration error , error message is %v", err) - return "", errors.WithStack(err) - } - envInstance := conf.GetEnvInstance() envInstance.SetDynamicConfiguration(dynamicConfig) - - return dynamicConfig.GetProperties(c.DataId, config_center.WithGroup(c.Group)) + return dynamicConfig, nil } func NewConfigCenterConfigBuilder() *ConfigCenterConfigBuilder { diff --git a/config_center/apollo/impl.go b/config_center/apollo/impl.go index 9df8fd7b34..d6b32c335e 100644 --- a/config_center/apollo/impl.go +++ b/config_center/apollo/impl.go @@ -60,7 +60,7 @@ func newApolloConfiguration(url *common.URL) (*apolloConfiguration, error) { c.appConf = &config.AppConfig{ AppID: url.GetParam(constant.CONFIG_APP_ID_KEY, ""), Cluster: url.GetParam(constant.CONFIG_CLUSTER_KEY, ""), - NamespaceName: url.GetParam(constant.CONFIG_NAMESPACE_KEY, cc.DEFAULT_GROUP), + NamespaceName: url.GetParam(constant.CONFIG_NAMESPACE_KEY, cc.DefaultGroup), IP: c.getAddressWithProtocolPrefix(url), Secret: url.GetParam(constant.CONFIG_SECRET_KEY, ""), IsBackupConfig: url.GetParamBool(constant.CONFIG_BACKUP_CONFIG_KEY, true), diff --git a/config_center/dynamic_configuration.go b/config_center/dynamic_configuration.go index 447342e8af..12c1851fa2 100644 --- a/config_center/dynamic_configuration.go +++ b/config_center/dynamic_configuration.go @@ -34,10 +34,10 @@ import ( // DynamicConfiguration // //////////////////////////////////////// const ( - // DEFAULT_GROUP: default group - DEFAULT_GROUP = "dubbo" - // DEFAULT_CONFIG_TIMEOUT: default config timeout - DEFAULT_CONFIG_TIMEOUT = "10s" + // DefaultGroup default group + DefaultGroup = "dubbo" + // DefaultConfigTimeout default config timeout + DefaultConfigTimeout = "10s" ) // DynamicConfiguration for modify listener and get properties file diff --git a/config_center/file/impl.go b/config_center/file/impl.go index a3e2cdca67..5fdb20d7b4 100644 --- a/config_center/file/impl.go +++ b/config_center/file/impl.go @@ -195,7 +195,7 @@ func (fsdc *FileSystemDynamicConfiguration) GetPath(key string, group string) st } if len(group) == 0 { - group = config_center.DEFAULT_GROUP + group = config_center.DefaultGroup } return filepath.Join(fsdc.rootPath, group, adapterKey(key)) diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go index 6d19fbd51f..06e1a16518 100644 --- a/config_center/nacos/impl.go +++ b/config_center/nacos/impl.go @@ -63,7 +63,7 @@ type nacosDynamicConfiguration struct { func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration, error) { c := &nacosDynamicConfiguration{ - rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config", + rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DefaultGroup) + "/config", url: url, done: make(chan struct{}), } diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go index c5eb36cded..2c03ec6607 100644 --- a/config_center/zookeeper/impl.go +++ b/config_center/zookeeper/impl.go @@ -65,7 +65,7 @@ type zookeeperDynamicConfiguration struct { func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfiguration, error) { c := &zookeeperDynamicConfiguration{ url: url, - rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config", + rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DefaultGroup) + "/config", } if v, ok := config.GetRootConfig().ConfigCenter.Params["base64"]; ok { base64Enabled, err := strconv.ParseBool(v) @@ -146,6 +146,8 @@ func (c *zookeeperDynamicConfiguration) PublishConfig(key string, group string, if c.base64Enabled { valueBytes = []byte(base64.StdEncoding.EncodeToString(valueBytes)) } + // FIXME this method need to be fixed, because it will recursively + // create every node in the path with given value which we may not expected. err := c.client.CreateWithValue(path, valueBytes) if err != nil { return perrors.WithStack(err) @@ -246,7 +248,7 @@ func (c *zookeeperDynamicConfiguration) getPath(key string, group string) string func (c *zookeeperDynamicConfiguration) buildPath(group string) string { if len(group) == 0 { - group = config_center.DEFAULT_GROUP + group = config_center.DefaultGroup } return c.rootPath + pathSeparator + group } diff --git a/config_center/zookeeper/listener.go b/config_center/zookeeper/listener.go index 7b67a70f6e..5dd5457ee0 100644 --- a/config_center/zookeeper/listener.go +++ b/config_center/zookeeper/listener.go @@ -25,6 +25,7 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/logger" + "dubbo.apache.org/dubbo-go/v3/config" "dubbo.apache.org/dubbo-go/v3/config_center" "dubbo.apache.org/dubbo-go/v3/remoting" ) @@ -65,10 +66,12 @@ func (l *CacheListener) DataChange(event remoting.Event) bool { // meanings new node return true } - key := l.pathToKey(event.Path) + var key string // TODO use common way - if strings.HasSuffix(key, constant.MeshRouteSuffix) { - key = key[:strings.Index(key, constant.MeshRouteSuffix)] + if strings.HasSuffix(event.Path, constant.MeshRouteSuffix) { + key = config.GetRootConfig().Application.Name + } else { + key = l.pathToKey(event.Path) } if key != "" { if listeners, ok := l.keyListeners.Load(key); ok { @@ -85,8 +88,7 @@ func (l *CacheListener) pathToKey(path string) string { key := strings.Replace(strings.Replace(path, l.rootPath+"/", "", -1), "/", ".", -1) if strings.HasSuffix(key, constant.ConfiguratorSuffix) || strings.HasSuffix(key, constant.TagRouterRuleSuffix) || - strings.HasSuffix(key, constant.ConditionRouterRuleSuffix) || - strings.HasSuffix(key, constant.MeshRouteSuffix) { + strings.HasSuffix(key, constant.ConditionRouterRuleSuffix) { // governance config, so we remove the "dubbo." prefix key = key[strings.Index(key, ".")+1:] } diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 34d6c13908..3fdfec3285 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -213,14 +213,13 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li } } -func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listener remoting.DataListener) { +func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) { defer l.wg.Done() var ( failTimes int ttl time.Duration event chan struct{} - zkEvent zk.Event ) event = make(chan struct{}, 4) ttl = defaultTTL @@ -235,13 +234,13 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen defer close(event) for { // get current children for a zkPath - children, childEventCh, err := l.client.GetChildrenW(zkPath) + children, childEventCh, err := l.client.GetChildrenW(zkRootPath) if err != nil { failTimes++ if MaxFailTimes <= failTimes { failTimes = MaxFailTimes } - logger.Debugf("listenDirEvent(path{%s}) = error{%v}", zkPath, err) + logger.Debugf("listenDirEvent(path{%s}) = error{%v}", zkRootPath, err) // clear the event channel CLEAR: for { @@ -251,34 +250,33 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen break CLEAR } } - l.client.RegisterEvent(zkPath, &event) + l.client.RegisterEvent(zkRootPath, &event) if err == errNilNode { - logger.Warnf("listenDirEvent(path{%s}) got errNilNode,so exit listen", zkPath) - l.client.UnregisterEvent(zkPath, &event) + logger.Warnf("listenDirEvent(path{%s}) got errNilNode,so exit listen", zkRootPath) + l.client.UnregisterEvent(zkRootPath, &event) return } after := time.After(timeSecondDuration(failTimes * ConnDelay)) select { case <-after: - l.client.UnregisterEvent(zkPath, &event) + l.client.UnregisterEvent(zkRootPath, &event) continue case <-l.exit: - l.client.UnregisterEvent(zkPath, &event) - logger.Debugf("listen(path{%s}) goroutine exit now...", zkPath) + l.client.UnregisterEvent(zkRootPath, &event) + logger.Debugf("listen(path{%s}) goroutine exit now...", zkRootPath) return case <-event: logger.Debugf("get zk.EventNodeDataChange notify event") - l.client.UnregisterEvent(zkPath, &event) - l.handleZkNodeEvent(zkPath, nil, listener) + l.client.UnregisterEvent(zkRootPath, &event) + l.handleZkNodeEvent(zkRootPath, nil, listener) continue } } failTimes = 0 for _, c := range children { - // Only need to compare Path when subscribing to provider - if strings.LastIndex(zkPath, constant.PROVIDER_CATEGORY) != -1 { + if strings.LastIndex(zkRootPath, constant.PROVIDER_CATEGORY) != -1 { provider, _ := common.NewURL(c) if provider.ServiceKey() != conf.ServiceKey() { continue @@ -286,17 +284,17 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen } // listen l service node - dubboPath := path.Join(zkPath, c) + zkNodePath := path.Join(zkRootPath, c) // Save the path to avoid listen repeatedly l.pathMapLock.Lock() - _, ok := l.pathMap[dubboPath] + _, ok := l.pathMap[zkNodePath] if !ok { - l.pathMap[dubboPath] = uatomic.NewInt32(0) + l.pathMap[zkNodePath] = uatomic.NewInt32(0) } l.pathMapLock.Unlock() if ok { - logger.Warnf("@zkPath %s has already been listened.", dubboPath) + logger.Warnf("@zkPath %s has already been listened.", zkNodePath) continue } @@ -306,16 +304,17 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen l.client.RUnlock() break } - content, _, err := l.client.Conn.Get(dubboPath) + content, _, err := l.client.Conn.Get(zkNodePath) + l.client.RUnlock() if err != nil { - logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err)) + logger.Errorf("Get new node path {%v} 's content error,message is {%v}", zkNodePath, perrors.WithStack(err)) } - logger.Debugf("Get children!{%s}", dubboPath) - if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) { + logger.Debugf("Get children!{%s}", zkNodePath) + if !listener.DataChange(remoting.Event{Path: zkNodePath, Action: remoting.EventTypeAdd, Content: string(content)}) { continue } - logger.Infof("listen dubbo service key{%s}", dubboPath) + logger.Infof("listen dubbo service key{%s}", zkNodePath) l.wg.Add(1) go func(zkPath string, listener remoting.DataListener) { // invoker l.wg.Done() in l.listenServiceNodeEvent @@ -326,54 +325,59 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen l.pathMapLock.Unlock() } logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath) - }(dubboPath, listener) + }(zkNodePath, listener) // listen sub path recursive // if zkPath is end of "providers/ & consumers/" we do not listen children dir - if strings.LastIndex(zkPath, constant.PROVIDER_CATEGORY) == -1 && - strings.LastIndex(zkPath, constant.CONSUMER_CATEGORY) == -1 { + if strings.LastIndex(zkRootPath, constant.PROVIDER_CATEGORY) == -1 && + strings.LastIndex(zkRootPath, constant.CONSUMER_CATEGORY) == -1 { l.wg.Add(1) go func(zkPath string, listener remoting.DataListener) { l.listenDirEvent(conf, zkPath, listener) logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) - }(dubboPath, listener) + }(zkNodePath, listener) } } - // Periodically update provider information - tickerTTL := ttl - if tickerTTL > 20e9 { - tickerTTL = 20e9 + if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) { + return } - ticker := time.NewTicker(tickerTTL) - WATCH: - for { - select { - case <-ticker.C: - l.handleZkNodeEvent(zkPath, children, listener) - if tickerTTL < ttl { - tickerTTL *= 2 - if tickerTTL > ttl { - tickerTTL = ttl - } - ticker.Stop() - ticker = time.NewTicker(tickerTTL) + } +} + +func (l *ZkEventListener) startScheduleWatchTask( + zkRootPath string, children []string, ttl time.Duration, + listener remoting.DataListener, childEventCh <-chan zk.Event) bool { + // Periodically update provider information + tickerTTL := ttl + if tickerTTL > 20e9 { + tickerTTL = 20e9 + } + ticker := time.NewTicker(tickerTTL) + for { + select { + case <-ticker.C: + l.handleZkNodeEvent(zkRootPath, children, listener) + if tickerTTL < ttl { + tickerTTL *= 2 + if tickerTTL > ttl { + tickerTTL = ttl } - case zkEvent = <-childEventCh: - logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}", - zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err) ticker.Stop() - if zkEvent.Type != zk.EventNodeChildrenChanged { - break WATCH - } + ticker = time.NewTicker(tickerTTL) + } + case zkEvent := <-childEventCh: + logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}", + zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err) + ticker.Stop() + if zkEvent.Type == zk.EventNodeChildrenChanged { l.handleZkNodeEvent(zkEvent.Path, children, listener) - break WATCH - case <-l.exit: - logger.Warnf("listen(path{%s}) goroutine exit now...", zkPath) - ticker.Stop() - return } + return false + case <-l.exit: + logger.Warnf("listen(path{%s}) goroutine exit now...", zkRootPath) + ticker.Stop() + return true } - } }