Skip to content

Commit

Permalink
defeat too much node listen goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexStocks committed Jun 5, 2021
1 parent cdae603 commit eb6e261
Showing 1 changed file with 21 additions and 7 deletions.
28 changes: 21 additions & 7 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/dubbogo/go-zookeeper/zk"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
perrors "github.com/pkg/errors"

uatomic "go.uber.org/atomic"
)

import (
Expand All @@ -39,14 +41,14 @@ import (
)

var (
defaultTTL = 15 * time.Minute
defaultTTL = 10 * time.Minute
)

// nolint
type ZkEventListener struct {
client *gxzookeeper.ZookeeperClient
pathMapLock sync.Mutex
pathMap map[string]struct{}
pathMap map[string]*uatomic.Int32
wg sync.WaitGroup
exit chan struct{}
}
Expand All @@ -55,7 +57,7 @@ type ZkEventListener struct {
func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener {
return &ZkEventListener{
client: client,
pathMap: make(map[string]struct{}),
pathMap: make(map[string]*uatomic.Int32),
exit: make(chan struct{}),
}
}
Expand Down Expand Up @@ -83,6 +85,16 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remotin
// nolint
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
defer l.wg.Done()

l.pathMapLock.Lock()
a, ok := l.pathMap[zkPath]
if !ok || a.Load() > 1 {
return false
}
a.Inc()
l.pathMapLock.Unlock()
defer a.Dec()

var zkEvent zk.Event
for {
keyEventCh, err := l.client.ExistW(zkPath)
Expand Down Expand Up @@ -174,6 +186,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
// listen l service node
l.wg.Add(1)
go func(node string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
if l.listenServiceNodeEvent(node, listener) {
logger.Warnf("delete zkNode{%s}", node)
listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel})
Expand Down Expand Up @@ -271,15 +284,15 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
// Save the path to avoid listen repeatedly
l.pathMapLock.Lock()
_, ok := l.pathMap[dubboPath]
if !ok {
l.pathMap[dubboPath] = uatomic.NewInt32(0)
}
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", dubboPath)
continue
}

l.pathMapLock.Lock()
l.pathMap[dubboPath] = struct{}{}
l.pathMapLock.Unlock()
// When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn
l.client.RLock()
if l.client.Conn == nil {
Expand All @@ -298,13 +311,14 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
logger.Infof("listen dubbo service key{%s}", dubboPath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
delete(l.pathMap, zkPath)
l.pathMapLock.Unlock()
logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
}
logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)

// listen sub path recursive
Expand Down

0 comments on commit eb6e261

Please sign in to comment.