Skip to content

Commit

Permalink
HA: refactor unlock-ddl-lock and break-ddl-lock (pingcap#522)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Mar 9, 2020
1 parent a711a1c commit 263b86b
Show file tree
Hide file tree
Showing 24 changed files with 1,099 additions and 1,253 deletions.
1 change: 0 additions & 1 deletion dm/ctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func NewRootCmd() *cobra.Command {
master.NewSQLInjectCmd(),
master.NewShowDDLLocksCmd(),
master.NewUnlockDDLLockCmd(),
master.NewBreakDDLLockCmd(),
master.NewSwitchRelayMasterCmd(),
master.NewPauseRelayCmd(),
master.NewResumeRelayCmd(),
Expand Down
104 changes: 0 additions & 104 deletions dm/ctl/master/break_ddl_lock.go

This file was deleted.

11 changes: 5 additions & 6 deletions dm/ctl/master/unlock_ddl_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewUnlockDDLLockCmd() *cobra.Command {
Short: "forcefully unlock DDL lock",
Run: unlockDDLLockFunc,
}
cmd.Flags().StringP("owner", "o", "", "DM-worker to replace the default owner")
cmd.Flags().StringP("owner", "o", "", "source to replace the default owner")
cmd.Flags().BoolP("force-remove", "f", false, "force to remove DDL lock")
return cmd
}
Expand All @@ -52,9 +52,9 @@ func unlockDDLLockFunc(cmd *cobra.Command, _ []string) {

lockID := cmd.Flags().Arg(0)

sources, err := common.GetSourceArgs(cmd)
if err != nil {
fmt.Println(errors.ErrorStack(err))
sources, _ := common.GetSourceArgs(cmd)
if len(sources) > 0 {
fmt.Println("shoud not specify any sources")
return
}

Expand All @@ -70,11 +70,10 @@ func unlockDDLLockFunc(cmd *cobra.Command, _ []string) {
resp, err := cli.UnlockDDLLock(ctx, &pb.UnlockDDLLockRequest{
ID: lockID,
ReplaceOwner: owner,
Sources: sources,
ForceRemove: forceRemove,
})
if err != nil {
common.PrintLines("can not unlock DDL lock %s (in sources %v):\n%s", lockID, sources, errors.ErrorStack(err))
common.PrintLines("can not unlock DDL lock %s \n%s", lockID, err.Error())
return
}

Expand Down
6 changes: 6 additions & 0 deletions dm/master/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (s *Server) electionNotify(ctx context.Context) {
// retire from leader
if leaderInfo == nil {
if s.leader == oneselfLeader {
s.pessimist.Close()
s.scheduler.Close()

s.Lock()
Expand All @@ -61,6 +62,11 @@ func (s *Server) electionNotify(ctx context.Context) {
log.L().Error("scheduler do not started", zap.Error(err))
}

err = s.pessimist.Start(ctx, s.etcdClient)
if err != nil {
log.L().Error("pessimist do not started", zap.Error(err))
}

s.Lock()
s.leader = oneselfLeader
s.closeLeaderClient()
Expand Down
41 changes: 20 additions & 21 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,6 @@ func (s *Server) Start(ctx context.Context) (err error) {
return
}

// start the shard DDL pessimist.
err = s.pessimist.Start(ctx, s.etcdClient)
if err != nil {
return
}

s.closed.Set(false) // the server started now.

s.bgFunWg.Add(1)
Expand Down Expand Up @@ -226,8 +220,6 @@ func (s *Server) Close() {
s.Lock()
defer s.Unlock()

s.pessimist.Close()

if s.election != nil {
s.election.Close()
}
Expand Down Expand Up @@ -682,6 +674,8 @@ func (s *Server) ShowDDLLocks(ctx context.Context, req *pb.ShowDDLLocksRequest)
l.Unsynced = append(l.Unsynced, worker)
}
}
sort.Strings(l.Synced)
sort.Strings(l.Unsynced)
resp.Locks = append(resp.Locks, l)
}

Expand All @@ -695,20 +689,25 @@ func (s *Server) ShowDDLLocks(ctx context.Context, req *pb.ShowDDLLocksRequest)
// TODO(csuzhangxc): implement this later.
func (s *Server) UnlockDDLLock(ctx context.Context, req *pb.UnlockDDLLockRequest) (*pb.UnlockDDLLockResponse, error) {
log.L().Info("", zap.String("lock ID", req.ID), zap.Stringer("payload", req), zap.String("request", "UnlockDDLLock"))
return &pb.UnlockDDLLockResponse{
Result: false,
Msg: "not implement",
}, nil
}

// BreakWorkerDDLLock implements MasterServer.BreakWorkerDDLLock
// TODO(csuzhangxc): implement this later.
func (s *Server) BreakWorkerDDLLock(ctx context.Context, req *pb.BreakWorkerDDLLockRequest) (*pb.BreakWorkerDDLLockResponse, error) {
log.L().Info("", zap.String("lock ID", req.RemoveLockID), zap.Stringer("payload", req), zap.String("request", "BreakWorkerDDLLock"))
return &pb.BreakWorkerDDLLockResponse{
Result: false,
Msg: "not implement",
}, nil
isLeader, needForward := s.isLeaderAndNeedForward()
if !isLeader {
if needForward {
return s.leaderClient.UnlockDDLLock(ctx, req)
}
return nil, terror.ErrMasterRequestIsNotForwardToLeader
}

resp := &pb.UnlockDDLLockResponse{
Result: true,
}
err := s.pessimist.UnlockLock(ctx, req.ID, req.ReplaceOwner, req.ForceRemove)
if err != nil {
resp.Result = false
resp.Msg = terror.Message(err)
}

return resp, nil
}

// HandleSQLs implements MasterServer.HandleSQLs
Expand Down
Loading

0 comments on commit 263b86b

Please sign in to comment.