diff --git a/common/constant/key.go b/common/constant/key.go index a57a90c657..601a5dd342 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -136,13 +136,14 @@ const ( ) const ( - CONFIG_NAMESPACE_KEY = "config.namespace" - CONFIG_GROUP_KEY = "config.group" - CONFIG_APP_ID_KEY = "config.appId" - CONFIG_CLUSTER_KEY = "config.cluster" - CONFIG_CHECK_KEY = "config.check" - CONFIG_TIMEOUT_KET = "config.timeout" - CONFIG_LOG_DIR_KEY = "config.logDir" + CONFIG_NAMESPACE_KEY = "namespace" + CONFIG_GROUP_KEY = "group" + CONFIG_APP_ID_KEY = "appId" + CONFIG_CLUSTER_KEY = "cluster" + CONFIG_TIMEOUT_KEY = "timeout" + CONFIG_USERNAME_KEY = "username" + CONFIG_PASSWORD_KEY = "password" + CONFIG_LOG_DIR_KEY = "logDir" CONFIG_VERSION_KEY = "configVersion" COMPATIBLE_CONFIG_KEY = "compatible_config" ) @@ -172,6 +173,7 @@ const ( NACOS_DEFAULT_ROLETYPE = 3 NACOS_CACHE_DIR_KEY = "cacheDir" NACOS_LOG_DIR_KEY = "logDir" + NACOS_BEAT_INTERVAL_KEY = "beatInterval" NACOS_ENDPOINT = "endpoint" NACOS_SERVICE_NAME_SEPARATOR = ":" NACOS_CATEGORY_KEY = "category" @@ -181,6 +183,13 @@ const ( NACOS_PASSWORD = "password" NACOS_USERNAME = "username" NACOS_NOT_LOAD_LOCAL_CACHE = "nacos.not.load.cache" + NACOS_APP_NAME_KEY = "appName" + NACOS_REGION_ID_KEY = "regionId" + NACOS_ACCESS_KEY = "access" + NACOS_SECRET_KEY = "secret" + NACOS_OPEN_KMS_KEY = "kms" + NACOS_UPDATE_THREAD_NUM_KEY = "updateThreadNum" + NACOS_LOG_LEVEL_KEY = "logLevel" ) const ( diff --git a/config/config_center_config.go b/config/config_center_config.go index 415eadc549..360818c8d3 100644 --- a/config/config_center_config.go +++ b/config/config_center_config.go @@ -44,19 +44,20 @@ import ( // // ConfigCenter has currently supported Zookeeper, Nacos, Etcd, Consul, Apollo type ConfigCenterConfig struct { - Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"` - Address string `yaml:"address" json:"address,omitempty"` - Cluster string `yaml:"cluster" json:"cluster,omitempty"` - Group string `default:"dubbo" yaml:"group" json:"group,omitempty"` - Username string `yaml:"username" json:"username,omitempty"` - Password string `yaml:"password" json:"password,omitempty"` - LogDir string `yaml:"log_dir" json:"log_dir,omitempty"` - ConfigFile string `default:"dubbo.properties" yaml:"config_file" json:"config_file,omitempty"` - Namespace string `default:"dubbo" yaml:"namespace" json:"namespace,omitempty"` - AppConfigFile string `default:"dubbo.properties" yaml:"app_config_file" json:"app_config_file,omitempty"` - AppId string `default:"dubbo" yaml:"app_id" json:"app_id,omitempty"` - TimeoutStr string `yaml:"timeout" json:"timeout,omitempty"` - RemoteRef string `required:"false" yaml:"remote_ref" json:"remote_ref,omitempty"` + Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"` + Address string `yaml:"address" json:"address,omitempty"` + Cluster string `yaml:"cluster" json:"cluster,omitempty"` + Group string `default:"dubbo" yaml:"group" json:"group,omitempty"` + Username string `yaml:"username" json:"username,omitempty"` + Password string `yaml:"password" json:"password,omitempty"` + LogDir string `yaml:"log_dir" json:"log_dir,omitempty"` + ConfigFile string `default:"dubbo.properties" yaml:"config_file" json:"config_file,omitempty"` + Namespace string `default:"dubbo" yaml:"namespace" json:"namespace,omitempty"` + AppConfigFile string `default:"dubbo.properties" yaml:"app_config_file" json:"app_config_file,omitempty"` + AppID string `default:"dubbo" yaml:"app_id" json:"app_id,omitempty"` + TimeoutStr string `yaml:"timeout" json:"timeout,omitempty"` + RemoteRef string `required:"false" yaml:"remote_ref" json:"remote_ref,omitempty"` + Params map[string]string `yaml:"params" json:"parameters,omitempty"` } // UnmarshalYAML unmarshals the ConfigCenterConfig by @unmarshal function @@ -74,8 +75,15 @@ func (c *ConfigCenterConfig) GetUrlMap() url.Values { urlMap.Set(constant.CONFIG_NAMESPACE_KEY, c.Namespace) urlMap.Set(constant.CONFIG_GROUP_KEY, c.Group) urlMap.Set(constant.CONFIG_CLUSTER_KEY, c.Cluster) - urlMap.Set(constant.CONFIG_APP_ID_KEY, c.AppId) + urlMap.Set(constant.CONFIG_APP_ID_KEY, c.AppID) urlMap.Set(constant.CONFIG_LOG_DIR_KEY, c.LogDir) + urlMap.Set(constant.CONFIG_USERNAME_KEY, c.Username) + urlMap.Set(constant.CONFIG_PASSWORD_KEY, c.Password) + urlMap.Set(constant.CONFIG_TIMEOUT_KEY, c.TimeoutStr) + + for key, val := range c.Params { + urlMap.Set(key, val) + } return urlMap } @@ -84,22 +92,22 @@ type configCenter struct{} // toURL will compatible with baseConfig.ConfigCenterConfig.Address and baseConfig.ConfigCenterConfig.RemoteRef before 1.6.0 // After 1.6.0 will not compatible, only baseConfig.ConfigCenterConfig.RemoteRef func (b *configCenter) toURL(baseConfig BaseConfig) (*common.URL, error) { - if len(baseConfig.ConfigCenterConfig.Address) > 0 { + remoteRef := baseConfig.ConfigCenterConfig.RemoteRef + // if set remote ref use remote + if len(remoteRef) <= 0 { return common.NewURL(baseConfig.ConfigCenterConfig.Address, - common.WithProtocol(baseConfig.ConfigCenterConfig.Protocol), common.WithParams(baseConfig.ConfigCenterConfig.GetUrlMap())) + common.WithProtocol(baseConfig.ConfigCenterConfig.Protocol), + common.WithParams(baseConfig.ConfigCenterConfig.GetUrlMap())) } - - remoteRef := baseConfig.ConfigCenterConfig.RemoteRef rc, ok := baseConfig.GetRemoteConfig(remoteRef) - if !ok { return nil, perrors.New("Could not find out the remote ref config, name: " + remoteRef) } - - newURL, err := rc.toURL() - if err == nil { - newURL.SetParams(baseConfig.ConfigCenterConfig.GetUrlMap()) + // set protocol if remote not set + if len(rc.Protocol) <= 0 { + rc.Protocol = baseConfig.ConfigCenterConfig.Protocol } + newURL, err := rc.ToURL() return newURL, err } @@ -124,8 +132,10 @@ func (b *configCenter) prepareEnvironment(baseConfig BaseConfig, configCenterUrl logger.Errorf("Get dynamic configuration error , error message is %v", err) return perrors.WithStack(err) } - config.GetEnvInstance().SetDynamicConfiguration(dynamicConfig) - content, err := dynamicConfig.GetProperties(baseConfig.ConfigCenterConfig.ConfigFile, config_center.WithGroup(baseConfig.ConfigCenterConfig.Group)) + envInstance := config.GetEnvInstance() + envInstance.SetDynamicConfiguration(dynamicConfig) + content, err := dynamicConfig.GetProperties(baseConfig.ConfigCenterConfig.ConfigFile, + config_center.WithGroup(baseConfig.ConfigCenterConfig.Group)) if err != nil { logger.Errorf("Get config content in dynamic configuration error , error message is %v", err) return perrors.WithStack(err) @@ -155,7 +165,7 @@ func (b *configCenter) prepareEnvironment(baseConfig BaseConfig, configCenterUrl if err != nil { return perrors.WithStack(err) } - config.GetEnvInstance().UpdateExternalConfigMap(mapContent) + envInstance.UpdateExternalConfigMap(mapContent) // appGroup config file if len(appContent) != 0 { @@ -163,7 +173,7 @@ func (b *configCenter) prepareEnvironment(baseConfig BaseConfig, configCenterUrl if err != nil { return perrors.WithStack(err) } - config.GetEnvInstance().UpdateAppExternalConfigMap(appMapContent) + envInstance.UpdateAppExternalConfigMap(appMapContent) } return nil diff --git a/config/config_center_config_test.go b/config/config_center_config_test.go index f0ff05e04b..ec11947e4d 100644 --- a/config/config_center_config_test.go +++ b/config/config_center_config_test.go @@ -59,6 +59,7 @@ func TestStartConfigCenterWithRemoteRef(t *testing.T) { baseConfig := &BaseConfig{ Remotes: m, ConfigCenterConfig: &ConfigCenterConfig{ + Protocol: "mock", Group: "dubbo", RemoteRef: "mock", ConfigFile: "mockDubbo.properties", @@ -72,24 +73,3 @@ func TestStartConfigCenterWithRemoteRef(t *testing.T) { assert.True(t, b) assert.Equal(t, "ikurento.com", v) } - -func TestStartConfigCenterWithRemoteRefError(t *testing.T) { - extension.SetConfigCenterFactory("mock", func() config_center.DynamicConfigurationFactory { - return &config_center.MockDynamicConfigurationFactory{} - }) - m := make(map[string]*RemoteConfig) - m["mock"] = &RemoteConfig{Address: "172.0.0.1"} - baseConfig := &BaseConfig{ - Remotes: m, - ConfigCenterConfig: &ConfigCenterConfig{ - Protocol: "mock", - Group: "dubbo", - RemoteRef: "mock", - ConfigFile: "mockDubbo.properties", - }, - } - - c := &configCenter{} - err := c.startConfigCenter(*baseConfig) - assert.Error(t, err) -} diff --git a/config/remote_config.go b/config/remote_config.go index 25e17618e0..292a89e9d3 100644 --- a/config/remote_config.go +++ b/config/remote_config.go @@ -18,6 +18,7 @@ package config import ( + "net/url" "time" ) @@ -55,7 +56,8 @@ func (rc *RemoteConfig) Timeout() time.Duration { if res, err := time.ParseDuration(rc.TimeoutStr); err == nil { return res } - logger.Errorf("Could not parse the timeout string to Duration: %s, the default value will be returned", rc.TimeoutStr) + logger.Errorf("Could not parse the timeout string to Duration: %s, the default value will be returned", + rc.TimeoutStr) return 5 * time.Second } @@ -69,14 +71,29 @@ func (rc *RemoteConfig) GetParam(key string, def string) string { return param } -func (rc *RemoteConfig) toURL() (*common.URL, error) { +// ToURL config to url +func (rc *RemoteConfig) ToURL() (*common.URL, error) { if len(rc.Protocol) == 0 { return nil, perrors.Errorf("Must provide protocol in RemoteConfig.") } return common.NewURL(rc.Address, + common.WithProtocol(rc.Protocol), common.WithUsername(rc.Username), common.WithPassword(rc.Password), common.WithLocation(rc.Address), - common.WithProtocol(rc.Protocol), + common.WithParams(rc.getUrlMap()), ) } + +// getUrlMap get url map +func (rc *RemoteConfig) getUrlMap() url.Values { + urlMap := url.Values{} + urlMap.Set(constant.CONFIG_USERNAME_KEY, rc.Username) + urlMap.Set(constant.CONFIG_PASSWORD_KEY, rc.Password) + urlMap.Set(constant.CONFIG_TIMEOUT_KEY, rc.TimeoutStr) + + for key, val := range rc.Params { + urlMap.Set(key, val) + } + return urlMap +} diff --git a/config_center/apollo/impl_test.go b/config_center/apollo/impl_test.go index d0df630299..3392868223 100644 --- a/config_center/apollo/impl_test.go +++ b/config_center/apollo/impl_test.go @@ -187,7 +187,7 @@ func initMockApollo(t *testing.T) *apolloConfiguration { c := &config.BaseConfig{ConfigCenterConfig: &config.ConfigCenterConfig{ Protocol: "apollo", Address: "106.12.25.204:8080", - AppId: "testApplication_yang", + AppID: "testApplication_yang", Cluster: "dev", Namespace: "mockDubbog", }} diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go index f53c629f4e..908012b783 100644 --- a/config_center/nacos/client.go +++ b/config_center/nacos/client.go @@ -18,7 +18,6 @@ package nacos import ( - "strings" "sync" "time" ) @@ -29,7 +28,6 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/logger" "dubbo.apache.org/dubbo-go/v3/remoting/nacos" ) @@ -58,58 +56,21 @@ func (n *NacosClient) SetClient(configClient *nacosClient.NacosConfigClient) { n.Unlock() } -type option func(*options) - -type options struct { - nacosName string - // configClient *NacosClient -} - -// WithNacosName Set nacos name -func WithNacosName(name string) option { - return func(opt *options) { - opt.nacosName = name - } -} - // ValidateNacosClient Validate nacos configClient , if null then create it -func ValidateNacosClient(container nacosClientFacade, opts ...option) error { +func ValidateNacosClient(container nacosClientFacade) error { if container == nil { return perrors.Errorf("container can not be null") } - os := &options{} - for _, opt := range opts { - opt(os) - } - url := container.GetURL() - timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) - if err != nil { - logger.Errorf("invalid timeout config %+v,got err %+v", - url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err) - return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location) - } - nacosAddresses := strings.Split(url.Location, ",") - if container.NacosClient() == nil { + if container.NacosClient() == nil || container.NacosClient().Client() == nil { // in dubbo ,every registry only connect one node ,so this is []string{r.Address} newClient, err := nacos.NewNacosConfigClientByUrl(url) if err != nil { - logger.Errorf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}", - os.nacosName, url.Location, timeout.String(), err) + logger.Errorf("ValidateNacosClient(name{%s}, nacos address{%v} = error{%v}", url.Location, err) return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location) } container.SetNacosClient(newClient) } - - if container.NacosClient().Client() == nil { - configClient, err := nacos.NewNacosConfigClientByUrl(url) - if err != nil { - logger.Errorf("initNacosConfigClient(addr:%+v,timeout:%v,url:%v) = err %+v", - nacosAddresses, timeout.String(), url, err) - return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location) - } - container.NacosClient().SetClient(configClient.Client()) - } return perrors.WithMessagef(nil, "newNacosClient(address:%+v)", url.PrimitiveURL) } diff --git a/config_center/nacos/client_test.go b/config_center/nacos/client_test.go index 458f2d9885..86305e7f58 100644 --- a/config_center/nacos/client_test.go +++ b/config_center/nacos/client_test.go @@ -39,7 +39,7 @@ func TestNewNacosClient(t *testing.T) { url: registryUrl, done: make(chan struct{}), } - err := ValidateNacosClient(c, WithNacosName(nacosClientName)) + err := ValidateNacosClient(c) assert.NoError(t, err) c.wg.Add(1) go HandleClientRestart(c) @@ -63,7 +63,7 @@ func TestSetNacosClient(t *testing.T) { done: make(chan struct{}), } - err := ValidateNacosClient(c, WithNacosName(nacosClientName)) + err := ValidateNacosClient(c) assert.NoError(t, err) c.wg.Add(1) go HandleClientRestart(c) @@ -85,7 +85,7 @@ func TestNewNacosClient_connectError(t *testing.T) { url: registryUrl, done: make(chan struct{}), } - err = ValidateNacosClient(c, WithNacosName(nacosClientName)) + err = ValidateNacosClient(c) assert.NoError(t, err) c.wg.Add(1) go HandleClientRestart(c) diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go index 50e10946fc..6d7326aaf0 100644 --- a/config_center/nacos/impl.go +++ b/config_center/nacos/impl.go @@ -65,7 +65,7 @@ func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration, url: url, done: make(chan struct{}), } - err := ValidateNacosClient(c, WithNacosName(nacosClientName)) + err := ValidateNacosClient(c) if err != nil { logger.Errorf("nacos configClient start error ,error message is %v", err) return nil, err diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index def19967ae..174bf0bc82 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -326,7 +326,7 @@ func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) { return nil, perrors.New("could not init the instance because the config is invalid") } - remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef) + rc, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef) if !ok { return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef) } @@ -334,13 +334,16 @@ func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) { if len(group) == 0 { group = defaultGroup } - - client, err := nacos.NewNacosClient(remoteConfig) + // set protocol if remote not set + if len(rc.Protocol) <= 0 { + rc.Protocol = sdc.Protocol + } + client, err := nacos.NewNacosClient(rc) if err != nil { return nil, perrors.WithMessage(err, "create nacos namingClient failed.") } - descriptor := fmt.Sprintf("nacos-service-discovery[%s]", remoteConfig.Address) + descriptor := fmt.Sprintf("nacos-service-discovery[%s]", rc.Address) newInstance := &nacosServiceDiscovery{ group: group, diff --git a/remoting/nacos/builder.go b/remoting/nacos/builder.go index 462406185b..c42cc93e93 100644 --- a/remoting/nacos/builder.go +++ b/remoting/nacos/builder.go @@ -68,65 +68,43 @@ func GetNacosConfig(url *common.URL) ([]nacosConstant.ServerConfig, nacosConstan serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{IpAddr: ip, Port: uint64(port)}) } - var clientConfig nacosConstant.ClientConfig timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) if err != nil { return []nacosConstant.ServerConfig{}, nacosConstant.ClientConfig{}, err } - //enable local cache when nacos can not connect. - notLoadCache, err := strconv.ParseBool(url.GetParam(constant.NACOS_NOT_LOAD_LOCAL_CACHE, "true")) - if err != nil { - notLoadCache = false - } - clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000) - // clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs - clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "") - clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "") - clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "") - clientConfig.NamespaceId = url.GetParam(constant.NACOS_NAMESPACE_ID, "") - clientConfig.Username = url.GetParam(constant.NACOS_USERNAME, "") - clientConfig.Password = url.GetParam(constant.NACOS_PASSWORD, "") - clientConfig.NotLoadCacheAtStart = notLoadCache + clientConfig := nacosConstant.ClientConfig{ + TimeoutMs: uint64(int32(timeout / time.Millisecond)), + BeatInterval: url.GetParamInt(constant.NACOS_BEAT_INTERVAL_KEY, 5000), + NamespaceId: url.GetParam(constant.NACOS_NAMESPACE_ID, ""), + AppName: url.GetParam(constant.NACOS_APP_NAME_KEY, ""), + Endpoint: url.GetParam(constant.NACOS_ENDPOINT, ""), + RegionId: url.GetParam(constant.NACOS_REGION_ID_KEY, ""), + AccessKey: url.GetParam(constant.NACOS_ACCESS_KEY, ""), + SecretKey: url.GetParam(constant.NACOS_SECRET_KEY, ""), + OpenKMS: url.GetParamBool(constant.NACOS_OPEN_KMS_KEY, false), + CacheDir: url.GetParam(constant.NACOS_CACHE_DIR_KEY, ""), + UpdateThreadNum: url.GetParamByIntValue(constant.NACOS_UPDATE_THREAD_NUM_KEY, 20), + NotLoadCacheAtStart: url.GetParamBool(constant.NACOS_NOT_LOAD_LOCAL_CACHE, true), + Username: url.GetParam(constant.NACOS_USERNAME, ""), + Password: url.GetParam(constant.NACOS_PASSWORD, ""), + LogDir: url.GetParam(constant.NACOS_LOG_DIR_KEY, ""), + LogLevel: url.GetParam(constant.NACOS_LOG_LEVEL_KEY, "info"), + } return serverConfigs, clientConfig, nil } // NewNacosClient create an instance with the config func NewNacosClient(rc *config.RemoteConfig) (*nacosClient.NacosNamingClient, error) { - if len(rc.Address) == 0 { - return nil, perrors.New("nacos address is empty!") - } - addresses := strings.Split(rc.Address, ",") - scs := make([]nacosConstant.ServerConfig, 0, len(addresses)) - for _, addr := range addresses { - ip, portStr, err := net.SplitHostPort(addr) - if err != nil { - return nil, perrors.WithMessagef(err, "split [%s] ", addr) - } - port, _ := strconv.Atoi(portStr) - scs = append(scs, nacosConstant.ServerConfig{ - IpAddr: ip, - Port: uint64(port), - }) + url, err := rc.ToURL() + if err != nil { + return nil, err } + scs, cc, err := GetNacosConfig(url) - var cc nacosConstant.ClientConfig - timeout := rc.Timeout() - //enable local cache when nacos can not connect. - notLoadCache, err := strconv.ParseBool(rc.GetParam(constant.NACOS_NOT_LOAD_LOCAL_CACHE, "true")) if err != nil { - notLoadCache = false + return nil, err } - cc.TimeoutMs = uint64(timeout.Nanoseconds() / constant.MsToNanoRate) - // cc.ListenInterval = 2 * cc.TimeoutMs - cc.CacheDir = rc.GetParam(constant.NACOS_CACHE_DIR_KEY, "") - cc.LogDir = rc.GetParam(constant.NACOS_LOG_DIR_KEY, "") - cc.Endpoint = rc.GetParam(constant.NACOS_ENDPOINT, "") - cc.NamespaceId = rc.GetParam(constant.NACOS_NAMESPACE_ID, "") - cc.Username = rc.Username - cc.Password = rc.Password - cc.NotLoadCacheAtStart = notLoadCache - return nacosClient.NewNacosNamingClient(getNacosClientName(), true, scs, cc) } diff --git a/remoting/nacos/builder_test.go b/remoting/nacos/builder_test.go index 0d43ed09ae..959d27e2fe 100644 --- a/remoting/nacos/builder_test.go +++ b/remoting/nacos/builder_test.go @@ -35,6 +35,8 @@ import ( func TestNewNacosClient(t *testing.T) { rc := &config.RemoteConfig{} + rc.Protocol = "nacos" + rc.Username = "nacos" client, err := NewNacosClient(rc) // address is nil