Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: apollo data change listener #670

Merged
merged 4 commits into from
Jul 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 13 additions & 23 deletions config_center/apollo/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,28 @@ import (

import (
"github.com/pkg/errors"
"github.com/zouyx/agollo"
"github.com/zouyx/agollo/v3"
agolloConstant "github.com/zouyx/agollo/v3/constant"
"github.com/zouyx/agollo/v3/env/config"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
cc "github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
"github.com/apache/dubbo-go/remoting"
)

const (
apolloProtocolPrefix = "http://"
apolloConfigFormat = "%s.%s"
apolloConfigFormat = "%s%s"
)

type apolloConfiguration struct {
url *common.URL

listeners sync.Map
appConf *agollo.AppConfig
appConf *config.AppConfig
parser parser.ConfigurationParser
}

Expand All @@ -59,39 +60,28 @@ func newApolloConfiguration(url *common.URL) (*apolloConfiguration, error) {

appId := url.GetParam(constant.CONFIG_APP_ID_KEY, "")
namespaces := getProperties(url.GetParam(constant.CONFIG_NAMESPACE_KEY, cc.DEFAULT_GROUP))
c.appConf = &agollo.AppConfig{
AppId: appId,
c.appConf = &config.AppConfig{
AppID: appId,
Cluster: configCluster,
NamespaceName: namespaces,
Ip: configAddr,
IP: configAddr,
}

agollo.InitCustomConfig(func() (*agollo.AppConfig, error) {
agollo.InitCustomConfig(func() (*config.AppConfig, error) {
return c.appConf, nil
})

return c, agollo.Start()
}

func getChangeType(change agollo.ConfigChangeType) remoting.EventType {
switch change {
case agollo.ADDED:
return remoting.EventTypeAdd
case agollo.DELETED:
return remoting.EventTypeDel
default:
return remoting.EventTypeUpdate
}
}

func (c *apolloConfiguration) AddListener(key string, listener cc.ConfigurationListener, opts ...cc.Option) {
k := &cc.Options{}
for _, opt := range opts {
opt(k)
}

key = k.Group + key
l, _ := c.listeners.LoadOrStore(key, NewApolloListener())
l, _ := c.listeners.LoadOrStore(key, newApolloListener())
l.(*apolloListener).AddListener(listener)
}

Expand All @@ -109,10 +99,10 @@ func (c *apolloConfiguration) RemoveListener(key string, listener cc.Configurati
}

func getProperties(namespace string) string {
return getNamespaceName(namespace, agollo.Properties)
return getNamespaceName(namespace, agolloConstant.Properties)
}

func getNamespaceName(namespace string, configFileFormat agollo.ConfigFileFormat) string {
func getNamespaceName(namespace string, configFileFormat agolloConstant.ConfigFileFormat) string {
return fmt.Sprintf(apolloConfigFormat, namespace, configFileFormat)
}

Expand All @@ -137,7 +127,7 @@ func (c *apolloConfiguration) GetProperties(key string, opts ...cc.Option) (stri
if config == nil {
return "", errors.New(fmt.Sprintf("nothing in namespace:%s ", key))
}
return config.GetContent(agollo.Properties), nil
return config.GetContent(), nil
}

func (c *apolloConfiguration) getAddressWithProtocolPrefix(url *common.URL) string {
Expand Down
25 changes: 5 additions & 20 deletions config_center/apollo/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ func initMockApollo(t *testing.T) *apolloConfiguration {
return configuration
}

func TestAddListener(t *testing.T) {
func TestListener(t *testing.T) {
listener := &apolloDataListener{}
listener.wg.Add(1)
listener.wg.Add(2)
apollo := initMockApollo(t)
mockConfigRes = `{
"appId": "testApplication_yang",
Expand All @@ -215,28 +215,14 @@ func TestAddListener(t *testing.T) {
},
"releaseKey": "20191104105242-0f13805d89f834a4"
}`
//test add
apollo.AddListener(mockNamespace, listener)
listener.wg.Wait()
assert.Equal(t, "registries.hangzhouzk.username", listener.event)
assert.Equal(t, "mockDubbog.properties", listener.event)
assert.Greater(t, listener.count, 0)
deleteMockJson(t)
}

func TestRemoveListener(t *testing.T) {
listener := &apolloDataListener{}
apollo := initMockApollo(t)
mockConfigRes = `{
"appId": "testApplication_yang",
"cluster": "default",
"namespaceName": "mockDubbog.properties",
"configurations": {
"registries.hangzhouzk.username": "11111"
},
"releaseKey": "20191104105242-0f13805d89f834a4"
}`
apollo.AddListener(mockNamespace, listener)
//test remove
apollo.RemoveListener(mockNamespace, listener)
assert.Equal(t, "", listener.event)
listenerCount := 0
apollo.listeners.Range(func(key, value interface{}) bool {
apolloListener := value.(*apolloListener)
Expand All @@ -247,7 +233,6 @@ func TestRemoveListener(t *testing.T) {
return true
})
assert.Equal(t, listenerCount, 0)
assert.Equal(t, listener.count, 0)
deleteMockJson(t)
}

Expand Down
38 changes: 26 additions & 12 deletions config_center/apollo/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,48 @@
package apollo

import (
"github.com/zouyx/agollo"
"github.com/zouyx/agollo/v3"
"github.com/zouyx/agollo/v3/storage"
"gopkg.in/yaml.v2"
)

import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
)

type apolloListener struct {
listeners map[config_center.ConfigurationListener]struct{}
}

// NewApolloListener ...
func NewApolloListener() *apolloListener {
// nolint
func newApolloListener() *apolloListener {
zouyx marked this conversation as resolved.
Show resolved Hide resolved
return &apolloListener{
listeners: make(map[config_center.ConfigurationListener]struct{}, 0),
}
}

// OnChange ...
func (a *apolloListener) OnChange(changeEvent *agollo.ChangeEvent) {
for key, change := range changeEvent.Changes {
for listener := range a.listeners {
listener.Process(&config_center.ConfigChangeEvent{
ConfigType: getChangeType(change.ChangeType),
Key: key,
Value: change.NewValue,
})
}
func (a *apolloListener) OnChange(changeEvent *storage.ChangeEvent) {

}

// OnNewestChange process each listener by all changes
func (a *apolloListener) OnNewestChange(changeEvent *storage.FullChangeEvent) {
b, err := yaml.Marshal(changeEvent.Changes)
if err != nil {
logger.Errorf("apollo onNewestChange err %+v",
err)
return
}
content := string(b)
for listener := range a.listeners {
listener.Process(&config_center.ConfigChangeEvent{
ConfigType: remoting.EventTypeUpdate,
Key: changeEvent.Namespace,
Value: content,
})
}
}

Expand Down
16 changes: 2 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@ module github.com/apache/dubbo-go
require (
github.com/Workiva/go-datastructures v1.0.50
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5 // indirect
github.com/apache/dubbo-go-hessian2 v1.4.0
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/creasty/defaults v1.3.0
github.com/dubbogo/getty v1.3.3
github.com/dubbogo/go-zookeeper v1.0.0
Expand All @@ -20,9 +16,6 @@ require (
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.3.2
github.com/google/btree v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/consul v1.5.3
Expand All @@ -36,19 +29,14 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.1.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/stretchr/testify v1.5.1
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/zouyx/agollo v0.0.0-20191114083447-dde9fc9f35b8
github.com/zouyx/agollo/v3 v3.4.4
go.etcd.io/bbolt v1.3.3 // indirect
go.etcd.io/etcd v3.3.13+incompatible
go.uber.org/atomic v1.4.0
go.uber.org/zap v1.10.0
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
google.golang.org/grpc v1.22.1
gopkg.in/yaml.v2 v2.2.2
gopkg.in/yaml.v2 v2.2.4
k8s.io/api v0.0.0-20190325185214-7544f9db76f6
k8s.io/apimachinery v0.0.0-20190223001710-c182ff3b9841
k8s.io/client-go v8.0.0+incompatible
Expand Down
Loading