diff --git a/dm/ctl/common/config.go b/dm/ctl/common/config.go index dd1d53b4c8..0a520adff8 100644 --- a/dm/ctl/common/config.go +++ b/dm/ctl/common/config.go @@ -18,12 +18,17 @@ import ( "flag" "fmt" "net" + "time" "github.com/BurntSushi/toml" "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/errors" ) +const ( + defaultRPCTimeout = "10m" +) + // NewConfig creates a new base config for dmctl. func NewConfig() *Config { cfg := &Config{} @@ -33,6 +38,7 @@ func NewConfig() *Config { fs.BoolVar(&cfg.printVersion, "V", false, "prints version and exit") fs.StringVar(&cfg.ConfigFile, "config", "", "path to config file") fs.StringVar(&cfg.MasterAddr, "master-addr", "", "master API server addr") + fs.StringVar(&cfg.RPCTimeoutStr, "rpc-timeout", defaultRPCTimeout, fmt.Sprintf("rpc timeout, default is %s", defaultRPCTimeout)) fs.StringVar(&cfg.encrypt, "encrypt", "", "encrypt plaintext to ciphertext") return cfg @@ -44,6 +50,9 @@ type Config struct { MasterAddr string `toml:"master-addr" json:"master-addr"` + RPCTimeoutStr string `toml:"rpc-timeout" json:"rpc-timeout"` + RPCTimeout time.Duration `json:"-"` + ConfigFile string `json:"config-file"` printVersion bool @@ -106,8 +115,7 @@ func (c *Config) Parse(arguments []string) error { return errors.Annotatef(err, "specify master addr %s", c.MasterAddr) } - c.adjust() - return nil + return errors.Trace(c.adjust()) } // configFromFile loads config from file. @@ -117,7 +125,19 @@ func (c *Config) configFromFile(path string) error { } // adjust adjusts configs -func (c *Config) adjust() { +func (c *Config) adjust() error { + if c.RPCTimeoutStr == "" { + c.RPCTimeoutStr = defaultRPCTimeout + } + timeout, err := time.ParseDuration(c.RPCTimeoutStr) + if err != nil { + return errors.Trace(err) + } + if timeout <= time.Duration(0) { + return errors.Errorf("invalid time duration: %s", c.RPCTimeoutStr) + } + c.RPCTimeout = timeout + return nil } // validate host:port format address diff --git a/dm/ctl/common/util.go b/dm/ctl/common/util.go index e0ed6f1aa1..416628d8dd 100644 --- a/dm/ctl/common/util.go +++ b/dm/ctl/common/util.go @@ -32,9 +32,16 @@ import ( var ( masterClient pb.MasterClient + globalConfig = &Config{} ) -// InitClient initializes dm-worker client or dm-master client +// InitUtils inits necessary dmctl utils +func InitUtils(cfg *Config) error { + globalConfig = cfg + return errors.Trace(InitClient(cfg.MasterAddr)) +} + +// InitClient initializes dm-master client func InitClient(addr string) error { conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second)) if err != nil { @@ -44,6 +51,11 @@ func InitClient(addr string) error { return nil } +// GlobalConfig returns global dmctl config +func GlobalConfig() *Config { + return globalConfig +} + // MasterClient returns dm-master client func MasterClient() pb.MasterClient { return masterClient diff --git a/dm/ctl/ctl.go b/dm/ctl/ctl.go index 210f822026..a922c32548 100644 --- a/dm/ctl/ctl.go +++ b/dm/ctl/ctl.go @@ -36,7 +36,7 @@ type CommandMasterFlags struct { func Init(cfg *common.Config) error { // set the log level temporarily log.SetLevelByString("info") - return errors.Trace(common.InitClient(cfg.MasterAddr)) + return errors.Trace(common.InitUtils(cfg)) } // Start starts running a command diff --git a/dm/ctl/dmctl.toml b/dm/ctl/dmctl.toml index 511b4e64ae..6e7b5a7d80 100644 --- a/dm/ctl/dmctl.toml +++ b/dm/ctl/dmctl.toml @@ -1,3 +1,9 @@ # dmctl Configuration. +# rpc configuration +# rpc timeout is a positive number plus time unit. 
we use golang standard time +# units including: "ns", "us", "ms", "s", "m", "h". You should provide a proper +# rpc timeout according to your use scenario. +rpc-timeout = "10m" + master-addr = ":8261" diff --git a/dm/ctl/master/query_status.go b/dm/ctl/master/query_status.go index cdfdcd49d0..db4a7a9f85 100644 --- a/dm/ctl/master/query_status.go +++ b/dm/ctl/master/query_status.go @@ -47,9 +47,9 @@ func queryStatusFunc(cmd *cobra.Command, _ []string) { return } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() cli := common.MasterClient() + ctx, cancel := context.WithTimeout(context.Background(), common.GlobalConfig().RPCTimeout) + defer cancel() resp, err := cli.QueryStatus(ctx, &pb.QueryStatusListRequest{ Name: taskName, Workers: workers, diff --git a/dm/master/agent_pool.go b/dm/master/agent_pool.go index 1dd495d798..42e537abf8 100644 --- a/dm/master/agent_pool.go +++ b/dm/master/agent_pool.go @@ -14,20 +14,38 @@ package master import ( - "sync" + "context" + "math" + + "golang.org/x/time/rate" + + "github.com/pingcap/dm/pkg/log" ) -var ( - pool *AgentPool // singleton instance - once sync.Once - agentlimit = 20 +// rate limit related constant value +const ( + DefaultRate float64 = 10 + DefaultBurst = 40 + ErrorNoEmitToken = "fail to get emit opportunity for %s" ) +type emitFunc func(args ...interface{}) + // AgentPool is a pool to control communication with dm-workers +// It provides rate limit control for agent acquire, including dispatch rate r +// and permits bursts of at most b tokens. // caller shouldn't to hold agent to avoid deadlock type AgentPool struct { - limit int - agents chan *Agent + requests chan int + agents chan *Agent + cfg *RateLimitConfig + limiter *rate.Limiter +} + +// RateLimitConfig holds rate limit config +type RateLimitConfig struct { + rate float64 // dispatch rate + burst int // max permits bursts } // Agent communicate with dm-workers @@ -36,42 +54,65 @@ type Agent struct { } // NewAgentPool returns a agent pool -func NewAgentPool(limit int) *AgentPool { - agents := make(chan *Agent, limit) - for i := 0; i < limit; i++ { - agents <- &Agent{ID: i + 1} - } +func NewAgentPool(cfg *RateLimitConfig) *AgentPool { + requests := make(chan int, int(math.Ceil(1/cfg.rate))+cfg.burst) + agents := make(chan *Agent, cfg.burst) + limiter := rate.NewLimiter(rate.Limit(cfg.rate), cfg.burst) return &AgentPool{ - limit: limit, - agents: agents, + requests: requests, + agents: agents, + cfg: cfg, + limiter: limiter, } } // Apply applies for a agent -func (pool *AgentPool) Apply() *Agent { - agent := <-pool.agents - return agent -} +// if ctx is canceled before we get an agent, returns nil +func (ap *AgentPool) Apply(ctx context.Context, id int) *Agent { + select { + case <-ctx.Done(): + return nil + case ap.requests <- id: + } -// Recycle recycles agent -func (pool *AgentPool) Recycle(agent *Agent) { - pool.agents <- agent + select { + case <-ctx.Done(): + return nil + case agent := <-ap.agents: + return agent + } } -// GetAgentPool a singleton agent pool -func GetAgentPool() *AgentPool { - once.Do(func() { - pool = NewAgentPool(agentlimit) - }) - return pool +// Start starts AgentPool background dispatcher +func (ap *AgentPool) Start(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case id := <-ap.requests: + err := ap.limiter.Wait(ctx) + if err != nil { + if err != context.Canceled { + log.Fatalf("agent limiter wait meets unexpected error: %v", err) + } + return + } + select { + case <-ctx.Done(): + return + case ap.agents 
<- &Agent{ID: id}: + } + } + } } -// Emit apply for a agent to communicates with dm-worker -func Emit(fn func(args ...interface{}), args ...interface{}) { - ap := GetAgentPool() - agent := ap.Apply() - defer ap.Recycle(agent) - - fn(args...) +// Emit applies for an agent to communicates with dm-worker +func (ap *AgentPool) Emit(ctx context.Context, id int, fn emitFunc, errFn emitFunc, args ...interface{}) { + agent := ap.Apply(ctx, id) + if agent == nil { + errFn(args...) + } else { + fn(args...) + } } diff --git a/dm/master/agent_pool_test.go b/dm/master/agent_pool_test.go index ea4a115b48..f200d7ee88 100644 --- a/dm/master/agent_pool_test.go +++ b/dm/master/agent_pool_test.go @@ -14,6 +14,9 @@ package master import ( + "context" + "time" + . "github.com/pingcap/check" ) @@ -23,33 +26,40 @@ func (t *testMaster) TestAgentPool(c *C) { } func (t *testMaster) testPool(c *C) { + var ( + rate = 10 + burst = 100 + ) // test limit - agentlimit = 2 + ap := NewAgentPool(&RateLimitConfig{rate: float64(rate), burst: burst}) + go ap.Start(context.Background()) pc := make(chan *Agent) go func() { - ap := GetAgentPool() - pc <- ap.Apply() - pc <- ap.Apply() - pc <- ap.Apply() - + for i := 0; i < rate+burst; i++ { + pc <- ap.Apply(context.Background(), i) + } }() - agent1 := <-pc - c.Assert(agent1.ID, Equals, 1) - agent2 := <-pc - c.Assert(agent2.ID, Equals, 2) + for i := 0; i < burst; i++ { + agent := <-pc + c.Assert(agent.ID, Equals, i) + } select { case <-pc: - c.FailNow() + c.Error("should not get agent now") default: } - GetAgentPool().Recycle(agent1) - agent := <-pc - c.Assert(agent.ID, Equals, 1) - GetAgentPool().Recycle(agent2) - GetAgentPool().Recycle(agent) + for i := 0; i < rate; i++ { + select { + case agent := <-pc: + c.Assert(agent.ID, Equals, i+burst) + case <-time.After(time.Millisecond * 150): + // add 50ms time drift here + c.Error("get agent timeout") + } + } } func (t *testMaster) testEmit(c *C) { @@ -60,7 +70,10 @@ func (t *testMaster) testEmit(c *C) { worker testWorkerType = 1 ) - Emit(func(args ...interface{}) { + ap := NewAgentPool(&RateLimitConfig{rate: DefaultRate, burst: DefaultBurst}) + go ap.Start(context.Background()) + + ap.Emit(context.Background(), 1, func(args ...interface{}) { if len(args) != 2 { c.Fatalf("args count is not 2, args %v", args) } @@ -80,6 +93,22 @@ func (t *testMaster) testEmit(c *C) { if worker1 != worker { c.Fatalf("args[1] is not expected worker, args[1] %v vs %v", worker1, worker) } - }, []interface{}{id, worker}...) + }, func(args ...interface{}) {}, []interface{}{id, worker}...) + counter := 0 + ctx, cancel := context.WithCancel(context.Background()) + cancel() + ap.Emit(ctx, 1, func(args ...interface{}) { + c.FailNow() + }, func(args ...interface{}) { + if len(args) != 1 { + c.Fatalf("args count is not 1, args %v", args) + } + pCounter, ok := args[0].(*int) + if !ok { + c.Fatalf("args[0] is not *int, args %+v", args) + } + *pCounter++ + }, []interface{}{&counter}...) 
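For reference, a small self-contained sketch (illustrative only, not part of this PR) of the golang.org/x/time/rate token-bucket behaviour that the AgentPool dispatcher and the tests around this point rely on; the rate and burst values are made up:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// A limiter with rate r and burst b lets the first b Wait calls return
// immediately, then releases roughly r permits per second.
func main() {
	limiter := rate.NewLimiter(rate.Limit(10), 40) // 10 tokens/s, burst of 40
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	start := time.Now()
	for i := 0; i < 60; i++ {
		if err := limiter.Wait(ctx); err != nil {
			fmt.Println("wait aborted:", err) // context canceled or deadline exceeded
			return
		}
	}
	// The first 40 permits come from the initial burst; the remaining 20 are
	// paced at 10/s, so this prints roughly 2s.
	fmt.Println("60 permits took", time.Since(start))
}
```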
+ c.Assert(counter, Equals, 1) } diff --git a/dm/master/config.go b/dm/master/config.go index 45f79eba76..5a9627697a 100644 --- a/dm/master/config.go +++ b/dm/master/config.go @@ -20,6 +20,7 @@ import ( "fmt" "io/ioutil" "strings" + "time" "github.com/BurntSushi/toml" "github.com/pingcap/dm/pkg/log" @@ -32,6 +33,8 @@ import ( // and assign it to SampleConfigFile while we build dm-master var SampleConfigFile string +var defaultRPCTimeout = "30s" + // NewConfig creates a config for dm-master func NewConfig() *Config { cfg := &Config{} @@ -73,6 +76,11 @@ type Config struct { LogFile string `toml:"log-file" json:"log-file"` LogRotate string `toml:"log-rotate" json:"log-rotate"` + RPCTimeoutStr string `toml:"rpc-timeout" json:"rpc-timeout"` + RPCTimeout time.Duration `json:"-"` + RPCRateLimit float64 `toml:"rpc-rate-limit" json:"rpc-rate-limit"` + RPCRateBurst int `toml:"rpc-rate-burst" json:"rpc-rate-burst"` + MasterAddr string `toml:"master-addr" json:"master-addr"` Deploy []*DeployMapper `toml:"deploy" json:"-"` @@ -161,6 +169,26 @@ func (c *Config) adjust() error { c.DeployMap[item.Source] = item.Worker } + + if c.RPCTimeoutStr == "" { + c.RPCTimeoutStr = defaultRPCTimeout + } + timeout, err := time.ParseDuration(c.RPCTimeoutStr) + if err != nil { + return errors.Trace(err) + } + c.RPCTimeout = timeout + + // for backward compatibility + if c.RPCRateLimit <= 0 { + log.Warnf("[dm-master] invalid rpc-rate-limit: %f, use default value: %f", c.RPCRateLimit, DefaultRate) + c.RPCRateLimit = DefaultRate + } + if c.RPCRateBurst <= 0 { + log.Warnf("[dm-master] invalid rpc-rate-burst: %d, use default value: %d", c.RPCRateBurst, DefaultBurst) + c.RPCRateBurst = DefaultBurst + } + return nil } diff --git a/dm/master/dm-master.toml b/dm/master/dm-master.toml index 4937ecdfc0..e63ea190b6 100644 --- a/dm/master/dm-master.toml +++ b/dm/master/dm-master.toml @@ -1,5 +1,19 @@ # Master Configuration. +# rpc configuration +# +# rpc timeout is a positive number plus time unit. we use golang standard time +# units including: "ns", "us", "ms", "s", "m", "h". You should provide a proper +# rpc timeout according to your use scenario. +rpc-timeout = "30s" +# rpc limiter controls how frequently events are allowed to happen. +# It implements a "token bucket" of size `rpc-rate-limit`, initially full and +# refilled at rate `rpc-rate-limit` tokens per second. Note `rpc-rate-limit` +# is float64 type, so remember to add a decimal point and one trailing 0 if its +# literal value happens to be an integer. 
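As an aside, a hedged sketch of how a configuration block like the one below is decoded and validated, mirroring the new adjust() logic; the rpcConfig struct here is a trimmed-down hypothetical stand-in for the real Config in dm/master/config.go:

```go
package main

import (
	"fmt"
	"time"

	"github.com/BurntSushi/toml"
)

// rpcConfig is a hypothetical subset of the master config used only for this sketch.
type rpcConfig struct {
	RPCTimeoutStr string  `toml:"rpc-timeout"`
	RPCRateLimit  float64 `toml:"rpc-rate-limit"`
	RPCRateBurst  int     `toml:"rpc-rate-burst"`
}

func main() {
	// Per the comment above, keep the decimal point on rpc-rate-limit so the
	// float64 field decodes cleanly.
	data := `
rpc-timeout = "30s"
rpc-rate-limit = 10.0
rpc-rate-burst = 40
`
	var cfg rpcConfig
	if _, err := toml.Decode(data, &cfg); err != nil {
		fmt.Println("decode error:", err)
		return
	}
	// The timeout is kept as a string in the config and converted in adjust().
	timeout, err := time.ParseDuration(cfg.RPCTimeoutStr)
	if err != nil || timeout <= 0 {
		fmt.Println("invalid rpc-timeout:", cfg.RPCTimeoutStr)
		return
	}
	fmt.Println(timeout, cfg.RPCRateLimit, cfg.RPCRateBurst) // 30s 10 40
}
```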
+rpc-rate-limit = 10.0 +rpc-rate-burst = 40 + #log configuration log-level = "info" log-file = "dm-master.log" diff --git a/dm/master/server.go b/dm/master/server.go index 1ab4879b87..e4b92ba81b 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/dm/dm/common" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/master/sql-operator" + "github.com/pingcap/dm/dm/master/workerrpc" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/pkg/tracing" ) @@ -52,8 +53,8 @@ type Server struct { rootLis net.Listener svr *grpc.Server - // dm-worker-ID(host:ip) -> dm-worker-client - workerClients map[string]pb.WorkerClient + // dm-worker-ID(host:ip) -> dm-worker client management + workerClients map[string]workerrpc.Client // task-name -> worker-list taskWorkers map[string][]string @@ -67,6 +68,9 @@ type Server struct { // trace group id generator idGen *tracing.IDGenerator + // agent pool + ap *AgentPool + closed sync2.AtomicBool } @@ -74,12 +78,14 @@ type Server struct { func NewServer(cfg *Config) *Server { server := Server{ cfg: cfg, - workerClients: make(map[string]pb.WorkerClient), + workerClients: make(map[string]workerrpc.Client), taskWorkers: make(map[string][]string), lockKeeper: NewLockKeeper(), sqlOperatorHolder: operator.NewHolder(), idGen: tracing.NewIDGen(), + ap: NewAgentPool(&RateLimitConfig{rate: cfg.RPCRateLimit, burst: cfg.RPCRateBurst}), } + return &server } @@ -92,16 +98,22 @@ func (s *Server) Start() error { } for _, workerAddr := range s.cfg.DeployMap { - conn, err2 := grpc.Dial(workerAddr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second)) - if err2 != nil { - return errors.Trace(err2) + s.workerClients[workerAddr], err = workerrpc.NewGRPCClient(workerAddr) + if err != nil { + return errors.Trace(err) } - s.workerClients[workerAddr] = pb.NewWorkerClient(conn) } s.closed.Set(false) ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + s.ap.Start(ctx) + }() + wg.Add(1) go func() { defer wg.Done() @@ -167,6 +179,14 @@ func (s *Server) Close() { s.closed.Set(true) } +func errorCommonWorkerResponse(msg string, worker string) *pb.CommonWorkerResponse { + return &pb.CommonWorkerResponse{ + Result: false, + Worker: worker, + Msg: msg, + } +} + // StartTask implements MasterServer.StartTask func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.StartTaskResponse, error) { log.Infof("[server] receive StartTask request %+v", req) @@ -195,11 +215,7 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S if stCfg, ok := workerCfg[worker]; ok { stCfgs = append(stCfgs, stCfg) } else { - workerRespCh <- &pb.CommonWorkerResponse{ - Result: false, - Worker: worker, - Msg: "worker not found in task's config or deployment config", - } + workerRespCh <- errorCommonWorkerResponse("worker not found in task's config or deployment config", worker) } } } @@ -208,32 +224,31 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S var wg sync.WaitGroup for _, stCfg := range stCfgs { wg.Add(1) - go func(stCfg *config.SubTaskConfig) { + go s.ap.Emit(ctx, 0, func(args ...interface{}) { defer wg.Done() - worker, ok1 := s.cfg.DeployMap[stCfg.SourceID] - cli, ok2 := s.workerClients[worker] - if !ok1 || !ok2 { - workerRespCh <- &pb.CommonWorkerResponse{ - Result: false, - Msg: fmt.Sprintf("%s relevant worker not found", stCfg.SourceID), - } + cli, worker, stCfgToml, taskName, err := 
s.taskConfigArgsExtractor(args...) + if err != nil { + workerRespCh <- errorCommonWorkerResponse(err.Error(), worker) return } validWorkerCh <- worker - stCfgToml, err := stCfg.Toml() // convert to TOML format - if err != nil { - workerRespCh <- &pb.CommonWorkerResponse{ - Result: false, - Worker: worker, - Msg: errors.ErrorStack(err), - } - return + request := &workerrpc.Request{ + Type: workerrpc.CmdStartSubTask, + StartSubTask: &pb.StartSubTaskRequest{Task: stCfgToml}, } - workerResp, err := cli.StartSubTask(ctx, &pb.StartSubTaskRequest{Task: stCfgToml}) - workerResp = s.handleOperationResult(ctx, cli, stCfg.Name, err, workerResp) + resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout) + workerResp := s.handleOperationResult(ctx, cli, taskName, err, resp) workerResp.Meta.Worker = worker workerRespCh <- workerResp.Meta - }(stCfg) + }, func(args ...interface{}) { + defer wg.Done() + _, worker, _, _, err := s.taskConfigArgsExtractor(args...) + if err != nil { + workerRespCh <- errorCommonWorkerResponse(err.Error(), worker) + return + } + workerRespCh <- errorCommonWorkerResponse(fmt.Sprintf(ErrorNoEmitToken, worker), worker) + }, stCfg) } wg.Wait() @@ -245,6 +260,7 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S workers = append(workers, workerResp.Worker) } + // TODO: simplify logic of response sort sort.Strings(workers) workerResps := make([]*pb.CommonWorkerResponse, 0, len(workers)) for _, worker := range workers { @@ -283,36 +299,49 @@ func (s *Server) OperateTask(ctx context.Context, req *pb.OperateTaskRequest) (* if len(req.Workers) > 0 { workers = req.Workers // specify only do operation on partial dm-workers } + workerRespCh := make(chan *pb.OperateSubTaskResponse, len(workers)) + + handleErr := func(err error, worker string) { + log.Error(err) + workerResp := &pb.OperateSubTaskResponse{ + Meta: errorCommonWorkerResponse(err.Error(), worker), + Op: req.Op, + } + workerRespCh <- workerResp + } - subReq := &pb.OperateSubTaskRequest{ - Op: req.Op, - Name: req.Name, + subReq := &workerrpc.Request{ + Type: workerrpc.CmdOperateSubTask, + OperateSubTask: &pb.OperateSubTaskRequest{ + Op: req.Op, + Name: req.Name, + }, } - workerRespCh := make(chan *pb.OperateSubTaskResponse, len(workers)) + var wg sync.WaitGroup for _, worker := range workers { wg.Add(1) - go func(worker string) { + go s.ap.Emit(ctx, 0, func(args ...interface{}) { defer wg.Done() - cli, ok := s.workerClients[worker] - if !ok { - workerResp := &pb.OperateSubTaskResponse{ - Meta: &pb.CommonWorkerResponse{ - Result: false, - Worker: worker, - Msg: fmt.Sprintf("%s relevant worker-client not found", worker), - }, - Op: req.Op, - } - workerRespCh <- workerResp + cli, worker1, err := s.workerArgsExtractor(args...) + if err != nil { + handleErr(err, worker1) return } - workerResp, err := cli.OperateSubTask(ctx, subReq) - workerResp = s.handleOperationResult(ctx, cli, req.Name, err, workerResp) + resp, err := cli.SendRequest(ctx, subReq, s.cfg.RPCTimeout) + workerResp := s.handleOperationResult(ctx, cli, req.Name, err, resp) workerResp.Op = req.Op - workerResp.Meta.Worker = worker + workerResp.Meta.Worker = worker1 workerRespCh <- workerResp - }(worker) + }, func(args ...interface{}) { + defer wg.Done() + _, worker1, err := s.workerArgsExtractor(args...) 
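The success/error callback pair passed to Emit above is the dispatch idiom used throughout server.go in this PR. Below is a minimal, self-contained stand-in (not the real AgentPool, which additionally serializes requests through its internal channels) showing the shape of that idiom using only golang.org/x/time/rate:

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/time/rate"
)

// emit is a simplified stand-in for AgentPool.Emit: run okFn if a rate token
// is obtained before ctx is done, otherwise run errFn.
func emit(ctx context.Context, lim *rate.Limiter, okFn, errFn func(args ...interface{}), args ...interface{}) {
	if err := lim.Wait(ctx); err != nil {
		errFn(args...)
		return
	}
	okFn(args...)
}

func main() {
	lim := rate.NewLimiter(rate.Limit(10), 40)
	workers := []string{"127.0.0.1:8262", "127.0.0.1:8263"}
	respCh := make(chan string, len(workers))

	var wg sync.WaitGroup
	for _, w := range workers {
		wg.Add(1)
		go emit(context.Background(), lim, func(args ...interface{}) {
			defer wg.Done()
			respCh <- "dispatched RPC to " + args[0].(string)
		}, func(args ...interface{}) {
			defer wg.Done()
			respCh <- "no emit token for " + args[0].(string)
		}, w)
	}
	wg.Wait()
	close(respCh)
	for msg := range respCh {
		fmt.Println(msg)
	}
}
```

In the patch itself the error callback pushes an errorCommonWorkerResponse onto the same response channel, so a starved dispatch still produces exactly one response per worker.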
+ if err != nil { + handleErr(err, worker1) + return + } + handleErr(errors.Errorf(ErrorNoEmitToken, worker1), worker1) + }, worker) } wg.Wait() @@ -372,11 +401,7 @@ func (s *Server) UpdateTask(ctx context.Context, req *pb.UpdateTaskRequest) (*pb if stCfg, ok := workerCfg[worker]; ok { stCfgs = append(stCfgs, stCfg) } else { - workerRespCh <- &pb.CommonWorkerResponse{ - Result: false, - Worker: worker, - Msg: "worker not found in task's config or deployment config", - } + workerRespCh <- errorCommonWorkerResponse("worker not found in task's config or deployment config", worker) } } } @@ -384,31 +409,30 @@ func (s *Server) UpdateTask(ctx context.Context, req *pb.UpdateTaskRequest) (*pb var wg sync.WaitGroup for _, stCfg := range stCfgs { wg.Add(1) - go func(stCfg *config.SubTaskConfig) { + go s.ap.Emit(ctx, 0, func(args ...interface{}) { defer wg.Done() - worker, ok1 := s.cfg.DeployMap[stCfg.SourceID] - cli, ok2 := s.workerClients[worker] - if !ok1 || !ok2 { - workerRespCh <- &pb.CommonWorkerResponse{ - Result: false, - Msg: fmt.Sprintf("%s relevant worker not found", stCfg.SourceID), - } - return - } - stCfgToml, err := stCfg.Toml() // convert to TOML format + cli, worker, stCfgToml, taskName, err := s.taskConfigArgsExtractor(args...) if err != nil { - workerRespCh <- &pb.CommonWorkerResponse{ - Result: false, - Worker: worker, - Msg: errors.ErrorStack(err), - } + workerRespCh <- errorCommonWorkerResponse(err.Error(), worker) return } - workerResp, err := cli.UpdateSubTask(ctx, &pb.UpdateSubTaskRequest{Task: stCfgToml}) - workerResp = s.handleOperationResult(ctx, cli, stCfg.Name, err, workerResp) + request := &workerrpc.Request{ + Type: workerrpc.CmdUpdateSubTask, + UpdateSubTask: &pb.UpdateSubTaskRequest{Task: stCfgToml}, + } + resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout) + workerResp := s.handleOperationResult(ctx, cli, taskName, err, resp) workerResp.Meta.Worker = worker workerRespCh <- workerResp.Meta - }(stCfg) + }, func(args ...interface{}) { + defer wg.Done() + _, worker, _, _, err := s.taskConfigArgsExtractor(args...) 
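The task handlers above all follow the same fan-out/collect shape: one response channel buffered to the worker count, one goroutine per worker, then results ordered by worker address. A self-contained sketch of just that pattern (the workerResp type is a simplified stand-in for pb.CommonWorkerResponse):

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

type workerResp struct {
	Worker string
	Result bool
	Msg    string
}

func main() {
	workers := []string{"127.0.0.1:8263", "127.0.0.1:8262"}
	respCh := make(chan workerResp, len(workers)) // buffered so senders never block

	var wg sync.WaitGroup
	for _, w := range workers {
		wg.Add(1)
		go func(worker string) {
			defer wg.Done()
			// in the real handlers this is where the worker RPC happens
			respCh <- workerResp{Worker: worker, Result: true}
		}(w)
	}
	wg.Wait()
	close(respCh)

	byWorker := make(map[string]workerResp, len(workers))
	var names []string
	for r := range respCh {
		byWorker[r.Worker] = r
		names = append(names, r.Worker)
	}
	sort.Strings(names)
	for _, n := range names {
		fmt.Printf("%+v\n", byWorker[n])
	}
}
```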
+ if err != nil { + workerRespCh <- errorCommonWorkerResponse(err.Error(), worker) + return + } + workerRespCh <- errorCommonWorkerResponse(fmt.Sprintf(ErrorNoEmitToken, worker), worker) + }, stCfg) } wg.Wait() @@ -621,11 +645,14 @@ func (s *Server) UnlockDDLLock(ctx context.Context, req *pb.UnlockDDLLockRequest func (s *Server) BreakWorkerDDLLock(ctx context.Context, req *pb.BreakWorkerDDLLockRequest) (*pb.BreakWorkerDDLLockResponse, error) { log.Infof("[server] receive BreakWorkerDDLLock request %+v", req) - workerReq := &pb.BreakDDLLockRequest{ - Task: req.Task, - RemoveLockID: req.RemoveLockID, - ExecDDL: req.ExecDDL, - SkipDDL: req.SkipDDL, + request := &workerrpc.Request{ + Type: workerrpc.CmdBreakDDLLock, + BreakDDLLock: &pb.BreakDDLLockRequest{ + Task: req.Task, + RemoveLockID: req.RemoveLockID, + ExecDDL: req.ExecDDL, + SkipDDL: req.SkipDDL, + }, } workerRespCh := make(chan *pb.CommonWorkerResponse, len(req.Workers)) @@ -636,19 +663,15 @@ func (s *Server) BreakWorkerDDLLock(ctx context.Context, req *pb.BreakWorkerDDLL defer wg.Done() cli, ok := s.workerClients[worker] if !ok { - workerRespCh <- &pb.CommonWorkerResponse{ - Result: false, - Worker: worker, - Msg: fmt.Sprintf("worker %s relevant worker-client not found", worker), - } + workerRespCh <- errorCommonWorkerResponse(fmt.Sprintf("worker %s relevant worker-client not found", worker), worker) return } - workerResp, err := cli.BreakDDLLock(ctx, workerReq) + resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout) + workerResp := &pb.CommonWorkerResponse{} if err != nil { - workerResp = &pb.CommonWorkerResponse{ - Result: false, - Msg: errors.ErrorStack(err), - } + workerResp = errorCommonWorkerResponse(errors.ErrorStack(err), "") + } else { + workerResp = resp.BreakDDLLock } workerResp.Worker = worker workerRespCh <- workerResp @@ -705,26 +728,28 @@ func (s *Server) HandleSQLs(ctx context.Context, req *pb.HandleSQLsRequest) (*pb } // execute grpc call - subReq := &pb.HandleSubTaskSQLsRequest{ - Name: req.Name, - Op: req.Op, - Args: req.Args, - BinlogPos: req.BinlogPos, - SqlPattern: req.SqlPattern, + subReq := &workerrpc.Request{ + Type: workerrpc.CmdHandleSubTaskSQLs, + HandleSubTaskSQLs: &pb.HandleSubTaskSQLsRequest{ + Name: req.Name, + Op: req.Op, + Args: req.Args, + BinlogPos: req.BinlogPos, + SqlPattern: req.SqlPattern, + }, } cli, ok := s.workerClients[req.Worker] if !ok { resp.Msg = fmt.Sprintf("worker %s client not found in %v", req.Worker, s.workerClients) return resp, nil } - workerResp, err := cli.HandleSQLs(ctx, subReq) + response, err := cli.SendRequest(ctx, subReq, s.cfg.RPCTimeout) + workerResp := &pb.CommonWorkerResponse{} if err != nil { - workerResp = &pb.CommonWorkerResponse{ - Result: false, - Msg: errors.ErrorStack(err), - } + workerResp = errorCommonWorkerResponse(errors.ErrorStack(err), "") + } else { + workerResp = response.HandleSubTaskSQLs } - resp.Workers = []*pb.CommonWorkerResponse{workerResp} resp.Result = true return resp, nil @@ -734,11 +759,14 @@ func (s *Server) HandleSQLs(ctx context.Context, req *pb.HandleSQLsRequest) (*pb func (s *Server) PurgeWorkerRelay(ctx context.Context, req *pb.PurgeWorkerRelayRequest) (*pb.PurgeWorkerRelayResponse, error) { log.Infof("[server] receive PurgeWorkerRelay request %+v", req) - workerReq := &pb.PurgeRelayRequest{ - Inactive: req.Inactive, - Time: req.Time, - Filename: req.Filename, - SubDir: req.SubDir, + workerReq := &workerrpc.Request{ + Type: workerrpc.CmdPurgeRelay, + PurgeRelay: &pb.PurgeRelayRequest{ + Inactive: req.Inactive, + Time: req.Time, + 
Filename: req.Filename, + SubDir: req.SubDir, + }, } workerRespCh := make(chan *pb.CommonWorkerResponse, len(req.Workers)) @@ -749,19 +777,15 @@ func (s *Server) PurgeWorkerRelay(ctx context.Context, req *pb.PurgeWorkerRelayR defer wg.Done() cli, ok := s.workerClients[worker] if !ok { - workerRespCh <- &pb.CommonWorkerResponse{ - Result: false, - Worker: worker, - Msg: fmt.Sprintf("worker %s relevant worker-client not found", worker), - } + workerRespCh <- errorCommonWorkerResponse(fmt.Sprintf("worker %s relevant worker-client not found", worker), worker) return } - workerResp, err := cli.PurgeRelay(ctx, workerReq) + resp, err := cli.SendRequest(ctx, workerReq, s.cfg.RPCTimeout) + workerResp := &pb.CommonWorkerResponse{} if err != nil { - workerResp = &pb.CommonWorkerResponse{ - Result: false, - Msg: errors.ErrorStack(err), - } + workerResp = errorCommonWorkerResponse(errors.ErrorStack(err), "") + } else { + workerResp = resp.PurgeRelay } workerResp.Worker = worker workerRespCh <- workerResp @@ -791,33 +815,50 @@ func (s *Server) PurgeWorkerRelay(ctx context.Context, req *pb.PurgeWorkerRelayR func (s *Server) SwitchWorkerRelayMaster(ctx context.Context, req *pb.SwitchWorkerRelayMasterRequest) (*pb.SwitchWorkerRelayMasterResponse, error) { log.Infof("[server] receive SwitchWorkerRelayMaster request %+v", req) - workerReq := &pb.SwitchRelayMasterRequest{} - workerRespCh := make(chan *pb.CommonWorkerResponse, len(req.Workers)) + + handleErr := func(err error, worker string) { + log.Error(err) + resp := &pb.CommonWorkerResponse{ + Result: false, + Msg: errors.ErrorStack(err), + Worker: worker, + } + workerRespCh <- resp + } + var wg sync.WaitGroup for _, worker := range req.Workers { wg.Add(1) - go func(worker string) { + go s.ap.Emit(ctx, 0, func(args ...interface{}) { defer wg.Done() - cli, ok := s.workerClients[worker] - if !ok { - workerRespCh <- &pb.CommonWorkerResponse{ - Result: false, - Worker: worker, - Msg: fmt.Sprintf("worker %s relevant worker-client not found", worker), - } + cli, worker1, err := s.workerArgsExtractor(args...) + if err != nil { + handleErr(err, worker1) return } - workerResp, err := cli.SwitchRelayMaster(ctx, workerReq) + request := &workerrpc.Request{ + Type: workerrpc.CmdSwitchRelayMaster, + SwitchRelayMaster: &pb.SwitchRelayMasterRequest{}, + } + resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout) + workerResp := &pb.CommonWorkerResponse{} if err != nil { - workerResp = &pb.CommonWorkerResponse{ - Result: false, - Msg: errors.ErrorStack(err), - } + workerResp = errorCommonWorkerResponse(errors.ErrorStack(err), "") + } else { + workerResp = resp.SwitchRelayMaster } - workerResp.Worker = worker + workerResp.Worker = worker1 workerRespCh <- workerResp - }(worker) + }, func(args ...interface{}) { + defer wg.Done() + _, worker1, err := s.workerArgsExtractor(args...) 
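workerrpc.Request, as used above, is effectively a tagged union: one struct carries a CmdType plus one pointer per concrete request, and a single switch routes it to the matching gRPC call. A miniature, self-contained illustration of that pattern with hypothetical payload types (the real payloads are the pb messages):

```go
package main

import (
	"errors"
	"fmt"
)

type cmdType uint16

const (
	cmdQueryStatus cmdType = 1 + iota
	cmdPurgeRelay
)

// request and response mirror the workerrpc wrappers in miniature.
type request struct {
	Type        cmdType
	QueryStatus *string // stand-in payloads, not real pb messages
	PurgeRelay  *string
}

type response struct {
	Type   cmdType
	Result string
}

// callRPC dispatches on the command type, as the workerrpc package does.
func callRPC(req *request) (*response, error) {
	resp := &response{Type: req.Type}
	switch req.Type {
	case cmdQueryStatus:
		resp.Result = "status of " + *req.QueryStatus
	case cmdPurgeRelay:
		resp.Result = "purged relay of " + *req.PurgeRelay
	default:
		return nil, errors.New("invalid request type")
	}
	return resp, nil
}

func main() {
	task := "test"
	resp, err := callRPC(&request{Type: cmdQueryStatus, QueryStatus: &task})
	fmt.Println(resp.Result, err)
}
```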
+ if err != nil { + handleErr(err, worker1) + return + } + workerRespCh <- errorCommonWorkerResponse(fmt.Sprintf(ErrorNoEmitToken, worker1), worker1) + }, worker) } wg.Wait() @@ -843,8 +884,10 @@ func (s *Server) SwitchWorkerRelayMaster(ctx context.Context, req *pb.SwitchWork func (s *Server) OperateWorkerRelayTask(ctx context.Context, req *pb.OperateWorkerRelayRequest) (*pb.OperateWorkerRelayResponse, error) { log.Infof("[server] receive OperateWorkerRelayTask request %+v", req) - workerReq := &pb.OperateRelayRequest{Op: req.Op} - + request := &workerrpc.Request{ + Type: workerrpc.CmdOperateRelay, + OperateRelay: &pb.OperateRelayRequest{Op: req.Op}, + } workerRespCh := make(chan *pb.OperateRelayResponse, len(req.Workers)) var wg sync.WaitGroup for _, worker := range req.Workers { @@ -862,12 +905,15 @@ func (s *Server) OperateWorkerRelayTask(ctx context.Context, req *pb.OperateWork workerRespCh <- workerResp return } - workerResp, err := cli.OperateRelay(ctx, workerReq) + resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout) + workerResp := &pb.OperateRelayResponse{} if err != nil { workerResp = &pb.OperateRelayResponse{ Result: false, Msg: errors.ErrorStack(err), } + } else { + workerResp = resp.OperateRelay } workerResp.Op = req.Op workerResp.Worker = worker @@ -1023,27 +1069,54 @@ func (s *Server) containWorker(workers []string, worker string) bool { // getStatusFromWorkers does RPC request to get status from dm-workers func (s *Server) getStatusFromWorkers(ctx context.Context, workers []string, taskName string) chan *pb.QueryStatusResponse { - workerReq := &pb.QueryStatusRequest{ - Name: taskName, + workerReq := &workerrpc.Request{ + Type: workerrpc.CmdQueryStatus, + QueryStatus: &pb.QueryStatusRequest{Name: taskName}, } - workerRespCh := make(chan *pb.QueryStatusResponse, len(workers)) + + handleErr := func(err error, worker string) bool { + log.Error(err) + resp := &pb.QueryStatusResponse{ + Result: false, + Msg: errors.ErrorStack(err), + Worker: worker, + } + workerRespCh <- resp + return false + } + var wg sync.WaitGroup for _, worker := range workers { wg.Add(1) - go func(worker string) { + go s.ap.Emit(ctx, 0, func(args ...interface{}) { defer wg.Done() - cli := s.workerClients[worker] - workerStatus, err := cli.QueryStatus(ctx, workerReq) + cli, worker1, err := s.workerArgsExtractor(args...) + if err != nil { + handleErr(err, worker1) + return + } + resp, err := cli.SendRequest(ctx, workerReq, s.cfg.RPCTimeout) + workerStatus := &pb.QueryStatusResponse{} if err != nil { workerStatus = &pb.QueryStatusResponse{ Result: false, Msg: errors.ErrorStack(err), } + } else { + workerStatus = resp.QueryStatus } - workerStatus.Worker = worker + workerStatus.Worker = worker1 workerRespCh <- workerStatus - }(worker) + }, func(args ...interface{}) { + defer wg.Done() + _, worker1, err := s.workerArgsExtractor(args...) 
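Note that SendRequest applies s.cfg.RPCTimeout only to unary calls such as the status/error fan-outs here; the FetchDDLInfo stream keeps the caller's context. A small sketch (simulating the stream with a ticker rather than real gRPC) of why a per-request deadline would be wrong for the streaming case:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// consumeStream stands in for a loop over stream.Recv: it reads until done
// or until the context is canceled.
func consumeStream(ctx context.Context) error {
	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()
	for i := 0; i < 5; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			fmt.Println("received message", i)
		}
	}
	return nil
}

func main() {
	// Unary-style call: a short per-request deadline is appropriate.
	unaryCtx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	fmt.Println("with deadline:", consumeStream(unaryCtx)) // aborts with context.DeadlineExceeded

	// Stream-style call: inherit the caller's context so the stream lives on.
	fmt.Println("without deadline:", consumeStream(context.Background()))
}
```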
+ if err != nil { + handleErr(err, worker1) + return + } + handleErr(errors.Errorf(ErrorNoEmitToken, worker1), worker1) + }, worker) } wg.Wait() return workerRespCh @@ -1051,27 +1124,54 @@ func (s *Server) getStatusFromWorkers(ctx context.Context, workers []string, tas // getErrorFromWorkers does RPC request to get error information from dm-workers func (s *Server) getErrorFromWorkers(ctx context.Context, workers []string, taskName string) chan *pb.QueryErrorResponse { - workerReq := &pb.QueryErrorRequest{ - Name: taskName, + workerReq := &workerrpc.Request{ + Type: workerrpc.CmdQueryError, + QueryError: &pb.QueryErrorRequest{Name: taskName}, } - workerRespCh := make(chan *pb.QueryErrorResponse, len(workers)) + + handleErr := func(err error, worker string) bool { + log.Error(err) + resp := &pb.QueryErrorResponse{ + Result: false, + Msg: errors.ErrorStack(err), + Worker: worker, + } + workerRespCh <- resp + return false + } + var wg sync.WaitGroup for _, worker := range workers { wg.Add(1) - go func(worker string) { + go s.ap.Emit(ctx, 0, func(args ...interface{}) { defer wg.Done() - cli := s.workerClients[worker] - workerError, err := cli.QueryError(ctx, workerReq) + cli, worker1, err := s.workerArgsExtractor(args...) + if err != nil { + handleErr(err, worker1) + return + } + resp, err := cli.SendRequest(ctx, workerReq, s.cfg.RPCTimeout) + workerError := &pb.QueryErrorResponse{} if err != nil { workerError = &pb.QueryErrorResponse{ Result: false, Msg: errors.ErrorStack(err), } + } else { + workerError = resp.QueryError } - workerError.Worker = worker + workerError.Worker = worker1 workerRespCh <- workerError - }(worker) + }, func(args ...interface{}) { + defer wg.Done() + _, worker1, err := s.workerArgsExtractor(args...) + if err != nil { + handleErr(err, worker1) + return + } + handleErr(errors.Errorf(ErrorNoEmitToken, worker1), worker1) + }, worker) } wg.Wait() return workerRespCh @@ -1146,9 +1246,10 @@ func (s *Server) checkTaskAndWorkerMatch(taskname string, targetWorker string) b func (s *Server) fetchWorkerDDLInfo(ctx context.Context) { var wg sync.WaitGroup - for worker, cli := range s.workerClients { + request := &workerrpc.Request{Type: workerrpc.CmdFetchDDLInfo} + for worker, client := range s.workerClients { wg.Add(1) - go func(worker string, cli pb.WorkerClient) { + go func(worker string, cli workerrpc.Client) { defer wg.Done() var doRetry bool @@ -1166,12 +1267,13 @@ func (s *Server) fetchWorkerDDLInfo(ctx context.Context) { case <-ctx.Done(): return default: - stream, err := cli.FetchDDLInfo(ctx) + resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout) if err != nil { log.Errorf("[server] create FetchDDLInfo stream for worker %s fail %v", worker, err) doRetry = true continue } + stream := resp.FetchDDLInfo for { in, err := stream.Recv() if err == io.EOF { @@ -1250,7 +1352,7 @@ func (s *Server) fetchWorkerDDLInfo(ctx context.Context) { } } - }(worker, cli) + }(worker, client) } wg.Wait() @@ -1290,20 +1392,25 @@ func (s *Server) resolveDDLLock(ctx context.Context, lockID string, replaceOwner // try send handle SQLs request to owner if exists key, oper := s.sqlOperatorHolder.Get(lock.Task, lock.DDLs()) if oper != nil { - ownerReq := &pb.HandleSubTaskSQLsRequest{ - Name: oper.Req.Name, - Op: oper.Req.Op, - Args: oper.Req.Args, - BinlogPos: oper.Req.BinlogPos, - SqlPattern: oper.Req.SqlPattern, + ownerReq := &workerrpc.Request{ + Type: workerrpc.CmdHandleSubTaskSQLs, + HandleSubTaskSQLs: &pb.HandleSubTaskSQLsRequest{ + Name: oper.Req.Name, + Op: oper.Req.Op, + Args: 
oper.Req.Args, + BinlogPos: oper.Req.BinlogPos, + SqlPattern: oper.Req.SqlPattern, + }, } - ownerResp, err := cli.HandleSQLs(ctx, ownerReq) + resp, err := cli.SendRequest(ctx, ownerReq, s.cfg.RPCTimeout) if err != nil { - return nil, errors.Annotatef(err, "send handle SQLs request %s to DDL lock %s owner %s fail", ownerReq, lockID, owner) - } else if !ownerResp.Result { - return nil, errors.Errorf("request DDL lock %s owner %s handle SQLs request %s fail %s", lockID, owner, ownerReq, ownerResp.Msg) + return nil, errors.Annotatef(err, "send handle SQLs request %s to DDL lock %s owner %s fail", ownerReq.HandleSubTaskSQLs, lockID, owner) + } + ownerResp := resp.HandleSubTaskSQLs + if !ownerResp.Result { + return nil, errors.Errorf("request DDL lock %s owner %s handle SQLs request %s fail %s", lockID, owner, ownerReq.HandleSubTaskSQLs, ownerResp.Msg) } - log.Infof("[server] sent handle --sharding DDL request %s to owner %s for lock %s", ownerReq, owner, lockID) + log.Infof("[server] sent handle --sharding DDL request %s to owner %s for lock %s", ownerReq.HandleSubTaskSQLs, owner, lockID) s.sqlOperatorHolder.Remove(lock.Task, key) // remove SQL operator after sent to owner } @@ -1313,17 +1420,21 @@ func (s *Server) resolveDDLLock(ctx context.Context, lockID string, replaceOwner // single group. traceGID := s.idGen.NextID("resolveDDLLock", 0) log.Infof("[server] requesting %s to execute DDL (with ID %s)", owner, lockID) - ownerResp, err := cli.ExecuteDDL(ctx, &pb.ExecDDLRequest{ - Task: lock.Task, - LockID: lockID, - Exec: true, - TraceGID: traceGID, - }) + ownerReq := &workerrpc.Request{ + Type: workerrpc.CmdExecDDL, + ExecDDL: &pb.ExecDDLRequest{ + Task: lock.Task, + LockID: lockID, + Exec: true, + TraceGID: traceGID, + }, + } + resp, err := cli.SendRequest(ctx, ownerReq, s.cfg.RPCTimeout) + ownerResp := &pb.CommonWorkerResponse{} if err != nil { - ownerResp = &pb.CommonWorkerResponse{ - Result: false, - Msg: errors.ErrorStack(err), - } + ownerResp = errorCommonWorkerResponse(errors.ErrorStack(err), "") + } else { + ownerResp = resp.ExecDDL } ownerResp.Worker = owner if !ownerResp.Result { @@ -1343,13 +1454,15 @@ func (s *Server) resolveDDLLock(ctx context.Context, lockID string, replaceOwner } } - req := &pb.ExecDDLRequest{ - Task: lock.Task, - LockID: lockID, - Exec: false, // ignore and skip DDL - TraceGID: traceGID, + request := &workerrpc.Request{ + Type: workerrpc.CmdExecDDL, + ExecDDL: &pb.ExecDDLRequest{ + Task: lock.Task, + LockID: lockID, + Exec: false, // ignore and skip DDL + TraceGID: traceGID, + }, } - workerRespCh := make(chan *pb.CommonWorkerResponse, len(workers)) var wg sync.WaitGroup for _, worker := range workers { @@ -1362,29 +1475,21 @@ func (s *Server) resolveDDLLock(ctx context.Context, lockID string, replaceOwner defer wg.Done() cli, ok := s.workerClients[worker] if !ok { - workerRespCh <- &pb.CommonWorkerResponse{ - Result: false, - Worker: worker, - Msg: fmt.Sprintf("worker %s relevant worker-client not found", worker), - } + workerRespCh <- errorCommonWorkerResponse(fmt.Sprintf("worker %s relevant worker-client not found", worker), worker) return } if _, ok := ready[worker]; !ok { - workerRespCh <- &pb.CommonWorkerResponse{ - Result: false, - Worker: worker, - Msg: fmt.Sprintf("worker %s not waiting for DDL lock %s", owner, lockID), - } + workerRespCh <- errorCommonWorkerResponse(fmt.Sprintf("worker %s not waiting for DDL lock %s", owner, lockID), worker) return } log.Infof("[server] requesting %s to skip DDL (with ID %s)", worker, lockID) - workerResp, err2 := 
cli.ExecuteDDL(ctx, req) + resp, err2 := cli.SendRequest(ctx, request, s.cfg.RPCTimeout) + var workerResp *pb.CommonWorkerResponse if err2 != nil { - workerResp = &pb.CommonWorkerResponse{ - Result: false, - Msg: errors.ErrorStack(err2), - } + workerResp = errorCommonWorkerResponse(errors.ErrorStack(err2), "") + } else { + workerResp = resp.ExecDDL } workerResp.Worker = worker workerRespCh <- workerResp @@ -1458,12 +1563,12 @@ func (s *Server) UpdateMasterConfig(ctx context.Context, req *pb.UpdateMasterCon Task: "", Workers: wokerList, } - resp, err := s.ShowDDLLocks(ctx, DDLreq) - if err != nil { + resp, err2 := s.ShowDDLLocks(ctx, DDLreq) + if err2 != nil { s.Unlock() return &pb.UpdateMasterConfigResponse{ Result: false, - Msg: fmt.Sprintf("Failed to get DDL lock Info from %s, detail: ", workerAddr) + errors.ErrorStack(err), + Msg: fmt.Sprintf("Failed to get DDL lock Info from %s, detail: ", workerAddr) + errors.ErrorStack(err2), }, nil } if len(resp.Locks) != 0 { @@ -1483,7 +1588,7 @@ func (s *Server) UpdateMasterConfig(ctx context.Context, req *pb.UpdateMasterCon // add new worker for _, workerAddr := range cfg.DeployMap { if _, ok := s.workerClients[workerAddr]; !ok { - conn, err2 := grpc.Dial(workerAddr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second)) + cli, err2 := workerrpc.NewGRPCClient(workerAddr) if err2 != nil { s.Unlock() return &pb.UpdateMasterConfigResponse{ @@ -1491,7 +1596,7 @@ func (s *Server) UpdateMasterConfig(ctx context.Context, req *pb.UpdateMasterCon Msg: fmt.Sprintf("Failed to add woker %s, detail: ", workerAddr) + errors.ErrorStack(err2), }, nil } - s.workerClients[workerAddr] = pb.NewWorkerClient(conn) + s.workerClients[workerAddr] = cli } } @@ -1538,23 +1643,21 @@ func (s *Server) UpdateWorkerRelayConfig(ctx context.Context, req *pb.UpdateWork content := req.Config cli, ok := s.workerClients[worker] if !ok { - return &pb.CommonWorkerResponse{ - Result: false, - Worker: worker, - Msg: fmt.Sprintf("worker %s relevant worker-client not found", worker), - }, nil + return errorCommonWorkerResponse(fmt.Sprintf("worker %s relevant worker-client not found", worker), worker), nil } log.Infof("[server] try to update %s relay configure", worker) - workerResp, err := cli.UpdateRelayConfig(ctx, &pb.UpdateRelayRequest{Content: content}) + request := &workerrpc.Request{ + Type: workerrpc.CmdUpdateRelay, + UpdateRelay: &pb.UpdateRelayRequest{Content: content}, + } + resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout) if err != nil { - return &pb.CommonWorkerResponse{ - Result: false, - Msg: errors.ErrorStack(err), - }, nil + return errorCommonWorkerResponse(errors.ErrorStack(err), worker), nil } - return workerResp, nil + return resp.UpdateRelay, nil } +// TODO: refine the call stack of this API, query worker configs that we needed only func (s *Server) allWorkerConfigs(ctx context.Context) (map[string]config.DBConfig, error) { var ( wg sync.WaitGroup @@ -1563,59 +1666,71 @@ func (s *Server) allWorkerConfigs(ctx context.Context) (map[string]config.DBConf errCh = make(chan error, len(s.workerClients)) err error ) - handErr := func(err2 error) { - if err2 != nil { - log.Error(err2) - } + + handleErr := func(err2 error) bool { + log.Error(err2) errCh <- errors.Trace(err2) + return false } - for id, worker := range s.workerClients { + argsExtractor := func(args ...interface{}) (string, workerrpc.Client, bool) { + if len(args) != 2 { + return "", nil, handleErr(errors.Errorf("fail to call emit to fetch worker config, miss some arguments %v", args)) + } + + 
worker, ok := args[0].(string) + if !ok { + return "", nil, handleErr(errors.Errorf("fail to call emit to fetch worker config, can't get id from args[0], arguments %v", args)) + } + + client, ok := args[1].(*workerrpc.GRPCClient) + if !ok { + return "", nil, handleErr(errors.Errorf("fail to call emit to fetch config of worker %s, can't get worker client from args[1], arguments %v", worker, args)) + } + + return worker, client, true + } + + request := &workerrpc.Request{ + Type: workerrpc.CmdQueryWorkerConfig, + QueryWorkerConfig: &pb.QueryWorkerConfigRequest{}, + } + for worker, client := range s.workerClients { wg.Add(1) - go Emit(func(args ...interface{}) { + go s.ap.Emit(ctx, 0, func(args ...interface{}) { defer wg.Done() - if len(args) != 2 { - handErr(errors.Errorf("fail to call emit to fetch worker config, miss some arguments %v", args)) - return - } - - id1, ok := args[0].(string) - if !ok { - handErr(errors.Errorf("fail to call emit to fetch worker config, can't get id from args[0], arguments %v", args)) - return - } - worker1, ok := args[1].(pb.WorkerClient) + worker1, client1, ok := argsExtractor(args...) if !ok { - handErr(errors.Errorf("fail to call emit to fetch config of worker %s, can't get worker client from args[1], arguments %v", id1, args)) return } - resp, err1 := worker1.QueryWorkerConfig(ctx, &pb.QueryWorkerConfigRequest{}) + response, err1 := client1.SendRequest(ctx, request, s.cfg.RPCTimeout) if err1 != nil { - handErr(errors.Annotatef(err1, "fetch config of worker %s", id1)) + handleErr(errors.Annotatef(err1, "fetch config of worker %s", worker1)) return } + resp := response.QueryWorkerConfig if !resp.Result { - handErr(errors.Errorf("fail to query config from worker %s, message %s", id1, resp.Msg)) + handleErr(errors.Errorf("fail to query config from worker %s, message %s", worker1, resp.Msg)) return } if len(resp.Content) == 0 { - handErr(errors.Errorf("fail to query config from worker %s, config is empty", id1)) + handleErr(errors.Errorf("fail to query config from worker %s, config is empty", worker1)) return } if len(resp.SourceID) == 0 { - handErr(errors.Errorf("fail to query config from worker %s, source ID is empty, it should be set in worker config", id1)) + handleErr(errors.Errorf("fail to query config from worker %s, source ID is empty, it should be set in worker config", worker1)) return } dbCfg := &config.DBConfig{} err2 := dbCfg.Decode(resp.Content) if err2 != nil { - handErr(errors.Annotatef(err2, "unmarshal worker %s config", id1)) + handleErr(errors.Annotatef(err2, "unmarshal worker %s config, resp: %s", worker1, resp.Content)) return } @@ -1623,7 +1738,15 @@ func (s *Server) allWorkerConfigs(ctx context.Context) (map[string]config.DBConf workerCfgs[resp.SourceID] = *dbCfg workerMutex.Unlock() - }, []interface{}{id, worker}...) + }, func(args ...interface{}) { + defer wg.Done() + + worker1, _, ok := argsExtractor(args...) 
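argsExtractor here, like workerArgsExtractor and taskConfigArgsExtractor, exists because Emit hands its arguments over as ...interface{}; the two-value type assertion turns a mismatch into an error instead of a panic. A self-contained sketch of that extraction pattern with hypothetical argument types:

```go
package main

import (
	"errors"
	"fmt"
)

// extractWorkerArgs recovers typed values from a variadic interface{} list,
// reporting an error on any shape or type mismatch.
func extractWorkerArgs(args ...interface{}) (string, int, error) {
	if len(args) != 2 {
		return "", 0, errors.New("expect exactly 2 arguments")
	}
	worker, ok := args[0].(string)
	if !ok {
		return "", 0, fmt.Errorf("args[0] is not a worker address: %v", args[0])
	}
	id, ok := args[1].(int)
	if !ok {
		return "", 0, fmt.Errorf("args[1] is not an id: %v", args[1])
	}
	return worker, id, nil
}

func main() {
	w, id, err := extractWorkerArgs("127.0.0.1:8262", 3)
	fmt.Println(w, id, err)

	_, _, err = extractWorkerArgs(42, "oops")
	fmt.Println(err)
}
```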
+ if !ok { + return + } + handleErr(errors.Errorf("fail to get emit opporunity for worker %s", worker1)) + }, worker, client) } wg.Wait() @@ -1642,21 +1765,18 @@ func (s *Server) MigrateWorkerRelay(ctx context.Context, req *pb.MigrateWorkerRe binlogName := req.BinlogName cli, ok := s.workerClients[worker] if !ok { - return &pb.CommonWorkerResponse{ - Result: false, - Worker: worker, - Msg: fmt.Sprintf("worker %s relevant worker-client not found", worker), - }, nil + return errorCommonWorkerResponse(fmt.Sprintf("worker %s relevant worker-client not found", worker), worker), nil } log.Infof("[server] try to migrate %s relay", worker) - workerResp, err := cli.MigrateRelay(ctx, &pb.MigrateRelayRequest{BinlogName: binlogName, BinlogPos: binlogPos}) + request := &workerrpc.Request{ + Type: workerrpc.CmdMigrateRelay, + MigrateRelay: &pb.MigrateRelayRequest{BinlogName: binlogName, BinlogPos: binlogPos}, + } + resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout) if err != nil { - return &pb.CommonWorkerResponse{ - Result: false, - Msg: errors.ErrorStack(err), - }, nil + return errorCommonWorkerResponse(errors.ErrorStack(err), worker), nil } - return workerResp, nil + return resp.MigrateRelay, nil } // CheckTask checks legality of task configuration @@ -1707,53 +1827,116 @@ var ( retryInterval = time.Second ) -func (s *Server) waitOperationOk(ctx context.Context, cli pb.WorkerClient, name string, opLogID int64) error { - request := &pb.QueryTaskOperationRequest{ - Name: name, - LogID: opLogID, +func (s *Server) waitOperationOk(ctx context.Context, cli workerrpc.Client, name string, opLogID int64) error { + req := &workerrpc.Request{ + Type: workerrpc.CmdQueryTaskOperation, + QueryTaskOperation: &pb.QueryTaskOperationRequest{ + Name: name, + LogID: opLogID, + }, } for num := 0; num < maxRetryNum; num++ { - res, err := cli.QueryTaskOperation(ctx, request) + resp, err := cli.SendRequest(ctx, req, s.cfg.RPCTimeout) + var queryResp *pb.QueryTaskOperationResponse if err != nil { log.Errorf("fail to query task operation %v", err) - } else if res.Log == nil { - return errors.Errorf("operation %d of task %s not found, please execute `query-status` to check status", opLogID, name) - } else if res.Log.Success { - return nil - } else if len(res.Log.Message) != 0 { - return errors.New(res.Log.Message) + } else { + queryResp = resp.QueryTaskOperation + respLog := queryResp.Log + if respLog == nil { + return errors.Errorf("operation %d of task %s not found, please execute `query-status` to check status", opLogID, name) + } else if respLog.Success { + return nil + } else if len(respLog.Message) != 0 { + return errors.New(respLog.Message) + } } - log.Infof("wait task %s op log %d, current result %+v", name, opLogID, res) + log.Infof("wait task %s op log %d, current result %+v", name, opLogID, queryResp) select { case <-ctx.Done(): return ctx.Err() case <-time.After(retryInterval): } - } return errors.New("request is timeout, but request may be successful, please execute `query-status` to check status") } -func (s *Server) handleOperationResult(ctx context.Context, cli pb.WorkerClient, name string, err error, response *pb.OperateSubTaskResponse) *pb.OperateSubTaskResponse { +func (s *Server) handleOperationResult(ctx context.Context, cli workerrpc.Client, name string, err error, resp *workerrpc.Response) *pb.OperateSubTaskResponse { if err != nil { return &pb.OperateSubTaskResponse{ - Meta: &pb.CommonWorkerResponse{ - Result: false, - Msg: errors.ErrorStack(err), - }, + Meta: 
errorCommonWorkerResponse(errors.ErrorStack(err), ""), } } + response := &pb.OperateSubTaskResponse{} + switch resp.Type { + case workerrpc.CmdStartSubTask: + response = resp.StartSubTask + case workerrpc.CmdOperateSubTask: + response = resp.OperateSubTask + case workerrpc.CmdUpdateSubTask: + response = resp.UpdateSubTask + default: + // this should not happen + response.Meta = errorCommonWorkerResponse(fmt.Sprintf("invalid operate task type %v", resp.Type), "") + return response + } err = s.waitOperationOk(ctx, cli, name, response.LogID) if err != nil { - response.Meta = &pb.CommonWorkerResponse{ - Result: false, - Msg: errors.ErrorStack(err), - } + response.Meta = errorCommonWorkerResponse(errors.ErrorStack(err), "") } return response } + +// taskConfigArgsExtractor extracts SubTaskConfig from args and returns its relevant +// grpc client, worker id (host:port), subtask config in toml, task name and error +func (s *Server) taskConfigArgsExtractor(args ...interface{}) (workerrpc.Client, string, string, string, error) { + handleErr := func(err error) error { + log.Error(errors.ErrorStack(err)) + return err + } + + if len(args) != 1 { + return nil, "", "", "", handleErr(errors.Errorf("miss task config %v", args)) + } + + cfg, ok := args[0].(*config.SubTaskConfig) + if !ok { + return nil, "", "", "", handleErr(errors.Errorf("args[0] is not SubTaskConfig: %v", args[0])) + } + + worker, ok1 := s.cfg.DeployMap[cfg.SourceID] + cli, ok2 := s.workerClients[worker] + if !ok1 || !ok2 { + return nil, "", "", "", handleErr(errors.Errorf("%s relevant worker-client not found", worker)) + } + + cfgToml, err := cfg.Toml() + if err != nil { + return nil, "", "", "", handleErr(err) + } + + return cli, worker, cfgToml, cfg.Name, nil +} + +// workerArgsExtractor extracts worker from args and returns its relevant +// grpc client, worker id (host:port) and error +func (s *Server) workerArgsExtractor(args ...interface{}) (workerrpc.Client, string, error) { + if len(args) != 1 { + return nil, "", errors.Errorf("miss worker id %v", args) + } + worker, ok := args[0].(string) + if !ok { + return nil, "", errors.Errorf("invalid argument, args[0] is not valid worker id: %v", args[0]) + } + cli, ok := s.workerClients[worker] + if !ok { + return nil, worker, errors.Errorf("%s relevant worker-client not found", worker) + } + + return cli, worker, nil +} diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 8b30ab4a51..f9bb0b7168 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/dm/checker" "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/dm/master/workerrpc" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/pbmock" ) @@ -139,11 +140,17 @@ type testMaster struct { var _ = check.Suite(&testMaster{}) +func newMockRPCClient(client pb.WorkerClient) workerrpc.Client { + c, _ := workerrpc.NewGRPCClientWrap(nil, client) + return c +} + func testDefaultMasterServer(c *check.C) *Server { cfg := NewConfig() err := cfg.Parse([]string{"-config=./dm-master.toml"}) c.Assert(err, check.IsNil) server := NewServer(cfg) + go server.ap.Start(context.Background()) return server } @@ -182,7 +189,7 @@ func testMockWorkerConfig(c *check.C, server *Server, ctrl *gomock.Controller, p SourceID: deploy.Source, Content: rawConfig, }, nil) - server.workerClients[deploy.Worker] = mockWorkerClient + server.workerClients[deploy.Worker] = newMockRPCClient(mockWorkerClient) } } @@ -250,7 +257,7 @@ func testMockStartTask(c *check.C, server *Server, ctrl 
*gomock.Controller, work }, nil).MaxTimes(maxRetryNum) } - server.workerClients[deploy.Worker] = mockWorkerClient + server.workerClients[deploy.Worker] = newMockRPCClient(mockWorkerClient) } } @@ -267,7 +274,7 @@ func (t *testMaster) TestQueryStatus(c *check.C) { gomock.Any(), &pb.QueryStatusRequest{}, ).Return(&pb.QueryStatusResponse{Result: true}, nil) - server.workerClients[workerAddr] = mockWorkerClient + server.workerClients[workerAddr] = newMockRPCClient(mockWorkerClient) } resp, err := server.QueryStatus(context.Background(), &pb.QueryStatusListRequest{}) c.Assert(err, check.IsNil) @@ -282,7 +289,7 @@ func (t *testMaster) TestQueryStatus(c *check.C) { gomock.Any(), &pb.QueryStatusRequest{}, ).Return(&pb.QueryStatusResponse{Result: true}, nil) - server.workerClients[workerAddr] = mockWorkerClient + server.workerClients[workerAddr] = newMockRPCClient(mockWorkerClient) } resp, err = server.QueryStatus(context.Background(), &pb.QueryStatusListRequest{ Workers: workers, @@ -500,7 +507,7 @@ func (t *testMaster) TestQueryError(c *check.C) { gomock.Any(), &pb.QueryErrorRequest{}, ).Return(&pb.QueryErrorResponse{Result: true}, nil) - server.workerClients[workerAddr] = mockWorkerClient + server.workerClients[workerAddr] = newMockRPCClient(mockWorkerClient) } resp, err := server.QueryError(context.Background(), &pb.QueryErrorListRequest{}) c.Assert(err, check.IsNil) @@ -515,7 +522,7 @@ func (t *testMaster) TestQueryError(c *check.C) { gomock.Any(), &pb.QueryErrorRequest{}, ).Return(&pb.QueryErrorResponse{Result: true}, nil) - server.workerClients[workerAddr] = mockWorkerClient + server.workerClients[workerAddr] = newMockRPCClient(mockWorkerClient) } resp, err = server.QueryError(context.Background(), &pb.QueryErrorListRequest{ Workers: workers, @@ -606,7 +613,7 @@ func (t *testMaster) TestOperateTask(c *check.C) { Log: &pb.TaskLog{Id: logID, Ts: time.Now().Unix(), Success: true}, }, nil) - server.workerClients[deploy.Worker] = mockWorkerClient + server.workerClients[deploy.Worker] = newMockRPCClient(mockWorkerClient) } resp, err = server.OperateTask(context.Background(), &pb.OperateTaskRequest{ Op: pauseOp, @@ -631,7 +638,7 @@ func (t *testMaster) TestOperateTask(c *check.C) { Name: taskName, }, ).Return(nil, errors.New(errGRPCFailed)) - server.workerClients[workerAddr] = mockWorkerClient + server.workerClients[workerAddr] = newMockRPCClient(mockWorkerClient) } resp, err = server.OperateTask(context.Background(), &pb.OperateTaskRequest{ Op: pb.TaskOp_Pause, @@ -670,7 +677,7 @@ func (t *testMaster) TestOperateTask(c *check.C) { Meta: &pb.CommonWorkerResponse{Result: true, Worker: workers[0]}, Log: &pb.TaskLog{Id: logID, Ts: time.Now().Unix(), Success: true}, }, nil) - server.workerClients[workers[0]] = mockWorkerClient + server.workerClients[workers[0]] = newMockRPCClient(mockWorkerClient) resp, err = server.OperateTask(context.Background(), &pb.OperateTaskRequest{ Op: pb.TaskOp_Stop, Name: taskName, @@ -709,7 +716,7 @@ func (t *testMaster) TestOperateTask(c *check.C) { Meta: &pb.CommonWorkerResponse{Result: true, Worker: workers[0]}, Log: &pb.TaskLog{Id: logID, Ts: time.Now().Unix(), Success: true}, }, nil) - server.workerClients[deploy.Worker] = mockWorkerClient + server.workerClients[deploy.Worker] = newMockRPCClient(mockWorkerClient) } resp, err = server.OperateTask(context.Background(), &pb.OperateTaskRequest{ Op: pb.TaskOp_Stop, @@ -810,7 +817,7 @@ func (t *testMaster) TestUpdateTask(c *check.C) { }, nil) } - server.workerClients[deploy.Worker] = mockWorkerClient + 
server.workerClients[deploy.Worker] = newMockRPCClient(mockWorkerClient) } } @@ -895,7 +902,7 @@ func (t *testMaster) TestUnlockDDLLock(c *check.C) { }, ).Return(ret...) - server.workerClients[worker] = mockWorkerClient + server.workerClients[worker] = newMockRPCClient(mockWorkerClient) } } @@ -1025,7 +1032,7 @@ func (t *testMaster) TestBreakWorkerDDLLock(c *check.C) { SkipDDL: true, }, ).Return(rets...) - server.workerClients[deploy.Worker] = mockWorkerClient + server.workerClients[deploy.Worker] = newMockRPCClient(mockWorkerClient) } } @@ -1111,7 +1118,7 @@ func (t *testMaster) TestRefreshWorkerTasks(c *check.C) { }, }, }, nil) - server.workerClients[deploy.Worker] = mockWorkerClient + server.workerClients[deploy.Worker] = newMockRPCClient(mockWorkerClient) } // test RefreshWorkerTasks, with two running tasks for each workers @@ -1136,7 +1143,7 @@ func (t *testMaster) TestRefreshWorkerTasks(c *check.C) { Worker: deploy.Worker, Msg: msgNoSubTask, }, nil) - server.workerClients[deploy.Worker] = mockWorkerClient + server.workerClients[deploy.Worker] = newMockRPCClient(mockWorkerClient) } // test RefreshWorkerTasks, with no started tasks @@ -1191,7 +1198,7 @@ func (t *testMaster) TestPurgeWorkerRelay(c *check.C) { Filename: filename, }, ).Return(rets...) - server.workerClients[deploy.Worker] = mockWorkerClient + server.workerClients[deploy.Worker] = newMockRPCClient(mockWorkerClient) } } @@ -1275,7 +1282,7 @@ func (t *testMaster) TestSwitchWorkerRelayMaster(c *check.C) { gomock.Any(), &pb.SwitchRelayMasterRequest{}, ).Return(rets...) - server.workerClients[deploy.Worker] = mockWorkerClient + server.workerClients[deploy.Worker] = newMockRPCClient(mockWorkerClient) } } @@ -1288,7 +1295,7 @@ func (t *testMaster) TestSwitchWorkerRelayMaster(c *check.C) { c.Assert(resp.Workers, check.HasLen, 2) for _, w := range resp.Workers { c.Assert(w.Result, check.IsFalse) - c.Assert(w.Msg, check.Matches, ".*relevant worker-client not found") + c.Assert(w.Msg, check.Matches, "(?m).*relevant worker-client not found.*") } // test SwitchWorkerRelayMaster successfully @@ -1354,7 +1361,7 @@ func (t *testMaster) TestOperateWorkerRelayTask(c *check.C) { gomock.Any(), &pb.OperateRelayRequest{Op: pb.RelayOp_PauseRelay}, ).Return(rets...) - server.workerClients[deploy.Worker] = mockWorkerClient + server.workerClients[deploy.Worker] = newMockRPCClient(mockWorkerClient) } } @@ -1460,7 +1467,7 @@ func (t *testMaster) TestFetchWorkerDDLInfo(c *check.C) { }, ).Return(&pb.CommonWorkerResponse{Result: true}, nil).MaxTimes(1) - server.workerClients[deploy.Worker] = mockWorkerClient + server.workerClients[deploy.Worker] = newMockRPCClient(mockWorkerClient) } ctx, cancel := context.WithCancel(context.Background()) diff --git a/dm/master/status_test.go b/dm/master/status_test.go index ada2f8c66d..023515b88f 100644 --- a/dm/master/status_test.go +++ b/dm/master/status_test.go @@ -33,6 +33,8 @@ type testHTTPServer struct { func (t *testHTTPServer) startServer(c *check.C) { t.cfg = NewConfig() t.cfg.MasterAddr = ":8261" + t.cfg.RPCRateLimit = DefaultRate + t.cfg.RPCRateBurst = DefaultBurst t.server = NewServer(t.cfg) go t.server.Start() diff --git a/dm/master/workerrpc/interface.go b/dm/master/workerrpc/interface.go new file mode 100644 index 0000000000..a0607a2793 --- /dev/null +++ b/dm/master/workerrpc/interface.go @@ -0,0 +1,113 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package workerrpc
+
+import (
+	"context"
+	"time"
+
+	"github.com/pingcap/dm/dm/pb"
+)
+
+// CmdType represents the concrete request type in Request or response type in Response.
+type CmdType uint16
+
+// CmdType values.
+const (
+	CmdStartSubTask CmdType = 1 + iota
+	CmdOperateSubTask
+	CmdUpdateSubTask
+
+	CmdQueryStatus
+	CmdQueryError
+	CmdQueryTaskOperation
+	CmdQueryWorkerConfig
+
+	CmdHandleSubTaskSQLs
+	CmdExecDDL
+	CmdBreakDDLLock
+
+	CmdSwitchRelayMaster
+	CmdOperateRelay
+	CmdPurgeRelay
+	CmdUpdateRelay
+	CmdMigrateRelay
+
+	CmdFetchDDLInfo
+)
+
+// Request wraps all dm-worker rpc requests.
+type Request struct {
+	Type CmdType
+
+	StartSubTask   *pb.StartSubTaskRequest
+	OperateSubTask *pb.OperateSubTaskRequest
+	UpdateSubTask  *pb.UpdateSubTaskRequest
+
+	QueryStatus        *pb.QueryStatusRequest
+	QueryError         *pb.QueryErrorRequest
+	QueryTaskOperation *pb.QueryTaskOperationRequest
+	QueryWorkerConfig  *pb.QueryWorkerConfigRequest
+
+	HandleSubTaskSQLs *pb.HandleSubTaskSQLsRequest
+	ExecDDL           *pb.ExecDDLRequest
+	BreakDDLLock      *pb.BreakDDLLockRequest
+
+	SwitchRelayMaster *pb.SwitchRelayMasterRequest
+	OperateRelay      *pb.OperateRelayRequest
+	PurgeRelay        *pb.PurgeRelayRequest
+	UpdateRelay       *pb.UpdateRelayRequest
+	MigrateRelay      *pb.MigrateRelayRequest
+}
+
+// Response wraps all dm-worker rpc responses.
+type Response struct {
+	Type CmdType
+
+	StartSubTask   *pb.OperateSubTaskResponse
+	OperateSubTask *pb.OperateSubTaskResponse
+	UpdateSubTask  *pb.OperateSubTaskResponse
+
+	QueryStatus        *pb.QueryStatusResponse
+	QueryError         *pb.QueryErrorResponse
+	QueryTaskOperation *pb.QueryTaskOperationResponse
+	QueryWorkerConfig  *pb.QueryWorkerConfigResponse
+
+	HandleSubTaskSQLs *pb.CommonWorkerResponse
+	ExecDDL           *pb.CommonWorkerResponse
+	BreakDDLLock      *pb.CommonWorkerResponse
+
+	SwitchRelayMaster *pb.CommonWorkerResponse
+	OperateRelay      *pb.OperateRelayResponse
+	PurgeRelay        *pb.CommonWorkerResponse
+	UpdateRelay       *pb.CommonWorkerResponse
+	MigrateRelay      *pb.CommonWorkerResponse
+
+	FetchDDLInfo pb.Worker_FetchDDLInfoClient
+}
+
+// Client is a client that sends RPC requests to a dm-worker.
+// It should not be used after calling Close().
+type Client interface {
+	// SendRequest sends a Request to the dm-worker and returns its Response
+	SendRequest(ctx context.Context, req *Request, timeout time.Duration) (*Response, error)
+
+	// Close closes the client and releases all of its resources
+	Close() error
+}
+
+// IsStreamAPI checks whether a request is a streaming API based on its CmdType
+func (req *Request) IsStreamAPI() bool {
+	return req.Type == CmdFetchDDLInfo
+}
diff --git a/dm/master/workerrpc/rawgrpc.go b/dm/master/workerrpc/rawgrpc.go
new file mode 100644
index 0000000000..2fcfe493f4
--- /dev/null
+++ b/dm/master/workerrpc/rawgrpc.go
@@ -0,0 +1,127 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package workerrpc + +import ( + "context" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "google.golang.org/grpc" + + "github.com/pingcap/dm/dm/pb" +) + +// GRPCClient stores raw grpc connection and worker client +type GRPCClient struct { + conn *grpc.ClientConn + client pb.WorkerClient + closed int32 +} + +// NewGRPCClientWrap initializes a new grpc client from given grpc connection and worker client +func NewGRPCClientWrap(conn *grpc.ClientConn, client pb.WorkerClient) (*GRPCClient, error) { + return &GRPCClient{ + conn: conn, + client: client, + closed: 0, + }, nil +} + +// NewGRPCClient initializes a new grpc client from worker address +func NewGRPCClient(addr string) (*GRPCClient, error) { + conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second)) + if err != nil { + return nil, errors.Trace(err) + } + return NewGRPCClientWrap(conn, pb.NewWorkerClient(conn)) +} + +// SendRequest implements Client.SendRequest +func (c *GRPCClient) SendRequest(ctx context.Context, req *Request, timeout time.Duration) (*Response, error) { + if atomic.LoadInt32(&c.closed) != 0 { + return nil, errors.New("send request on a closed client") + } + if req.IsStreamAPI() { + // call stream API and returns a grpc stream client + return callRPC(ctx, c.client, req) + } + // call normal grpc request with a timeout + ctx1, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + return callRPC(ctx1, c.client, req) +} + +// Close implements Client.Close +func (c *GRPCClient) Close() error { + defer func() { + atomic.CompareAndSwapInt32(&c.closed, 0, 1) + c.conn = nil + }() + if c.conn == nil { + return nil + } + err := c.conn.Close() + if err != nil { + return errors.Annotatef(err, "close rpc client") + } + return nil +} + +func callRPC(ctx context.Context, client pb.WorkerClient, req *Request) (*Response, error) { + resp := &Response{} + resp.Type = req.Type + var err error + switch req.Type { + case CmdStartSubTask: + resp.StartSubTask, err = client.StartSubTask(ctx, req.StartSubTask) + case CmdOperateSubTask: + resp.OperateSubTask, err = client.OperateSubTask(ctx, req.OperateSubTask) + case CmdUpdateSubTask: + resp.UpdateSubTask, err = client.UpdateSubTask(ctx, req.UpdateSubTask) + case CmdQueryStatus: + resp.QueryStatus, err = client.QueryStatus(ctx, req.QueryStatus) + case CmdQueryError: + resp.QueryError, err = client.QueryError(ctx, req.QueryError) + case CmdQueryTaskOperation: + resp.QueryTaskOperation, err = client.QueryTaskOperation(ctx, req.QueryTaskOperation) + case CmdQueryWorkerConfig: + resp.QueryWorkerConfig, err = client.QueryWorkerConfig(ctx, req.QueryWorkerConfig) + case CmdHandleSubTaskSQLs: + resp.HandleSubTaskSQLs, err = client.HandleSQLs(ctx, req.HandleSubTaskSQLs) + case CmdExecDDL: + resp.ExecDDL, err = client.ExecuteDDL(ctx, req.ExecDDL) + case CmdBreakDDLLock: + resp.BreakDDLLock, err = client.BreakDDLLock(ctx, req.BreakDDLLock) + case CmdSwitchRelayMaster: + resp.SwitchRelayMaster, err = client.SwitchRelayMaster(ctx, req.SwitchRelayMaster) + case CmdOperateRelay: + resp.OperateRelay, err = client.OperateRelay(ctx, req.OperateRelay) + case CmdPurgeRelay: + resp.PurgeRelay, err = 
client.PurgeRelay(ctx, req.PurgeRelay) + case CmdUpdateRelay: + resp.UpdateRelay, err = client.UpdateRelayConfig(ctx, req.UpdateRelay) + case CmdMigrateRelay: + resp.MigrateRelay, err = client.MigrateRelay(ctx, req.MigrateRelay) + case CmdFetchDDLInfo: + resp.FetchDDLInfo, err = client.FetchDDLInfo(ctx) + default: + return nil, errors.Errorf("invalid request type: %v", req.Type) + } + if err != nil { + return nil, errors.Trace(err) + } + return resp, nil +} diff --git a/go.mod b/go.mod index b94879263c..1e5cc9fcc7 100644 --- a/go.mod +++ b/go.mod @@ -57,11 +57,11 @@ require ( go.etcd.io/bbolt v1.3.3 // indirect go.uber.org/atomic v1.4.0 // indirect go.uber.org/zap v1.10.0 // indirect - golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8 // indirect golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 // indirect golang.org/x/sys v0.0.0-20190613124609-5ed2794edfdc + golang.org/x/text v0.3.2 // indirect + golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 golang.org/x/tools v0.0.0-20190613204242-ed0dc450797f // indirect - google.golang.org/appengine v1.6.1 // indirect google.golang.org/genproto v0.0.0-20190611190212-a7e196e89fd3 // indirect google.golang.org/grpc v1.21.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 diff --git a/go.sum b/go.sum index 81a02e5dee..e7a747a166 100644 --- a/go.sum +++ b/go.sum @@ -339,9 +339,6 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8 h1:1wopBVtVdWnn03fZelqdXTqk7U7zPQCb+T4rbU9ZEoU= -golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -357,8 +354,6 @@ golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -375,9 +370,7 @@ golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= 
golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190613124609-5ed2794edfdc h1:x+/QxSNkVFAC+v4pL1f6mZr1z+qgi+FoR8ccXZPVC10= golang.org/x/sys v0.0.0-20190613124609-5ed2794edfdc/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= @@ -394,14 +387,11 @@ golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= -golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190613204242-ed0dc450797f h1:+zypR5600WBcnJgA2nzZAsBlM8cArEGa8dhhiNE4u3w= golang.org/x/tools v0.0.0-20190613204242-ed0dc450797f/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= google.golang.org/appengine v1.1.0 h1:igQkv0AAhEIvTEpD5LIpAfav2eeVO9HBTjvKHVJPRSs= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I= -google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275 h1:9oFlwfEGIvmxXTcY53ygNyxIQtWciRHjrnUvZJCYXYU=
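
Note (not part of the diff): a minimal sketch of how the new workerrpc abstraction introduced in this change could be exercised on its own. The types and constructors used here (GRPCClient, Request, CmdQueryStatus, SendRequest, Close) come from the dm/master/workerrpc files added above; everything else (the standalone main package, the worker address 127.0.0.1:8262, and the 10-second timeout) is hypothetical and only for illustration.

	package main

	import (
		"context"
		"fmt"
		"time"

		"github.com/pingcap/dm/dm/master/workerrpc"
		"github.com/pingcap/dm/dm/pb"
	)

	func main() {
		// dial a hypothetical dm-worker and wrap the raw gRPC connection
		// behind the workerrpc.Client interface added in this change
		cli, err := workerrpc.NewGRPCClient("127.0.0.1:8262")
		if err != nil {
			panic(err)
		}
		defer cli.Close()

		// build a typed Request; the CmdType decides which worker RPC is dispatched
		req := &workerrpc.Request{
			Type:        workerrpc.CmdQueryStatus,
			QueryStatus: &pb.QueryStatusRequest{},
		}

		// for non-streaming commands the timeout bounds the underlying gRPC call
		resp, err := cli.SendRequest(context.Background(), req, 10*time.Second)
		if err != nil {
			panic(err)
		}
		fmt.Println(resp.QueryStatus.Result)
	}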