Skip to content

Commit

Permalink
Merge pull request #400 from sxllwx/k8s_merge
Browse files Browse the repository at this point in the history
Add: kubernetes registry and remote package unit test
  • Loading branch information
flycash authored Mar 16, 2020
2 parents 36fdb01 + e48b698 commit e8f7526
Show file tree
Hide file tree
Showing 21 changed files with 2,901 additions and 34 deletions.
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ require (
go.uber.org/zap v1.10.0
google.golang.org/grpc v1.22.1
gopkg.in/yaml.v2 v2.2.2
k8s.io/api v0.0.0-20190325185214-7544f9db76f6
k8s.io/apimachinery v0.0.0-20190223001710-c182ff3b9841
k8s.io/client-go v8.0.0+incompatible
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a // indirect
)

go 1.13
48 changes: 39 additions & 9 deletions go.sum

Large diffs are not rendered by default.

14 changes: 9 additions & 5 deletions registry/etcdv3/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ type dataListener struct {
listener config_center.ConfigurationListener
}

// NewRegistryDataListener ...
// NewRegistryDataListener
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener, interestedURL: []*common.URL{}}
return &dataListener{listener: listener}
}

func (l *dataListener) AddInterestedURL(url *common.URL) {
Expand All @@ -49,7 +49,12 @@ func (l *dataListener) AddInterestedURL(url *common.URL) {

func (l *dataListener) DataChange(eventType remoting.Event) bool {

url := eventType.Path[strings.Index(eventType.Path, "/providers/")+len("/providers/"):]
index := strings.Index(eventType.Path, "/providers/")
if index == -1 {
logger.Warnf("Listen with no url, event.path={%v}", eventType.Path)
return false
}
url := eventType.Path[index+len("/providers/"):]
serviceURL, err := common.NewURL(url)
if err != nil {
logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
Expand All @@ -68,7 +73,6 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {
return true
}
}

return false
}

Expand Down Expand Up @@ -97,7 +101,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {

case e := <-l.events:
logger.Infof("got etcd event %#v", e)
if e.ConfigType == remoting.EventTypeDel {
if e.ConfigType == remoting.EventTypeDel && l.registry.client.Valid() {
select {
case <-l.registry.Done():
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
Expand Down
13 changes: 10 additions & 3 deletions registry/etcdv3/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
package etcdv3

import (
"os"
"testing"
"time"

"github.com/apache/dubbo-go/config_center"
)

import (
Expand All @@ -32,6 +31,7 @@ import (

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
)

Expand All @@ -40,13 +40,16 @@ type RegistryTestSuite struct {
etcd *embed.Etcd
}

const defaultEtcdV3WorkDir = "/tmp/default-dubbo-go-registry.etcd"

// start etcd server
func (suite *RegistryTestSuite) SetupSuite() {

t := suite.T()

cfg := embed.NewConfig()
cfg.Dir = "/tmp/default.etcd"
// avoid conflict with default etcd work-dir
cfg.Dir = defaultEtcdV3WorkDir
e, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
Expand All @@ -66,6 +69,10 @@ func (suite *RegistryTestSuite) SetupSuite() {
// stop etcd server
func (suite *RegistryTestSuite) TearDownSuite() {
suite.etcd.Close()
// clean the etcd workdir
if err := os.RemoveAll(defaultEtcdV3WorkDir); err != nil {
suite.FailNow(err.Error())
}
}

func (suite *RegistryTestSuite) TestDataChange() {
Expand Down
4 changes: 2 additions & 2 deletions registry/etcdv3/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (suite *RegistryTestSuite) TestSubscribe() {
assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String())
}

func (suite *RegistryTestSuite) TestConsumerDestory() {
func (suite *RegistryTestSuite) TestConsumerDestroy() {

t := suite.T()
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
Expand All @@ -117,7 +117,7 @@ func (suite *RegistryTestSuite) TestConsumerDestory() {

}

func (suite *RegistryTestSuite) TestProviderDestory() {
func (suite *RegistryTestSuite) TestProviderDestroy() {

t := suite.T()
reg := initRegistry(t)
Expand Down
121 changes: 121 additions & 0 deletions registry/kubernetes/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kubernetes

import (
"strings"
)

import (
perrors "github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
)

type dataListener struct {
interestedURL []*common.URL
listener config_center.ConfigurationListener
}

// NewRegistryDataListener
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener}
}

// AddInterestedURL
func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}

// DataChange
// notify listen, when interest event
func (l *dataListener) DataChange(eventType remoting.Event) bool {

index := strings.Index(eventType.Path, "/providers/")
if index == -1 {
logger.Warnf("Listen with no url, event.path={%v}", eventType.Path)
return false
}
url := eventType.Path[index+len("/providers/"):]
serviceURL, err := common.NewURL(url)
if err != nil {
logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
return false
}

for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
l.listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
Value: serviceURL,
ConfigType: eventType.Action,
},
)
return true
}
}
return false
}

type configurationListener struct {
registry *kubernetesRegistry
events chan *config_center.ConfigChangeEvent
}

// NewConfigurationListener for listening the event of kubernetes.
func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener {
// add a new waiter
reg.WaitGroup().Add(1)
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
}

func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
}

func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.registry.Done():
logger.Warnf("listener's kubernetes client connection is broken, so kubernetes event listener exits now.")
return nil, perrors.New("listener stopped")

case e := <-l.events:
logger.Infof("got kubernetes event %#v", e)
if e.ConfigType == remoting.EventTypeDel && !l.registry.client.Valid() {
select {
case <-l.registry.Done():
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
default:
}
continue
}
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil
}
}
}
func (l *configurationListener) Close() {
l.registry.WaitGroup().Done()
}
Loading

0 comments on commit e8f7526

Please sign in to comment.