diff --git a/dm/master/server.go b/dm/master/server.go index abe4cd5bd3..5b33ef5272 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -18,6 +18,8 @@ import ( "fmt" "net" "net/http" + "reflect" + "runtime" "sort" "strings" "sync" @@ -305,14 +307,13 @@ func errorCommonWorkerResponse(msg string, source, worker string) *pb.CommonWork // key: /dm-worker/r/name // value: workerInfo func (s *Server) RegisterWorker(ctx context.Context, req *pb.RegisterWorkerRequest) (*pb.RegisterWorkerResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "RegisterWorker")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.RegisterWorker(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.RegisterWorkerResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } err := s.scheduler.AddWorker(req.Name, req.Address) @@ -334,14 +335,13 @@ func (s *Server) RegisterWorker(ctx context.Context, req *pb.RegisterWorkerReque // key: /dm-worker/r // value: WorkerInfo func (s *Server) OfflineMember(ctx context.Context, req *pb.OfflineMemberRequest) (*pb.OfflineMemberResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "OfflineMember")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.OfflineMember(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.OfflineMemberResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } if req.Type == common.Worker { @@ -409,14 +409,13 @@ func subtaskCfgPointersToInstances(stCfgPointers ...*config.SubTaskConfig) []con // StartTask implements MasterServer.StartTask func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.StartTaskResponse, error) { - log.L().Info("", zap.String("payload", utils.HidePassword(req.String())), zap.String("request", "StartTask")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.StartTask(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.StartTaskResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } resp := &pb.StartTaskResponse{} @@ -494,14 +493,13 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S // OperateTask implements MasterServer.OperateTask func (s *Server) OperateTask(ctx context.Context, req *pb.OperateTaskRequest) (*pb.OperateTaskResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "OperateTask")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.OperateTask(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.OperateTaskResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } resp := &pb.OperateTaskResponse{ @@ -552,14 +550,13 @@ func (s *Server) OperateTask(ctx context.Context, req *pb.OperateTaskRequest) (* // GetSubTaskCfg implements MasterServer.GetSubTaskCfg func (s *Server) GetSubTaskCfg(ctx context.Context, req *pb.GetSubTaskCfgRequest) (*pb.GetSubTaskCfgResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "GetSubTaskCfg")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.GetSubTaskCfg(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.GetSubTaskCfgResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } subCfgs := s.scheduler.GetSubTaskCfgsByTask(req.Name) @@ -592,14 +589,13 @@ func (s *Server) GetSubTaskCfg(ctx context.Context, req *pb.GetSubTaskCfgRequest // UpdateTask implements MasterServer.UpdateTask // TODO: support update task later func (s *Server) UpdateTask(ctx context.Context, req *pb.UpdateTaskRequest) (*pb.UpdateTaskResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "UpdateTask")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.UpdateTask(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.UpdateTaskResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } cfg, stCfgs, err := s.generateSubTask(ctx, req.Task) @@ -727,14 +723,13 @@ func extractSources(s *Server, req hasWokers) ([]string, error) { // QueryStatus implements MasterServer.QueryStatus func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusListRequest) (*pb.QueryStatusListResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "QueryStatus")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.QueryStatus(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.QueryStatusListResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } sources, err := extractSources(s, req) @@ -766,14 +761,13 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusListRequest // QueryError implements MasterServer.QueryError func (s *Server) QueryError(ctx context.Context, req *pb.QueryErrorListRequest) (*pb.QueryErrorListResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "QueryError")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.QueryError(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.QueryErrorListResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } sources, err := extractSources(s, req) @@ -806,14 +800,13 @@ func (s *Server) QueryError(ctx context.Context, req *pb.QueryErrorListRequest) // ShowDDLLocks implements MasterServer.ShowDDLLocks func (s *Server) ShowDDLLocks(ctx context.Context, req *pb.ShowDDLLocksRequest) (*pb.ShowDDLLocksResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "ShowDDLLocks")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.ShowDDLLocks(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.ShowDDLLocksResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } resp := &pb.ShowDDLLocksResponse{ @@ -834,14 +827,13 @@ func (s *Server) ShowDDLLocks(ctx context.Context, req *pb.ShowDDLLocksRequest) // UnlockDDLLock implements MasterServer.UnlockDDLLock // TODO(csuzhangxc): implement this later. func (s *Server) UnlockDDLLock(ctx context.Context, req *pb.UnlockDDLLockRequest) (*pb.UnlockDDLLockResponse, error) { - log.L().Info("", zap.String("lock ID", req.ID), zap.Stringer("payload", req), zap.String("request", "UnlockDDLLock")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.UnlockDDLLock(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.UnlockDDLLockResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } resp := &pb.UnlockDDLLockResponse{} @@ -880,14 +872,13 @@ func (s *Server) UnlockDDLLock(ctx context.Context, req *pb.UnlockDDLLockRequest // PurgeWorkerRelay implements MasterServer.PurgeWorkerRelay func (s *Server) PurgeWorkerRelay(ctx context.Context, req *pb.PurgeWorkerRelayRequest) (*pb.PurgeWorkerRelayResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "PurgeWorkerRelay")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.PurgeWorkerRelay(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.PurgeWorkerRelayResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } workerReq := &workerrpc.Request{ @@ -944,14 +935,13 @@ func (s *Server) PurgeWorkerRelay(ctx context.Context, req *pb.PurgeWorkerRelayR // SwitchWorkerRelayMaster implements MasterServer.SwitchWorkerRelayMaster func (s *Server) SwitchWorkerRelayMaster(ctx context.Context, req *pb.SwitchWorkerRelayMasterRequest) (*pb.SwitchWorkerRelayMasterResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "SwitchWorkerRelayMaster")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.SwitchWorkerRelayMaster(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.SwitchWorkerRelayMasterResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } workerRespCh := make(chan *pb.CommonWorkerResponse, len(req.Sources)) @@ -1015,14 +1005,13 @@ func (s *Server) SwitchWorkerRelayMaster(ctx context.Context, req *pb.SwitchWork // OperateWorkerRelayTask implements MasterServer.OperateWorkerRelayTask func (s *Server) OperateWorkerRelayTask(ctx context.Context, req *pb.OperateWorkerRelayRequest) (*pb.OperateWorkerRelayResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "OperateWorkerRelayTask")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.OperateWorkerRelayTask(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.OperateWorkerRelayResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } resp := &pb.OperateWorkerRelayResponse{ @@ -1192,14 +1181,13 @@ func (s *Server) checkTaskAndWorkerMatch(taskname string, targetWorker string) b // UpdateMasterConfig implements MasterServer.UpdateConfig func (s *Server) UpdateMasterConfig(ctx context.Context, req *pb.UpdateMasterConfigRequest) (*pb.UpdateMasterConfigResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "UpdateMasterConfig")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.UpdateMasterConfig(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.UpdateMasterConfigResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } s.Lock() @@ -1235,14 +1223,13 @@ func (s *Server) UpdateMasterConfig(ctx context.Context, req *pb.UpdateMasterCon // UpdateWorkerRelayConfig updates config for relay and (dm-worker) func (s *Server) UpdateWorkerRelayConfig(ctx context.Context, req *pb.UpdateWorkerRelayConfigRequest) (*pb.CommonWorkerResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "UpdateWorkerRelayConfig")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.UpdateWorkerRelayConfig(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.CommonWorkerResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } source := req.Source @@ -1279,14 +1266,13 @@ func (s *Server) getSourceConfigs(sources []*config.MySQLInstance) (map[string]c // MigrateWorkerRelay migrates dm-woker relay unit func (s *Server) MigrateWorkerRelay(ctx context.Context, req *pb.MigrateWorkerRelayRequest) (*pb.CommonWorkerResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "MigrateWorkerRelay")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.MigrateWorkerRelay(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.CommonWorkerResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } source := req.Source @@ -1310,14 +1296,13 @@ func (s *Server) MigrateWorkerRelay(ctx context.Context, req *pb.MigrateWorkerRe // CheckTask checks legality of task configuration func (s *Server) CheckTask(ctx context.Context, req *pb.CheckTaskRequest) (*pb.CheckTaskResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "CheckTask")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.CheckTask(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.CheckTaskResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } _, _, err := s.generateSubTask(ctx, req.Task) @@ -1362,14 +1347,13 @@ func parseAndAdjustSourceConfig(contents []string) ([]*config.SourceConfig, erro // OperateSource will create or update an upstream source. func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest) (*pb.OperateSourceResponse, error) { - log.L().Info("", zap.String("payload", utils.HidePassword(req.String())), zap.String("request", "OperateSource")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.OperateSource(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.OperateSourceResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } cfgs, err := parseAndAdjustSourceConfig(req.Config) @@ -1959,14 +1943,13 @@ func (s *Server) listMemberLeader(ctx context.Context, names []string) (*pb.Memb // ListMember list member information func (s *Server) ListMember(ctx context.Context, req *pb.ListMemberRequest) (*pb.ListMemberResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "ListMember")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.ListMember(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.ListMemberResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } if !req.Leader && !req.Master && !req.Worker { @@ -2018,14 +2001,13 @@ func (s *Server) ListMember(ctx context.Context, req *pb.ListMemberRequest) (*pb // OperateSchema operates schema of an upstream table. func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateSchemaRequest) (*pb.OperateSchemaResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "OperateSchema")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.OperateSchema(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.OperateSchemaResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } if len(req.Sources) == 0 { @@ -2093,14 +2075,13 @@ func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateSchemaRequest // GetTaskCfg implements MasterServer.GetSubTaskCfg func (s *Server) GetTaskCfg(ctx context.Context, req *pb.GetTaskCfgRequest) (*pb.GetTaskCfgResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "GetTaskCfg")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.GetTaskCfg(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.GetTaskCfgResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } cfg := s.scheduler.GetTaskCfg(req.Name) @@ -2120,14 +2101,13 @@ func (s *Server) GetTaskCfg(ctx context.Context, req *pb.GetTaskCfgRequest) (*pb // HandleError implements MasterServer.HandleError func (s *Server) HandleError(ctx context.Context, req *pb.HandleErrorRequest) (*pb.HandleErrorResponse, error) { - log.L().Info("", zap.Stringer("payload", req), zap.String("request", "HandleError")) - - isLeader, needForward := s.isLeaderAndNeedForward() - if !isLeader { - if needForward { - return s.leaderClient.HandleError(ctx, req) - } - return nil, terror.ErrMasterRequestIsNotForwardToLeader + var ( + resp2 *pb.HandleErrorResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 } sources := req.Sources @@ -2191,3 +2171,46 @@ func (s *Server) HandleError(ctx context.Context, req *pb.HandleErrorRequest) (* Sources: workerResps, }, nil } + +// sharedLogic does some shared logic for each RPC implementation +// arguments with `Pointer` suffix should be pointer to that variable its name indicated +// return `true` means caller should return with variable that `xxPointer` modified +func (s *Server) sharedLogic(ctx context.Context, req interface{}, respPointer interface{}, errPointer *error) bool { + pc, _, _, _ := runtime.Caller(1) + fullMethodName := runtime.FuncForPC(pc).Name() + methodName := fullMethodName[strings.LastIndexByte(fullMethodName, '.')+1:] + + log.L().Info("", zap.Any("payload", req), zap.String("request", methodName)) + + // origin code: + // isLeader, needForward := s.isLeaderAndNeedForward() + // if !isLeader { + // if needForward { + // return s.leaderClient.ListMember(ctx, req) + // } + // return nil, terror.ErrMasterRequestIsNotForwardToLeader + // } + isLeader, needForward := s.isLeaderAndNeedForward() + if isLeader { + return false + } + if needForward { + log.L().Info("forwarding", zap.String("from", s.cfg.Name), zap.String("to", s.leader), zap.String("request", methodName)) + params := []reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(req)} + results := reflect.ValueOf(s.leaderClient).MethodByName(methodName).Call(params) + // result's inner types should be (*pb.XXResponse, error), which is same as s.leaderClient.XXRPCMethod + reflect.ValueOf(respPointer).Elem().Set(results[0]) + errInterface := results[1].Interface() + // nil can't pass type conversion, so we handle it separately + if errInterface == nil { + *errPointer = nil + } else { + *errPointer = errInterface.(error) + } + return true + } + respType := reflect.ValueOf(respPointer).Elem().Type() + reflect.ValueOf(respPointer).Elem().Set(reflect.Zero(respType)) + *errPointer = terror.ErrMasterRequestIsNotForwardToLeader + return true +} diff --git a/dm/pb/hide_password.go b/dm/pb/hide_password.go new file mode 100644 index 0000000000..e0ac1da586 --- /dev/null +++ b/dm/pb/hide_password.go @@ -0,0 +1,24 @@ +package pb + +import ( + "go.uber.org/zap/zapcore" +) + +var ( + // HidePwdFunc should be overwrite by utils.HidePassword, this variable is for avoiding cycle import + HidePwdFunc = func(s string) string { + return s + } +) + +// MarshalLogObject implements zapcore.ObjectMarshaler +func (m *StartTaskRequest) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddString("HidePasswordObject", HidePwdFunc(m.String())) + return nil +} + +// MarshalLogObject implements zapcore.ObjectMarshaler +func (m *OperateSourceRequest) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddString("HidePasswordObject", HidePwdFunc(m.String())) + return nil +} diff --git a/pkg/utils/util.go b/pkg/utils/util.go index 6852c5074a..e5665c1221 100644 --- a/pkg/utils/util.go +++ b/pkg/utils/util.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/errors" "github.com/siddontang/go-mysql/mysql" + "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/pkg/terror" ) @@ -96,6 +97,7 @@ func init() { OsExit = os.Exit builtInSkipDDLPatterns = regexp.MustCompile("(?i)" + strings.Join(builtInSkipDDLs, "|")) passwordRegexp = regexp.MustCompile(passwordPatterns) + pb.HidePwdFunc = HidePassword } // DecodeBinlogPosition parses a mysql.Position from string format