Skip to content

Commit

Permalink
Merge 5dc973a into 5b7b35b
Browse files Browse the repository at this point in the history
  • Loading branch information
cvictory authored Jul 2, 2021
2 parents 5b7b35b + 5dc973a commit 467c867
Show file tree
Hide file tree
Showing 12 changed files with 235 additions and 86 deletions.
21 changes: 16 additions & 5 deletions cluster/cluster_impl/base_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,27 @@ type baseClusterInvoker struct {
availablecheck bool
destroyed *atomic.Bool
stickyInvoker protocol.Invoker
// healthState
serviceHealthState *protocol.ServiceHealthState
}

func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker {
return baseClusterInvoker{
directory: directory,
availablecheck: true,
destroyed: atomic.NewBool(false),
// init from directory
serviceHealthState: directory.ServiceHealthState(),
}
}

func (invoker *baseClusterInvoker) getServiceHealthState() *protocol.ServiceHealthState {
if invoker.serviceHealthState == nil {
invoker.serviceHealthState = protocol.NewServiceState(invoker.GetURL().ServiceKey())
}
return invoker.serviceHealthState
}

func (invoker *baseClusterInvoker) GetURL() *common.URL {
return invoker.directory.GetURL()
}
Expand Down Expand Up @@ -118,12 +129,12 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
if len(invokers) == 0 {
return nil
}
protocol.TryRefreshBlackList()
invoker.getServiceHealthState().TryRefreshBlackList()
if len(invokers) == 1 {
if invokers[0].IsAvailable() {
return invokers[0]
}
protocol.SetInvokerUnhealthyStatus(invokers[0])
invoker.getServiceHealthState().SetInvokerUnhealthyStatus(invokers[0])
logger.Errorf("the invokers of %s is nil. ", invokers[0].GetURL().ServiceKey())
return nil
}
Expand All @@ -132,10 +143,10 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc

//judge if the selected Invoker is invoked and available
if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) {
protocol.SetInvokerUnhealthyStatus(selectedInvoker)
invoker.getServiceHealthState().SetInvokerUnhealthyStatus(selectedInvoker)
otherInvokers := getOtherInvokers(invokers, selectedInvoker)
// do reselect
for i := 0; i < 3; i++ {
for i := 0; i < 5; i++ {
if len(otherInvokers) == 0 {
// no other ivk to reselect, return to fallback
break
Expand All @@ -148,7 +159,7 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
if !reselectedInvoker.IsAvailable() {
logger.Infof("the invoker of %s is not available, maybe some network error happened or the server is shutdown.",
invoker.GetURL().Ip)
protocol.SetInvokerUnhealthyStatus(reselectedInvoker)
invoker.getServiceHealthState().SetInvokerUnhealthyStatus(reselectedInvoker)
otherInvokers = getOtherInvokers(otherInvokers, reselectedInvoker)
continue
}
Expand Down
3 changes: 3 additions & 0 deletions cluster/cluster_impl/base_cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
)

import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
Expand All @@ -47,6 +48,7 @@ func TestStickyNormal(t *testing.T) {
}
base := &baseClusterInvoker{}
base.availablecheck = true
base.directory = directory.NewStaticDirectory(invokers)
invoked := []protocol.Invoker{}

tmpRandomBalance := loadbalance.NewRandomLoadBalance()
Expand All @@ -65,6 +67,7 @@ func TestStickyNormalWhenError(t *testing.T) {
}
base := &baseClusterInvoker{}
base.availablecheck = true
base.directory = directory.NewStaticDirectory(invokers)

invoked := []protocol.Invoker{}
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked)
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failback_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation pr
invoker.taskList.Put(timerTask)

logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
methodName, url.Service(), result.Error().Error())
methodName, url.Service(), result.Error())
// ignore
return &protocol.RPCResult{}
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failsafe_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (invoker *failsafeClusterInvoker) Invoke(ctx context.Context, invocation pr
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
// ignore
logger.Errorf("Failsafe ignore exception: %v.\n", result.Error().Error())
logger.Errorf("Failsafe ignore exception: %v.\n", result.Error())
return &protocol.RPCResult{}
}
return result
Expand Down
1 change: 1 addition & 0 deletions cluster/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ import (
type Directory interface {
common.Node
List(invocation protocol.Invocation) []protocol.Invoker
ServiceHealthState() *protocol.ServiceHealthState
}
9 changes: 9 additions & 0 deletions cluster/directory/static_directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
type staticDirectory struct {
BaseDirectory
invokers []protocol.Invoker
// healthState
serviceHealthState *protocol.ServiceHealthState
}

// NewStaticDirectory Create a new staticDirectory with invokers
Expand All @@ -42,6 +44,8 @@ func NewStaticDirectory(invokers []protocol.Invoker) *staticDirectory {
dir := &staticDirectory{
BaseDirectory: NewBaseDirectory(url),
invokers: invokers,
// init serviceHealthState
serviceHealthState: protocol.NewServiceState(url.ServiceKey()),
}

return dir
Expand Down Expand Up @@ -74,6 +78,11 @@ func (dir *staticDirectory) List(invocation protocol.Invocation) []protocol.Invo
return routerChain.Route(dirUrl, invocation)
}

// Fetch ServiceHealthState
func (dir *staticDirectory) ServiceHealthState() *protocol.ServiceHealthState {
return dir.serviceHealthState
}

// Destroy Destroy
func (dir *staticDirectory) Destroy() {
dir.BaseDirectory.Destroy(func() {
Expand Down
3 changes: 2 additions & 1 deletion cluster/router/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ func (c *RouterChain) Loop() {
for {
select {
case <-ticker.C:
if protocol.GetAndRefreshState() {
if protocol.GetAndRefreshState(c.url) {
logger.Infof("start to build route cache because the invokers in black list is changed [%s]", c.url.ServiceKey())
c.buildCache()
}
case <-c.notify:
Expand Down
43 changes: 29 additions & 14 deletions cluster/router/conncheck/conn_check_route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ func TestConnCheckRouterRoute(t *testing.T) {
invoker1 := NewMockInvoker(url1)
invoker2 := NewMockInvoker(url2)
invoker3 := NewMockInvoker(url3)
protocol.SetInvokerUnhealthyStatus(invoker1)
protocol.SetInvokerUnhealthyStatus(invoker2)
state1 := protocol.NewServiceState(consumerURL.ServiceKey())

state1.SetInvokerUnhealthyStatus(invoker1)
state1.SetInvokerUnhealthyStatus(invoker2)

invokers = append(invokers, invoker1, invoker2, invoker3)
inv := invocation.NewRPCInvocation(connCheckRouteMethodNameTest, nil, nil)
Expand All @@ -75,7 +77,7 @@ func TestConnCheckRouterRoute(t *testing.T) {
assert.True(t, len(res.ToArray()) == 1)

// check blacklist remove
protocol.RemoveInvokerUnhealthyStatus(invoker1)
state1.RemoveInvokerUnhealthyStatus(invoker1)
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*ConnCheckRouter), invokers), consumerURL, inv)
// now invoker3 invoker1 is healthy
assert.True(t, len(res.ToArray()) == 2)
Expand All @@ -96,12 +98,23 @@ func TestRecovery(t *testing.T) {
invoker1.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker2.EXPECT().IsAvailable().Return(true).AnyTimes()

protocol.SetInvokerUnhealthyStatus(invoker1)
protocol.SetInvokerUnhealthyStatus(invoker2)
assert.Equal(t, len(protocol.GetBlackListInvokers(16)), 2)
protocol.TryRefreshBlackList()
time.Sleep(1 * time.Second)
assert.Equal(t, len(protocol.GetBlackListInvokers(16)), 0)
state1 := protocol.NewServiceState(invoker1.GetURL().ServiceKey())
state2 := protocol.NewServiceState(invoker2.GetURL().ServiceKey())
assert.Equal(t, len(state1.GetBlackListInvokers(16)), 0)
assert.Equal(t, len(state2.GetBlackListInvokers(16)), 0)
state1.SetInvokerUnhealthyStatus(invoker1)
state2.SetInvokerUnhealthyStatus(invoker2)
assert.Equal(t, len(state1.GetBlackListInvokers(16)), 1)
assert.Equal(t, len(state2.GetBlackListInvokers(16)), 1)
state1.TryRefreshBlackList()
time.Sleep(300 * time.Millisecond)
assert.Equal(t, len(state1.GetBlackListInvokers(16)), 0)
assert.Equal(t, len(state2.GetBlackListInvokers(16)), 1)
state2.TryRefreshBlackList()
time.Sleep(300 * time.Millisecond)
assert.Equal(t, len(state1.GetBlackListInvokers(16)), 0)
assert.Equal(t, len(state2.GetBlackListInvokers(16)), 0)

}

func TestPrintlnConnCheckRouterRoute(t *testing.T) {
Expand All @@ -125,9 +138,11 @@ func TestPrintlnConnCheckRouterRoute(t *testing.T) {
invoker1 := NewMockInvoker(url1)
invoker2 := NewMockInvoker(url2)
invoker3 := NewMockInvoker(url3)
protocol.SetInvokerUnhealthyStatus(invoker1)
protocol.SetInvokerUnhealthyStatus(invoker2)
protocol.SetInvokerUnhealthyStatus(invoker3)

srvState := protocol.NewServiceState(consumerURL.ServiceKey())
srvState.SetInvokerUnhealthyStatus(invoker1)
srvState.SetInvokerUnhealthyStatus(invoker2)
srvState.SetInvokerUnhealthyStatus(invoker3)

invokers = append(invokers, invoker1, invoker2, invoker3)
inv := invocation.NewRPCInvocation(connCheckRouteMethodNameTest, nil, nil)
Expand All @@ -145,8 +160,8 @@ func TestPrintlnConnCheckRouterRoute(t *testing.T) {
assert.Equal(t, router.RouteSnapshot(cache), "conn-check-router -> Count:0 {}")

// check blacklist remove
protocol.RemoveInvokerUnhealthyStatus(invoker1)
protocol.RemoveInvokerUnhealthyStatus(invoker3)
srvState.RemoveInvokerUnhealthyStatus(invoker1)
srvState.RemoveInvokerUnhealthyStatus(invoker3)
cache = setUpAddrCache(hcr.(*ConnCheckRouter), invokers)
res = hcr.Route(utils.ToBitmap(invokers), cache, consumerURL, inv)
// now invoker3 invoker1 is healthy
Expand Down
2 changes: 1 addition & 1 deletion cluster/router/conncheck/conn_health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ func TestDefaultConnCheckerIsHealthy(t *testing.T) {
invoker = NewMockInvoker(url)
cc = NewDefaultConnChecker(url).(*DefaultConnChecker)
// add to black list
protocol.SetInvokerUnhealthyStatus(invoker)
protocol.NewServiceState(url.ServiceKey()).SetInvokerUnhealthyStatus(invoker)
assert.False(t, cc.IsConnHealthy(invoker))
}
3 changes: 2 additions & 1 deletion common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ const (
RETRY_PERIOD_KEY = "retry.period"
RETRY_TIMES_KEY = "retry.times"
CYCLE_REPORT_KEY = "cycle.report"
DEFAULT_BLACK_LIST_RECOVER_BLOCK = 16
DEFAULT_BLACK_LIST_RECOVER_BLOCK = 64
DEFAULT_BLACK_LIST_MAX_RETRY_TIMES = 512
)

const (
Expand Down
Loading

0 comments on commit 467c867

Please sign in to comment.