Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
rpc: add rate limit and rpc client manage in dm-master (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Jun 21, 2019
1 parent 4f4fe78 commit 82ecc4b
Show file tree
Hide file tree
Showing 16 changed files with 936 additions and 364 deletions.
26 changes: 23 additions & 3 deletions dm/ctl/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ import (
"flag"
"fmt"
"net"
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/errors"
)

const (
defaultRPCTimeout = "10m"
)

// NewConfig creates a new base config for dmctl.
func NewConfig() *Config {
cfg := &Config{}
Expand All @@ -33,6 +38,7 @@ func NewConfig() *Config {
fs.BoolVar(&cfg.printVersion, "V", false, "prints version and exit")
fs.StringVar(&cfg.ConfigFile, "config", "", "path to config file")
fs.StringVar(&cfg.MasterAddr, "master-addr", "", "master API server addr")
fs.StringVar(&cfg.RPCTimeoutStr, "rpc-timeout", defaultRPCTimeout, fmt.Sprintf("rpc timeout, default is %s", defaultRPCTimeout))
fs.StringVar(&cfg.encrypt, "encrypt", "", "encrypt plaintext to ciphertext")

return cfg
Expand All @@ -44,6 +50,9 @@ type Config struct {

MasterAddr string `toml:"master-addr" json:"master-addr"`

RPCTimeoutStr string `toml:"rpc-timeout" json:"rpc-timeout"`
RPCTimeout time.Duration `json:"-"`

ConfigFile string `json:"config-file"`

printVersion bool
Expand Down Expand Up @@ -106,8 +115,7 @@ func (c *Config) Parse(arguments []string) error {
return errors.Annotatef(err, "specify master addr %s", c.MasterAddr)
}

c.adjust()
return nil
return errors.Trace(c.adjust())
}

// configFromFile loads config from file.
Expand All @@ -117,7 +125,19 @@ func (c *Config) configFromFile(path string) error {
}

// adjust adjusts configs
func (c *Config) adjust() {
func (c *Config) adjust() error {
if c.RPCTimeoutStr == "" {
c.RPCTimeoutStr = defaultRPCTimeout
}
timeout, err := time.ParseDuration(c.RPCTimeoutStr)
if err != nil {
return errors.Trace(err)
}
if timeout <= time.Duration(0) {
return errors.Errorf("invalid time duration: %s", c.RPCTimeoutStr)
}
c.RPCTimeout = timeout
return nil
}

// validate host:port format address
Expand Down
14 changes: 13 additions & 1 deletion dm/ctl/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,16 @@ import (

var (
masterClient pb.MasterClient
globalConfig = &Config{}
)

// InitClient initializes dm-worker client or dm-master client
// InitUtils inits necessary dmctl utils: it registers cfg as the package
// global config and then dials the dm-master client at cfg.MasterAddr.
func InitUtils(cfg *Config) error {
	globalConfig = cfg
	err := InitClient(cfg.MasterAddr)
	return errors.Trace(err)
}

// InitClient initializes dm-master client
func InitClient(addr string) error {
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second))
if err != nil {
Expand All @@ -44,6 +51,11 @@ func InitClient(addr string) error {
return nil
}

// GlobalConfig returns the global dmctl config registered via InitUtils.
// NOTE(review): this hands out the shared *Config pointer, so all callers
// see the same instance — confirm callers treat it as read-only.
func GlobalConfig() *Config {
	return globalConfig
}

// MasterClient returns dm-master client
func MasterClient() pb.MasterClient {
return masterClient
Expand Down
2 changes: 1 addition & 1 deletion dm/ctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type CommandMasterFlags struct {
func Init(cfg *common.Config) error {
// set the log level temporarily
log.SetLevelByString("info")
return errors.Trace(common.InitClient(cfg.MasterAddr))
return errors.Trace(common.InitUtils(cfg))
}

// Start starts running a command
Expand Down
6 changes: 6 additions & 0 deletions dm/ctl/dmctl.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# dmctl Configuration.

# rpc configuration
# rpc-timeout is a positive number with a time unit suffix. Go's standard
# time units are supported: "ns", "us", "ms", "s", "m", "h". Provide a
# timeout appropriate for your use scenario.
rpc-timeout = "10m"

master-addr = ":8261"
4 changes: 2 additions & 2 deletions dm/ctl/master/query_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ func queryStatusFunc(cmd *cobra.Command, _ []string) {
return
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cli := common.MasterClient()
ctx, cancel := context.WithTimeout(context.Background(), common.GlobalConfig().RPCTimeout)
defer cancel()
resp, err := cli.QueryStatus(ctx, &pb.QueryStatusListRequest{
Name: taskName,
Workers: workers,
Expand Down
109 changes: 75 additions & 34 deletions dm/master/agent_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,38 @@
package master

import (
"sync"
"context"
"math"

"golang.org/x/time/rate"

"github.com/pingcap/dm/pkg/log"
)

var (
pool *AgentPool // singleton instance
once sync.Once
agentlimit = 20
// rate limit related constant value
const (
	// DefaultRate is the default token dispatch rate (tokens per second)
	// for the agent pool's rate limiter.
	DefaultRate float64 = 10
	// DefaultBurst is the default maximum burst size of the limiter.
	DefaultBurst = 40
	// ErrorNoEmitToken is the format for the error reported when no emit
	// opportunity can be acquired; %s is presumably the worker/task
	// identifier — usage is not visible in this hunk, verify at call sites.
	ErrorNoEmitToken = "fail to get emit opportunity for %s"
)

// emitFunc is the callback type invoked by AgentPool.Emit on both the
// success path (an agent was acquired) and the failure path (ctx canceled).
type emitFunc func(args ...interface{})

// AgentPool is a pool to control communication with dm-workers
// It provides rate limit control for agent acquire, including dispatch rate r
// and permits bursts of at most b tokens.
// callers shouldn't hold on to an agent, to avoid deadlock
type AgentPool struct {
limit int
agents chan *Agent
requests chan int
agents chan *Agent
cfg *RateLimitConfig
limiter *rate.Limiter
}

// RateLimitConfig holds the rate limit settings used to build an AgentPool.
type RateLimitConfig struct {
	rate  float64 // dispatch rate, in tokens per second
	burst int     // max permitted bursts of the limiter
}

// Agent communicates with dm-workers
Expand All @@ -36,42 +54,65 @@ type Agent struct {
}

// NewAgentPool returns a agent pool
func NewAgentPool(limit int) *AgentPool {
agents := make(chan *Agent, limit)
for i := 0; i < limit; i++ {
agents <- &Agent{ID: i + 1}
}
func NewAgentPool(cfg *RateLimitConfig) *AgentPool {
requests := make(chan int, int(math.Ceil(1/cfg.rate))+cfg.burst)
agents := make(chan *Agent, cfg.burst)
limiter := rate.NewLimiter(rate.Limit(cfg.rate), cfg.burst)

return &AgentPool{
limit: limit,
agents: agents,
requests: requests,
agents: agents,
cfg: cfg,
limiter: limiter,
}
}

// Apply applies for an agent
func (pool *AgentPool) Apply() *Agent {
agent := <-pool.agents
return agent
}
// if ctx is canceled before we get an agent, returns nil
func (ap *AgentPool) Apply(ctx context.Context, id int) *Agent {
select {
case <-ctx.Done():
return nil
case ap.requests <- id:
}

// Recycle recycles agent
func (pool *AgentPool) Recycle(agent *Agent) {
pool.agents <- agent
select {
case <-ctx.Done():
return nil
case agent := <-ap.agents:
return agent
}
}

// GetAgentPool a singleton agent pool
func GetAgentPool() *AgentPool {
once.Do(func() {
pool = NewAgentPool(agentlimit)
})
return pool
// Start runs the AgentPool background dispatcher: it drains request IDs
// from ap.requests, waits on the rate limiter for each one, and hands a
// fresh Agent to ap.agents. It returns when ctx is canceled; any limiter
// error other than context cancellation is fatal.
func (ap *AgentPool) Start(ctx context.Context) {
	for {
		var id int
		select {
		case <-ctx.Done():
			return
		case id = <-ap.requests:
		}
		if err := ap.limiter.Wait(ctx); err != nil {
			if err != context.Canceled {
				log.Fatalf("agent limiter wait meets unexpected error: %v", err)
			}
			return
		}
		select {
		case <-ctx.Done():
			return
		case ap.agents <- &Agent{ID: id}:
		}
	}
}

// Emit apply for a agent to communicates with dm-worker
func Emit(fn func(args ...interface{}), args ...interface{}) {
ap := GetAgentPool()
agent := ap.Apply()
defer ap.Recycle(agent)

fn(args...)
// Emit applies for an agent to communicate with dm-worker.
// On success it runs fn with args; if ctx is canceled before an agent is
// acquired, it runs errFn with args instead.
func (ap *AgentPool) Emit(ctx context.Context, id int, fn emitFunc, errFn emitFunc, args ...interface{}) {
	if agent := ap.Apply(ctx, id); agent != nil {
		fn(args...)
	} else {
		errFn(args...)
	}
}
65 changes: 47 additions & 18 deletions dm/master/agent_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
package master

import (
"context"
"time"

. "github.com/pingcap/check"
)

Expand All @@ -23,33 +26,40 @@ func (t *testMaster) TestAgentPool(c *C) {
}

func (t *testMaster) testPool(c *C) {
var (
rate = 10
burst = 100
)
// test limit
agentlimit = 2
ap := NewAgentPool(&RateLimitConfig{rate: float64(rate), burst: burst})
go ap.Start(context.Background())
pc := make(chan *Agent)

go func() {
ap := GetAgentPool()
pc <- ap.Apply()
pc <- ap.Apply()
pc <- ap.Apply()

for i := 0; i < rate+burst; i++ {
pc <- ap.Apply(context.Background(), i)
}
}()

agent1 := <-pc
c.Assert(agent1.ID, Equals, 1)
agent2 := <-pc
c.Assert(agent2.ID, Equals, 2)
for i := 0; i < burst; i++ {
agent := <-pc
c.Assert(agent.ID, Equals, i)
}
select {
case <-pc:
c.FailNow()
c.Error("should not get agent now")
default:
}

GetAgentPool().Recycle(agent1)
agent := <-pc
c.Assert(agent.ID, Equals, 1)
GetAgentPool().Recycle(agent2)
GetAgentPool().Recycle(agent)
for i := 0; i < rate; i++ {
select {
case agent := <-pc:
c.Assert(agent.ID, Equals, i+burst)
case <-time.After(time.Millisecond * 150):
// add 50ms time drift here
c.Error("get agent timeout")
}
}
}

func (t *testMaster) testEmit(c *C) {
Expand All @@ -60,7 +70,10 @@ func (t *testMaster) testEmit(c *C) {
worker testWorkerType = 1
)

Emit(func(args ...interface{}) {
ap := NewAgentPool(&RateLimitConfig{rate: DefaultRate, burst: DefaultBurst})
go ap.Start(context.Background())

ap.Emit(context.Background(), 1, func(args ...interface{}) {
if len(args) != 2 {
c.Fatalf("args count is not 2, args %v", args)
}
Expand All @@ -80,6 +93,22 @@ func (t *testMaster) testEmit(c *C) {
if worker1 != worker {
c.Fatalf("args[1] is not expected worker, args[1] %v vs %v", worker1, worker)
}
}, []interface{}{id, worker}...)
}, func(args ...interface{}) {}, []interface{}{id, worker}...)

counter := 0
ctx, cancel := context.WithCancel(context.Background())
cancel()
ap.Emit(ctx, 1, func(args ...interface{}) {
c.FailNow()
}, func(args ...interface{}) {
if len(args) != 1 {
c.Fatalf("args count is not 1, args %v", args)
}
pCounter, ok := args[0].(*int)
if !ok {
c.Fatalf("args[0] is not *int, args %+v", args)
}
*pCounter++
}, []interface{}{&counter}...)
c.Assert(counter, Equals, 1)
}
Loading

0 comments on commit 82ecc4b

Please sign in to comment.