Skip to content

Commit

Permalink
Merge 02a08e6 into cee963f
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaoyunxing92 authored Jun 26, 2021
2 parents cee963f + 02a08e6 commit 46d772d
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 154 deletions.
23 changes: 16 additions & 7 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,14 @@ const (
)

const (
CONFIG_NAMESPACE_KEY = "config.namespace"
CONFIG_GROUP_KEY = "config.group"
CONFIG_APP_ID_KEY = "config.appId"
CONFIG_CLUSTER_KEY = "config.cluster"
CONFIG_CHECK_KEY = "config.check"
CONFIG_TIMEOUT_KET = "config.timeout"
CONFIG_LOG_DIR_KEY = "config.logDir"
CONFIG_NAMESPACE_KEY = "namespace"
CONFIG_GROUP_KEY = "group"
CONFIG_APP_ID_KEY = "appId"
CONFIG_CLUSTER_KEY = "cluster"
CONFIG_TIMEOUT_KEY = "timeout"
CONFIG_USERNAME_KEY = "username"
CONFIG_PASSWORD_KEY = "password"
CONFIG_LOG_DIR_KEY = "logDir"
CONFIG_VERSION_KEY = "configVersion"
COMPATIBLE_CONFIG_KEY = "compatible_config"
)
Expand Down Expand Up @@ -172,6 +173,7 @@ const (
NACOS_DEFAULT_ROLETYPE = 3
NACOS_CACHE_DIR_KEY = "cacheDir"
NACOS_LOG_DIR_KEY = "logDir"
NACOS_BEAT_INTERVAL_KEY = "beatInterval"
NACOS_ENDPOINT = "endpoint"
NACOS_SERVICE_NAME_SEPARATOR = ":"
NACOS_CATEGORY_KEY = "category"
Expand All @@ -181,6 +183,13 @@ const (
NACOS_PASSWORD = "password"
NACOS_USERNAME = "username"
NACOS_NOT_LOAD_LOCAL_CACHE = "nacos.not.load.cache"
NACOS_APP_NAME_KEY = "appName"
NACOS_REGION_ID_KEY = "regionId"
NACOS_ACCESS_KEY = "access"
NACOS_SECRET_KEY = "secret"
NACOS_OPEN_KMS_KEY = "kms"
NACOS_UPDATE_THREAD_NUM_KEY = "updateThreadNum"
NACOS_LOG_LEVEL_KEY = "logLevel"
)

const (
Expand Down
64 changes: 37 additions & 27 deletions config/config_center_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,20 @@ import (
//
// ConfigCenter has currently supported Zookeeper, Nacos, Etcd, Consul, Apollo
type ConfigCenterConfig struct {
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"`
Address string `yaml:"address" json:"address,omitempty"`
Cluster string `yaml:"cluster" json:"cluster,omitempty"`
Group string `default:"dubbo" yaml:"group" json:"group,omitempty"`
Username string `yaml:"username" json:"username,omitempty"`
Password string `yaml:"password" json:"password,omitempty"`
LogDir string `yaml:"log_dir" json:"log_dir,omitempty"`
ConfigFile string `default:"dubbo.properties" yaml:"config_file" json:"config_file,omitempty"`
Namespace string `default:"dubbo" yaml:"namespace" json:"namespace,omitempty"`
AppConfigFile string `default:"dubbo.properties" yaml:"app_config_file" json:"app_config_file,omitempty"`
AppId string `default:"dubbo" yaml:"app_id" json:"app_id,omitempty"`
TimeoutStr string `yaml:"timeout" json:"timeout,omitempty"`
RemoteRef string `required:"false" yaml:"remote_ref" json:"remote_ref,omitempty"`
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"`
Address string `yaml:"address" json:"address,omitempty"`
Cluster string `yaml:"cluster" json:"cluster,omitempty"`
Group string `default:"dubbo" yaml:"group" json:"group,omitempty"`
Username string `yaml:"username" json:"username,omitempty"`
Password string `yaml:"password" json:"password,omitempty"`
LogDir string `yaml:"log_dir" json:"log_dir,omitempty"`
ConfigFile string `default:"dubbo.properties" yaml:"config_file" json:"config_file,omitempty"`
Namespace string `default:"dubbo" yaml:"namespace" json:"namespace,omitempty"`
AppConfigFile string `default:"dubbo.properties" yaml:"app_config_file" json:"app_config_file,omitempty"`
AppID string `default:"dubbo" yaml:"app_id" json:"app_id,omitempty"`
TimeoutStr string `yaml:"timeout" json:"timeout,omitempty"`
RemoteRef string `required:"false" yaml:"remote_ref" json:"remote_ref,omitempty"`
Params map[string]string `yaml:"params" json:"parameters,omitempty"`
}

// UnmarshalYAML unmarshals the ConfigCenterConfig by @unmarshal function
Expand All @@ -74,8 +75,15 @@ func (c *ConfigCenterConfig) GetUrlMap() url.Values {
urlMap.Set(constant.CONFIG_NAMESPACE_KEY, c.Namespace)
urlMap.Set(constant.CONFIG_GROUP_KEY, c.Group)
urlMap.Set(constant.CONFIG_CLUSTER_KEY, c.Cluster)
urlMap.Set(constant.CONFIG_APP_ID_KEY, c.AppId)
urlMap.Set(constant.CONFIG_APP_ID_KEY, c.AppID)
urlMap.Set(constant.CONFIG_LOG_DIR_KEY, c.LogDir)
urlMap.Set(constant.CONFIG_USERNAME_KEY, c.Username)
urlMap.Set(constant.CONFIG_PASSWORD_KEY, c.Password)
urlMap.Set(constant.CONFIG_TIMEOUT_KEY, c.TimeoutStr)

for key, val := range c.Params {
urlMap.Set(key, val)
}
return urlMap
}

Expand All @@ -84,22 +92,22 @@ type configCenter struct{}
// toURL will compatible with baseConfig.ConfigCenterConfig.Address and baseConfig.ConfigCenterConfig.RemoteRef before 1.6.0
// After 1.6.0 will not compatible, only baseConfig.ConfigCenterConfig.RemoteRef
func (b *configCenter) toURL(baseConfig BaseConfig) (*common.URL, error) {
if len(baseConfig.ConfigCenterConfig.Address) > 0 {
remoteRef := baseConfig.ConfigCenterConfig.RemoteRef
// if set remote ref use remote
if len(remoteRef) <= 0 {
return common.NewURL(baseConfig.ConfigCenterConfig.Address,
common.WithProtocol(baseConfig.ConfigCenterConfig.Protocol), common.WithParams(baseConfig.ConfigCenterConfig.GetUrlMap()))
common.WithProtocol(baseConfig.ConfigCenterConfig.Protocol),
common.WithParams(baseConfig.ConfigCenterConfig.GetUrlMap()))
}

remoteRef := baseConfig.ConfigCenterConfig.RemoteRef
rc, ok := baseConfig.GetRemoteConfig(remoteRef)

if !ok {
return nil, perrors.New("Could not find out the remote ref config, name: " + remoteRef)
}

newURL, err := rc.toURL()
if err == nil {
newURL.SetParams(baseConfig.ConfigCenterConfig.GetUrlMap())
// set protocol if remote not set
if len(rc.Protocol) <= 0 {
rc.Protocol = baseConfig.ConfigCenterConfig.Protocol
}
newURL, err := rc.ToURL()
return newURL, err
}

Expand All @@ -124,8 +132,10 @@ func (b *configCenter) prepareEnvironment(baseConfig BaseConfig, configCenterUrl
logger.Errorf("Get dynamic configuration error , error message is %v", err)
return perrors.WithStack(err)
}
config.GetEnvInstance().SetDynamicConfiguration(dynamicConfig)
content, err := dynamicConfig.GetProperties(baseConfig.ConfigCenterConfig.ConfigFile, config_center.WithGroup(baseConfig.ConfigCenterConfig.Group))
envInstance := config.GetEnvInstance()
envInstance.SetDynamicConfiguration(dynamicConfig)
content, err := dynamicConfig.GetProperties(baseConfig.ConfigCenterConfig.ConfigFile,
config_center.WithGroup(baseConfig.ConfigCenterConfig.Group))
if err != nil {
logger.Errorf("Get config content in dynamic configuration error , error message is %v", err)
return perrors.WithStack(err)
Expand Down Expand Up @@ -155,15 +165,15 @@ func (b *configCenter) prepareEnvironment(baseConfig BaseConfig, configCenterUrl
if err != nil {
return perrors.WithStack(err)
}
config.GetEnvInstance().UpdateExternalConfigMap(mapContent)
envInstance.UpdateExternalConfigMap(mapContent)

// appGroup config file
if len(appContent) != 0 {
appMapContent, err := dynamicConfig.Parser().Parse(appContent)
if err != nil {
return perrors.WithStack(err)
}
config.GetEnvInstance().UpdateAppExternalConfigMap(appMapContent)
envInstance.UpdateAppExternalConfigMap(appMapContent)
}

return nil
Expand Down
22 changes: 1 addition & 21 deletions config/config_center_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestStartConfigCenterWithRemoteRef(t *testing.T) {
baseConfig := &BaseConfig{
Remotes: m,
ConfigCenterConfig: &ConfigCenterConfig{
Protocol: "mock",
Group: "dubbo",
RemoteRef: "mock",
ConfigFile: "mockDubbo.properties",
Expand All @@ -72,24 +73,3 @@ func TestStartConfigCenterWithRemoteRef(t *testing.T) {
assert.True(t, b)
assert.Equal(t, "ikurento.com", v)
}

func TestStartConfigCenterWithRemoteRefError(t *testing.T) {
extension.SetConfigCenterFactory("mock", func() config_center.DynamicConfigurationFactory {
return &config_center.MockDynamicConfigurationFactory{}
})
m := make(map[string]*RemoteConfig)
m["mock"] = &RemoteConfig{Address: "172.0.0.1"}
baseConfig := &BaseConfig{
Remotes: m,
ConfigCenterConfig: &ConfigCenterConfig{
Protocol: "mock",
Group: "dubbo",
RemoteRef: "mock",
ConfigFile: "mockDubbo.properties",
},
}

c := &configCenter{}
err := c.startConfigCenter(*baseConfig)
assert.Error(t, err)
}
23 changes: 20 additions & 3 deletions config/remote_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package config

import (
"net/url"
"time"
)

Expand Down Expand Up @@ -55,7 +56,8 @@ func (rc *RemoteConfig) Timeout() time.Duration {
if res, err := time.ParseDuration(rc.TimeoutStr); err == nil {
return res
}
logger.Errorf("Could not parse the timeout string to Duration: %s, the default value will be returned", rc.TimeoutStr)
logger.Errorf("Could not parse the timeout string to Duration: %s, the default value will be returned",
rc.TimeoutStr)
return 5 * time.Second
}

Expand All @@ -69,14 +71,29 @@ func (rc *RemoteConfig) GetParam(key string, def string) string {
return param
}

func (rc *RemoteConfig) toURL() (*common.URL, error) {
// ToURL config to url
func (rc *RemoteConfig) ToURL() (*common.URL, error) {
if len(rc.Protocol) == 0 {
return nil, perrors.Errorf("Must provide protocol in RemoteConfig.")
}
return common.NewURL(rc.Address,
common.WithProtocol(rc.Protocol),
common.WithUsername(rc.Username),
common.WithPassword(rc.Password),
common.WithLocation(rc.Address),
common.WithProtocol(rc.Protocol),
common.WithParams(rc.getUrlMap()),
)
}

// getUrlMap get url map
func (rc *RemoteConfig) getUrlMap() url.Values {
urlMap := url.Values{}
urlMap.Set(constant.CONFIG_USERNAME_KEY, rc.Username)
urlMap.Set(constant.CONFIG_PASSWORD_KEY, rc.Password)
urlMap.Set(constant.CONFIG_TIMEOUT_KEY, rc.TimeoutStr)

for key, val := range rc.Params {
urlMap.Set(key, val)
}
return urlMap
}
2 changes: 1 addition & 1 deletion config_center/apollo/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func initMockApollo(t *testing.T) *apolloConfiguration {
c := &config.BaseConfig{ConfigCenterConfig: &config.ConfigCenterConfig{
Protocol: "apollo",
Address: "106.12.25.204:8080",
AppId: "testApplication_yang",
AppID: "testApplication_yang",
Cluster: "dev",
Namespace: "mockDubbog",
}}
Expand Down
45 changes: 3 additions & 42 deletions config_center/nacos/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package nacos

import (
"strings"
"sync"
"time"
)
Expand All @@ -29,7 +28,6 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/remoting/nacos"
)
Expand Down Expand Up @@ -58,58 +56,21 @@ func (n *NacosClient) SetClient(configClient *nacosClient.NacosConfigClient) {
n.Unlock()
}

type option func(*options)

type options struct {
nacosName string
// configClient *NacosClient
}

// WithNacosName Set nacos name
func WithNacosName(name string) option {
return func(opt *options) {
opt.nacosName = name
}
}

// ValidateNacosClient Validate nacos configClient , if null then create it
func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
func ValidateNacosClient(container nacosClientFacade) error {
if container == nil {
return perrors.Errorf("container can not be null")
}
os := &options{}
for _, opt := range opts {
opt(os)
}

url := container.GetURL()
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
logger.Errorf("invalid timeout config %+v,got err %+v",
url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err)
return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
}
nacosAddresses := strings.Split(url.Location, ",")
if container.NacosClient() == nil {
if container.NacosClient() == nil || container.NacosClient().Client() == nil {
// in dubbo ,every registry only connect one node ,so this is []string{r.Address}
newClient, err := nacos.NewNacosConfigClientByUrl(url)
if err != nil {
logger.Errorf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}",
os.nacosName, url.Location, timeout.String(), err)
logger.Errorf("ValidateNacosClient(name{%s}, nacos address{%v} = error{%v}", url.Location, err)
return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
}
container.SetNacosClient(newClient)
}

if container.NacosClient().Client() == nil {
configClient, err := nacos.NewNacosConfigClientByUrl(url)
if err != nil {
logger.Errorf("initNacosConfigClient(addr:%+v,timeout:%v,url:%v) = err %+v",
nacosAddresses, timeout.String(), url, err)
return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
}
container.NacosClient().SetClient(configClient.Client())
}
return perrors.WithMessagef(nil, "newNacosClient(address:%+v)", url.PrimitiveURL)
}

Expand Down
6 changes: 3 additions & 3 deletions config_center/nacos/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestNewNacosClient(t *testing.T) {
url: registryUrl,
done: make(chan struct{}),
}
err := ValidateNacosClient(c, WithNacosName(nacosClientName))
err := ValidateNacosClient(c)
assert.NoError(t, err)
c.wg.Add(1)
go HandleClientRestart(c)
Expand All @@ -63,7 +63,7 @@ func TestSetNacosClient(t *testing.T) {
done: make(chan struct{}),
}

err := ValidateNacosClient(c, WithNacosName(nacosClientName))
err := ValidateNacosClient(c)
assert.NoError(t, err)
c.wg.Add(1)
go HandleClientRestart(c)
Expand All @@ -85,7 +85,7 @@ func TestNewNacosClient_connectError(t *testing.T) {
url: registryUrl,
done: make(chan struct{}),
}
err = ValidateNacosClient(c, WithNacosName(nacosClientName))
err = ValidateNacosClient(c)
assert.NoError(t, err)
c.wg.Add(1)
go HandleClientRestart(c)
Expand Down
2 changes: 1 addition & 1 deletion config_center/nacos/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration,
url: url,
done: make(chan struct{}),
}
err := ValidateNacosClient(c, WithNacosName(nacosClientName))
err := ValidateNacosClient(c)
if err != nil {
logger.Errorf("nacos configClient start error ,error message is %v", err)
return nil, err
Expand Down
11 changes: 7 additions & 4 deletions registry/nacos/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,21 +326,24 @@ func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
return nil, perrors.New("could not init the instance because the config is invalid")
}

remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef)
rc, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef)
if !ok {
return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef)
}
group := sdc.Group
if len(group) == 0 {
group = defaultGroup
}

client, err := nacos.NewNacosClient(remoteConfig)
// set protocol if remote not set
if len(rc.Protocol) <= 0 {
rc.Protocol = sdc.Protocol
}
client, err := nacos.NewNacosClient(rc)
if err != nil {
return nil, perrors.WithMessage(err, "create nacos namingClient failed.")
}

descriptor := fmt.Sprintf("nacos-service-discovery[%s]", remoteConfig.Address)
descriptor := fmt.Sprintf("nacos-service-discovery[%s]", rc.Address)

newInstance := &nacosServiceDiscovery{
group: group,
Expand Down
Loading

0 comments on commit 46d772d

Please sign in to comment.