Skip to content

Commit

Permalink
Merge pull request #708 from beiwei30/bitmap-router
Browse files Browse the repository at this point in the history
Ftr: bitmap in router chain
  • Loading branch information
AlexStocks authored Aug 28, 2020
2 parents d0bfafb + 62a7e88 commit e6205de
Show file tree
Hide file tree
Showing 19 changed files with 902 additions and 302 deletions.
8 changes: 6 additions & 2 deletions cluster/directory/static_directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@ func NewStaticDirectory(invokers []protocol.Invoker) *staticDirectory {
if len(invokers) > 0 {
url = invokers[0].GetUrl()
}
return &staticDirectory{
dir := &staticDirectory{
BaseDirectory: NewBaseDirectory(&url),
invokers: invokers,
}

dir.routerChain.SetInvokers(invokers)
return dir
}

//for-loop invokers ,if all invokers is available ,then it means directory is available
Expand All @@ -69,7 +72,7 @@ func (dir *staticDirectory) List(invocation protocol.Invocation) []protocol.Invo
return invokers
}
dirUrl := dir.GetUrl()
return routerChain.Route(invokers, &dirUrl, invocation)
return routerChain.Route(&dirUrl, invocation)
}

// Destroy Destroy
Expand All @@ -92,6 +95,7 @@ func (dir *staticDirectory) BuildRouterChain(invokers []protocol.Invoker) error
if e != nil {
return e
}
routerChain.SetInvokers(dir.invokers)
dir.SetRouterChain(routerChain)
return nil
}
9 changes: 8 additions & 1 deletion cluster/router/chan.go → cluster/router/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@

package router

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
)

// Chain
type Chain interface {
router
Route(*common.URL, protocol.Invocation) []protocol.Invoker
// Refresh invokers
SetInvokers([]protocol.Invoker)
// AddRouters Add routers
AddRouters([]PriorityRouter)
}
210 changes: 192 additions & 18 deletions cluster/router/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
package chain

import (
"math"
"sort"
"sync"
"sync/atomic"
"time"
)

import (
Expand All @@ -30,11 +31,18 @@ import (
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)

const (
timeInterval = 5 * time.Second
timeThreshold = 2 * time.Second
countThreshold = 5
)

// RouterChain Router chain
type RouterChain struct {
// Full list of addresses from registry, classified by method name.
Expand All @@ -48,30 +56,38 @@ type RouterChain struct {
mutex sync.RWMutex

url common.URL

// The times of address notification since last update for address cache
count int64
// The timestamp of last update for address cache
last time.Time
// Channel for notify to update the address cache
notify chan struct{}
// Address cache
cache atomic.Value
}

// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
func (c *RouterChain) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
finalInvokers := invokers
l := len(c.routers)
rs := make([]router.PriorityRouter, l, int(math.Ceil(float64(l)*1.2)))
c.mutex.RLock()
copy(rs, c.routers)
c.mutex.RUnlock()
func (c *RouterChain) Route(url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
cache := c.loadCache()
if cache == nil {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.invokers
}

for _, r := range rs {
finalInvokers = r.Route(finalInvokers, url, invocation)
bitmap := cache.bitmap
for _, r := range c.copyRouters() {
bitmap = r.Route(bitmap, cache, url, invocation)
}
return finalInvokers
}

// SetInvokers notify router chain of the initial addresses from registry at the first time. Notify whenever addresses in registry change.
func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
for _, r := range c.routers {
if notifyRouter, ok := r.(router.NotifyRouter); ok {
notifyRouter.Notify(invokers)
}
indexes := bitmap.ToArray()
finalInvokers := make([]protocol.Invoker, len(indexes))
for i, index := range indexes {
finalInvokers[i] = cache.invokers[index]
}

return finalInvokers
}

// AddRouters Add routers to router chain
Expand All @@ -88,6 +104,116 @@ func (c *RouterChain) AddRouters(routers []router.PriorityRouter) {
c.routers = newRouters
}

// SetInvokers receives updated invokers from registry center. If the times of notification exceeds countThreshold and
// time interval exceeds timeThreshold since last cache update, then notify to update the cache.
func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
c.mutex.Lock()
c.invokers = invokers
c.mutex.Unlock()

c.count++
now := time.Now()
if c.count >= countThreshold && now.Sub(c.last) >= timeThreshold {
c.last = now
c.count = 0
go func() {
c.notify <- struct{}{}
}()
}
}

// loop listens on events to update the address cache when it's necessary, either when it receives notification
// from address update, or when timeInterval exceeds.
func (c *RouterChain) loop() {
for {
ticker := time.NewTicker(timeInterval)
select {
case <-ticker.C:
c.buildCache()
case <-c.notify:
c.buildCache()
}
}
}

// copyRouters make a snapshot copy from RouterChain's router list.
func (c *RouterChain) copyRouters() []router.PriorityRouter {
c.mutex.RLock()
defer c.mutex.RUnlock()
ret := make([]router.PriorityRouter, 0, len(c.routers))
ret = append(ret, c.routers...)
return ret
}

// copyInvokers copies a snapshot of the received invokers.
func (c *RouterChain) copyInvokers() []protocol.Invoker {
c.mutex.RLock()
defer c.mutex.RUnlock()
if c.invokers == nil || len(c.invokers) == 0 {
return nil
}
ret := make([]protocol.Invoker, 0, len(c.invokers))
ret = append(ret, c.invokers...)
return ret
}

// loadCache loads cache from sync.Value to guarantee the visibility
func (c *RouterChain) loadCache() *InvokerCache {
v := c.cache.Load()
if v == nil {
return nil
}

return v.(*InvokerCache)
}

// copyInvokerIfNecessary compares chain's invokers copy and cache's invokers copy, to avoid copy as much as possible
func (c *RouterChain) copyInvokerIfNecessary(cache *InvokerCache) []protocol.Invoker {
var invokers []protocol.Invoker
if cache != nil {
invokers = cache.invokers
}

c.mutex.RLock()
defer c.mutex.RUnlock()
if isInvokersChanged(invokers, c.invokers) {
invokers = c.copyInvokers()
}
return invokers
}

// buildCache builds address cache with the new invokers for all poolable routers.
func (c *RouterChain) buildCache() {
origin := c.loadCache()
invokers := c.copyInvokerIfNecessary(origin)
if invokers == nil || len(invokers) == 0 {
return
}

var (
mutex sync.Mutex
wg sync.WaitGroup
)

cache := BuildCache(invokers)
for _, r := range c.copyRouters() {
if p, ok := r.(router.Poolable); ok {
wg.Add(1)
go func(p router.Poolable) {
defer wg.Done()
pool, info := poolRouter(p, origin, invokers)
mutex.Lock()
defer mutex.Unlock()
cache.pools[p.Name()] = pool
cache.metadatas[p.Name()] = info
}(p)
}
}
wg.Wait()

c.cache.Store(cache)
}

// URL Return URL in RouterChain
func (c *RouterChain) URL() common.URL {
return c.url
Expand Down Expand Up @@ -118,14 +244,62 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
chain := &RouterChain{
builtinRouters: routers,
routers: newRouters,
last: time.Now(),
notify: make(chan struct{}),
}
if url != nil {
chain.url = *url
}

go chain.loop()
return chain, nil
}

// poolRouter calls poolable router's Pool() to create new address pool and address metadata if necessary.
// If the corresponding cache entry exists, and the poolable router answers no need to re-pool (possibly because its
// rule doesn't change), and the address list doesn't change, then the existing data will be re-used.
func poolRouter(p router.Poolable, origin *InvokerCache, invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
name := p.Name()
if isCacheMiss(origin, name) || p.ShouldPool() || &(origin.invokers) != &invokers {
logger.Debugf("build address cache for router %q", name)
return p.Pool(invokers)
}

logger.Debugf("reuse existing address cache for router %q", name)
return origin.pools[name], origin.metadatas[name]
}

// isCacheMiss checks if the corresponding cache entry for a poolable router has already existed.
// False returns when the cache is nil, or cache's pool is nil, or cache's invokers snapshot is nil, or the entry
// doesn't exist.
func isCacheMiss(cache *InvokerCache, key string) bool {
if cache == nil || cache.pools == nil || cache.invokers == nil || cache.pools[key] == nil {
return true
}
return false
}

// isInvokersChanged compares new invokers on the right changes, compared with the old invokers on the left.
func isInvokersChanged(left []protocol.Invoker, right []protocol.Invoker) bool {
if len(right) != len(left) {
return true
}

for _, r := range right {
found := false
for _, l := range left {
if common.IsEquals(l.GetUrl(), r.GetUrl(), constant.TIMESTAMP_KEY, constant.REMOTE_TIMESTAMP_KEY) {
found = true
break
}
}
if !found {
return true
}
}
return false
}

// sortRouter Sort router instance by priority with stable algorithm
func sortRouter(routers []router.PriorityRouter) {
sort.Stable(byPriority(routers))
Expand Down
13 changes: 9 additions & 4 deletions cluster/router/chain/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ func TestRouterChainRoute(t *testing.T) {
invokers := []protocol.Invoker{}
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))

chain.SetInvokers(invokers)
chain.buildCache()

targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(invokers, &targetURL, inv)
finalInvokers := chain.Route(&targetURL, inv)

assert.Equal(t, 1, len(finalInvokers))
}
Expand Down Expand Up @@ -213,10 +213,12 @@ conditions:
invokers := []protocol.Invoker{}
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
chain.SetInvokers(invokers)
chain.buildCache()

targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(invokers, &targetURL, inv)
finalInvokers := chain.Route(&targetURL, inv)

assert.Equal(t, 0, len(finalInvokers))
}
Expand All @@ -236,13 +238,16 @@ func TestRouterChainRouteNoRoute(t *testing.T) {

url := getConditionRouteUrl(applicationKey)
assert.NotNil(t, url)

invokers := []protocol.Invoker{}
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
chain.SetInvokers(invokers)
chain.buildCache()

targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(invokers, &targetURL, inv)
finalInvokers := chain.Route(&targetURL, inv)

assert.Equal(t, 0, len(finalInvokers))
}
Expand Down
Loading

0 comments on commit e6205de

Please sign in to comment.