Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:#1143 Feature/reduce etcd registry conn; wait group modify #1297

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/github-actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
# diff -u <(echo -n) <(gofmt -d -s .)
- name: Install go ci lint
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.27.0
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.41.1

- name: Run Linter
run: golangci-lint run --timeout=10m -v
Expand Down
6 changes: 3 additions & 3 deletions registry/base_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ func (r *BaseRegistry) Destroy() {
// first step close registry's all listeners
r.facadeBasedRegistry.CloseListener()
// then close r.done to notify everyone who is listening on it
close(r.done)
close(r.Done())
// wait for the wait group to finish (i.e. for the outside listeners to finish closing)
r.wg.Wait()
r.WaitGroup().Wait()

// close registry client
r.closeRegisters()
Expand Down Expand Up @@ -474,7 +474,7 @@ func (r *BaseRegistry) closeRegisters() {
// IsAvailable reports whether the registry is still open, i.e. its done channel has not been closed
func (r *BaseRegistry) IsAvailable() bool {
select {
case <-r.done:
case <-r.Done():
return false
default:
return true
Expand Down
9 changes: 6 additions & 3 deletions registry/etcdv3/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,8 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
); err != nil {
return nil, err
}
r.WaitGroup().Add(1) // etcdv3 client start successful, then wg +1

go etcdv3.HandleClientRestart(r)

r.handleClientRestart()
r.InitListeners()

return r, nil
Expand Down Expand Up @@ -175,3 +173,8 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error)
// DoUnsubscribe is not supported by etcdV3Registry: it ignores conf and
// always returns a nil listener together with an error.
func (r *etcdV3Registry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) {
return nil, perrors.New("DoUnsubscribe is not support in etcdV3Registry")
}

func (r *etcdV3Registry) handleClientRestart() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AsyncHandleClientRestart

r.WaitGroup().Add(1)
go etcdv3.HandleClientRestart(r)
}
5 changes: 2 additions & 3 deletions registry/kubernetes/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ func newKubernetesRegistry(url *common.URL) (registry.Registry, error) {
return nil, perrors.WithStack(err)
}

r.WaitGroup().Add(1)
go r.HandleClientRestart()
r.InitListeners()

Expand Down Expand Up @@ -191,12 +190,12 @@ func newMockKubernetesRegistry(

// HandleClientRestart will reconnect to kubernetes registry center
func (r *kubernetesRegistry) HandleClientRestart() {
r.WaitGroup().Add(1)
defer r.WaitGroup().Done()
var (
err error
failTimes int
)

defer r.WaitGroup().Done()
LOOP:
for {
select {
Expand Down
8 changes: 6 additions & 2 deletions registry/zookeeper/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) {
return nil, err
}

r.WaitGroup().Add(1) //zk client start successful, then wg +1
r.WaitGroup().Add(1)
go zookeeper.HandleClientRestart(r)

r.listener = zookeeper.NewZkEventListener(r.client)
Expand Down Expand Up @@ -108,7 +108,6 @@ func newMockZkRegistry(url *common.URL, opts ...gxzookeeper.Option) (*zk.TestClu
if err != nil {
return nil, nil, err
}
r.WaitGroup().Add(1) // zk client start successful, then wg +1
go zookeeper.HandleClientRestart(r)
r.InitListeners()
return c, r, nil
Expand Down Expand Up @@ -314,3 +313,8 @@ func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationL

return zkListener, nil
}

func (r *zkRegistry) handleClientRestart() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AsyncHandleClientRestart

r.WaitGroup().Add(1)
go zookeeper.HandleClientRestart(r)
}
53 changes: 6 additions & 47 deletions remoting/etcdv3/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@
package etcdv3

import (
getty "github.com/apache/dubbo-getty"
"sync"
"time"
)

import (
gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
perrors "github.com/pkg/errors"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
)

Expand All @@ -45,56 +42,18 @@ type clientFacade interface {
}

// HandleClientRestart keeps the connection between client and server
// This method should be used only once. You can use handleClientRestart() in package registry.
func HandleClientRestart(r clientFacade) {
var (
err error
failTimes int
)

defer r.WaitGroup().Done()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

r.WaitGroup()的done,wait和add使用散落在各处,remoting作为基础组件,是否可能有同学直接调用 HandleClientRestart ,然后函数结束出现 negative 的问题?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有可能,用once限制一下HandleClientRestart的调用次数是否可以呢;该方法在init包被引入的时候调用,只调用一次。

LOOP:
for {
select {
case <-r.Client().GetCtx().Done():
r.RestartCallBack()
// re-register all services
time.Sleep(10 * time.Microsecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么这里要Sleep?
我看RestartCallBack的逻辑是重新注册,下面的逻辑是重建client,并且有重试逻辑。使用RestartCallBack的目的是什么呢?

Copy link
Contributor Author

@WilliamLeaves WilliamLeaves Jul 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

代码实现参照zk对于连接过多的优化,做了类似的实现。
<-GetCtx().Done()意味着已经断连,需要调用RestartCallBack重新注册

case <-r.Done():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...")
break LOOP
// re-register all services
case <-r.Client().Done():
r.ClientLock().Lock()
clientName := gxetcd.RegistryETCDV3Client
timeout, _ := time.ParseDuration(r.GetURL().GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
endpoints := r.Client().GetEndPoints()
r.Client().Close()
r.SetClient(nil)
r.ClientLock().Unlock()

// try to connect to etcd,
failTimes = 0
for {
after := getty.GetTimeWheel().After(timeSecondDuration(failTimes * gxetcd.ConnDelay))
select {
case <-r.Done():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...")
break LOOP
case <-after: // avoid connect frequent
}
err = ValidateClient(
r,
gxetcd.WithName(clientName),
gxetcd.WithEndpoints(endpoints...),
gxetcd.WithTimeout(timeout),
gxetcd.WithHeartbeat(1),
)
logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}",
endpoints, perrors.WithStack(err))
if err == nil && r.RestartCallBack() {
break
}
failTimes++
if gxetcd.MaxFailTimes <= failTimes {
failTimes = gxetcd.MaxFailTimes
}
}
return
}
}
}
1 change: 1 addition & 0 deletions remoting/zookeeper/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ZkClientFacade interface {
}

// HandleClientRestart keeps the connection between client and server
// This method should be used only once. You can use handleClientRestart() in package registry.
func HandleClientRestart(r ZkClientFacade) {
defer r.WaitGroup().Done()
for {
Expand Down