Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix:deal the panic when invoker destroy #358

Merged
merged 7 commits into from
Feb 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions protocol/dubbo/dubbo_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"context"
"strconv"
"sync"
"sync/atomic"
"time"
)

import (
Expand All @@ -38,7 +40,8 @@ import (

var (
// ErrNoReply ...
ErrNoReply = perrors.New("request need @response")
ErrNoReply = perrors.New("request need @response")
ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
)

var (
Expand All @@ -50,13 +53,16 @@ type DubboInvoker struct {
protocol.BaseInvoker
client *Client
quitOnce sync.Once
// Used to record the number of requests. -1 represent this DubboInvoker is destroyed
reqNum int64
}

// NewDubboInvoker ...
func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker {
return &DubboInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
client: client,
reqNum: 0,
}
}

Expand All @@ -66,6 +72,15 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
err error
result protocol.RPCResult
)
if di.reqNum < 0 {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this dubboInvoker is destroyed")
result.Err = ErrDestroyedInvoker
return &result
}
atomic.AddInt64(&(di.reqNum), 1)
defer atomic.AddInt64(&(di.reqNum), -1)

inv := invocation.(*invocation_impl.RPCInvocation)
for _, k := range attachmentKey {
Expand Down Expand Up @@ -110,11 +125,21 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
// Destroy ...
func (di *DubboInvoker) Destroy() {
di.quitOnce.Do(func() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

someone check this pls: Can this be async

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the reqNum is used for judging if dubbo invoker is available ? Why not use isAvailable() func instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used to check whether there are any stock requests in the invoker during destroy

di.BaseInvoker.Destroy()

if di.client != nil {
di.client.Close()
for {
if di.reqNum == 0 {
di.reqNum = -1
logger.Info("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key())
di.BaseInvoker.Destroy()
if di.client != nil {
di.client.Close()
di.client = nil
}
break
}
logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key())
time.Sleep(1 * time.Second)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In extreme cases, infinite circulation possible? Should we control the upper limit?

Copy link
Member Author

@pantianying pantianying Feb 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In extreme cases,there is always a request in the invoker that has not ended. If it is forcibly destroyed, it may cause a program panic. The severity of this result is greater than adding a loop.

})
}

Expand Down
39 changes: 28 additions & 11 deletions registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
}

func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
var url *common.URL
var (
url *common.URL
oldInvoker protocol.Invoker = nil
)
//judge is override or others
if res != nil {
url = &res.Service
Expand All @@ -126,10 +129,10 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
switch res.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
//dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL)
dir.cacheInvoker(url)
oldInvoker = dir.cacheInvoker(url)
case remoting.EventTypeDel:
//dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL)
dir.uncacheInvoker(url)
oldInvoker = dir.uncacheInvoker(url)
logger.Infof("selector delete service url{%s}", res.Service)
default:
return
Expand All @@ -138,8 +141,14 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {

newInvokers := dir.toGroupInvokers()
dir.listenerLock.Lock()
defer dir.listenerLock.Unlock()
dir.cacheInvokers = newInvokers
dir.listenerLock.Unlock()
// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
if oldInvoker != nil {
oldInvoker.Destroy()
}

}

func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
Expand Down Expand Up @@ -177,12 +186,18 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
return groupInvokersList
}

func (dir *registryDirectory) uncacheInvoker(url *common.URL) {
// uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key())
dir.cacheInvokersMap.Delete(url.Key())
if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
dir.cacheInvokersMap.Delete(url.Key())
return cacheInvoker.(protocol.Invoker)
}
return nil
}

func (dir *registryDirectory) cacheInvoker(url *common.URL) {
// cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL

Expand All @@ -193,7 +208,7 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
}
if url == nil {
logger.Error("URL is nil ,pls check if service url is subscribe successfully!")
return
return nil
}
//check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
Expand All @@ -210,10 +225,11 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
cacheInvoker.(protocol.Invoker).Destroy()
return cacheInvoker.(protocol.Invoker)
}
}
}
return nil
}

//select the protocol invokers from the directory
Expand All @@ -239,10 +255,11 @@ func (dir *registryDirectory) IsAvailable() bool {
func (dir *registryDirectory) Destroy() {
//TODO:unregister & unsubscribe
dir.BaseDirectory.Destroy(func() {
for _, ivk := range dir.cacheInvokers {
invokers := dir.cacheInvokers
dir.cacheInvokers = []protocol.Invoker{}
for _, ivk := range invokers {
ivk.Destroy()
}
dir.cacheInvokers = []protocol.Invoker{}
})
}

Expand Down