From 46f79e44e6e1a0e0d88eef68f34bce2f0dc0e80f Mon Sep 17 00:00:00 2001
From: cfzjywxk
Date: Thu, 14 Nov 2019 20:48:32 +0800
Subject: [PATCH] support distributed deadlock detection

---
 tikv/deadlock.go           | 326 +++++++++++++++++++++++++++++++++++++
 tikv/mvcc.go               |  48 ++++--
 tikv/raftstore/fsm_peer.go |   2 +
 tikv/raftstore/peer.go     |   1 +
 tikv/region.go             |  26 ++-
 tikv/server.go             |  49 +++++-
 unistore-server/main.go    |   8 +-
 7 files changed, 445 insertions(+), 15 deletions(-)
 create mode 100644 tikv/deadlock.go

diff --git a/tikv/deadlock.go b/tikv/deadlock.go
new file mode 100644
index 00000000..289069f9
--- /dev/null
+++ b/tikv/deadlock.go
@@ -0,0 +1,326 @@
+package tikv
+
+import (
+	"context"
+	"errors"
+	"github.com/ngaut/log"
+	"github.com/ngaut/unistore/pd"
+	deadlockPb "github.com/pingcap/kvproto/pkg/deadlock"
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/tidb/util/deadlock"
+	"google.golang.org/grpc"
+	"math"
+	"sync"
+	"sync/atomic"
+)
+
+// Detector roles. A Follower forwards detection RPCs to the Leader.
+const (
+	Follower = iota
+	Leader
+)
+
+type TaskType int32
+
+const (
+	DetectTask         TaskType = 0
+	CleanUpWaitForTask TaskType = 1
+	CleanupTask        TaskType = 2
+	RoleChange         TaskType = 3
+)
+
+type Task struct {
+	taskType TaskType
+	txnTs    uint64
+	lockTs   uint64
+	keyHash  uint64
+	nextRole int32
+	result   chan error
+	isRpc    bool
+}
+
+type detectionConn struct {
+	streamMu sync.Mutex
+	stream   deadlockPb.Deadlock_DetectClient
+	ctx      context.Context
+	cancel   context.CancelFunc
+}
+
+type detectionClient struct {
+	sync.RWMutex
+	conns map[string]*detectionConn
+}
+
+type DeadlockDetector struct {
+	detector        *deadlock.Detector
+	role            int32
+	client          *detectionClient
+	pdClient        pd.Client
+	storeMeta       *metapb.Store
+	leaderStoreMeta *metapb.Store
+	mu              sync.RWMutex
+}
+
+func NewDetectionConn(addr string) (*detectionConn, error) {
+	cc, err := grpc.Dial(addr, grpc.WithInsecure())
+	if err != nil {
+		return nil, err
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	stream, err := deadlockPb.NewDeadlockClient(cc).Detect(ctx)
+	if err != nil {
+		cancel()
+		return nil, err
+	}
+	dtConn := &detectionConn{
+		stream: stream,
+		ctx:    ctx,
+		cancel: cancel,
+	}
+	return dtConn, nil
+}
+
+func (c *detectionConn) Stop() {
+	c.cancel()
+}
+
+func (c *detectionConn) Send(req *deadlockPb.DeadlockRequest) error {
+	c.streamMu.Lock()
+	defer c.streamMu.Unlock()
+	return c.stream.Send(req)
+}
+
+func (c *detectionClient) getConn(addr string) (*detectionConn, error) {
+	c.RLock()
+	conn, ok := c.conns[addr]
+	if ok {
+		c.RUnlock()
+		return conn, nil
+	}
+	c.RUnlock()
+	newConn, err := NewDetectionConn(addr)
+	if err != nil {
+		return nil, err
+	}
+	c.Lock()
+	defer c.Unlock()
+	if conn, ok := c.conns[addr]; ok {
+		newConn.Stop()
+		return conn, nil
+	}
+	c.conns[addr] = newConn
+	return newConn, nil
+}
+
+func (c *detectionClient) Send(addr string, req *deadlockPb.DeadlockRequest) error {
+	conn, err := c.getConn(addr)
+	if err != nil {
+		return err
+	}
+	err = conn.Send(req)
+	if err == nil {
+		return nil
+	}
+	log.Errorf("detectionClient failed to send to addr=%s, err=%v", addr, err)
+	// drop the broken connection so the leader addr will be refreshed
+	c.Lock()
+	defer c.Unlock()
+	conn.Stop()
+	delete(c.conns, addr)
+	return err
+}
+
+func (c *detectionClient) Recv(addr string) (*deadlockPb.DeadlockResponse, error) {
+	conn, err := c.getConn(addr)
+	if err != nil {
+		return nil, err
+	}
+	return conn.stream.Recv()
+}
+
+func NewDetectionClient() *detectionClient {
+	return &detectionClient{conns: make(map[string]*detectionConn)}
+}
+
+func NewDeadlockDetector() *DeadlockDetector {
+	newDetector := &DeadlockDetector{
+		detector: deadlock.NewDetector(),
+		role:     Follower,
+		client:   NewDetectionClient(),
+	}
+	return newDetector
+}
+
+func (dt *DeadlockDetector) setLeaderStoreMeta(newMeta *metapb.Store) {
+	dt.mu.Lock()
+	defer dt.mu.Unlock()
+	dt.leaderStoreMeta = newMeta
+}
+
+func (dt *DeadlockDetector) getLeaderStoreMeta() *metapb.Store {
+	dt.mu.RLock()
+	defer dt.mu.RUnlock()
+	return dt.leaderStoreMeta
+}
+
+func (dt *DeadlockDetector) refreshFirstRegionLeader() error {
+	// ask pd for the first region, then resolve the store of its leader peer
+	ctx := context.Background()
+	region, err := dt.pdClient.GetRegion(ctx, []byte{})
+	if err != nil {
+		log.Errorf("get first region failed, err: %v", err)
+		return err
+	}
+	var peerLeader *metapb.Peer
+	for _, peer := range region.GetPeers() {
+		if !peer.GetIsLearner() {
+			peerLeader = peer
+			break
+		}
+	}
+	if peerLeader == nil {
+		return errors.New("get first region leader failed")
+	}
+	log.Infof("deadlock detection leader StoreId=%v", peerLeader.GetStoreId())
+	leaderStoreMeta, err := dt.pdClient.GetStore(ctx, peerLeader.GetStoreId())
+	if err != nil {
+		return err
+	}
+	dt.setLeaderStoreMeta(leaderStoreMeta)
+	log.Infof("deadlock detection leader addr=%s", leaderStoreMeta.GetAddress())
+	if leaderStoreMeta.GetId() == dt.storeMeta.GetId() {
+		log.Infof("refreshFirstRegionLeader found the local store is the leader")
+		dt.RoleChange(Leader)
+	}
+	return nil
+}
+
+func (dt *DeadlockDetector) CleanUp(startTs uint64, rpc bool) {
+	task := &Task{taskType: CleanupTask, txnTs: startTs, result: make(chan error, 1), isRpc: rpc}
+	_ = dt.handleOneTask(task)
+}
+
+func (dt *DeadlockDetector) Detect(txnTs uint64, lockTs uint64, keyHash uint64, rpc bool) error {
+	task := &Task{taskType: DetectTask, txnTs: txnTs, lockTs: lockTs,
+		keyHash: keyHash, result: make(chan error, 1), isRpc: rpc}
+	err := dt.handleOneTask(task)
+	if err != nil {
+		// convert the detector error into the local ErrDeadlock type
+		if _, ok := err.(*deadlock.ErrDeadlock); ok {
+			log.Errorf("deadlock found, returning ErrDeadlock")
+			return &ErrDeadlock{DeadlockKeyHash: keyHash}
+		}
+		return err
+	}
+	return nil
+}
+
+func (dt *DeadlockDetector) CleanUpWaitFor(txn, waitForTxn, keyHash uint64, rpc bool) {
+	task := &Task{taskType: CleanUpWaitForTask, txnTs: txn, lockTs: waitForTxn,
+		keyHash: keyHash, result: make(chan error, 1), isRpc: rpc}
+	_ = dt.handleOneTask(task)
+}
+
+func (dt *DeadlockDetector) RoleChange(newRole int32) {
+	task := &Task{taskType: RoleChange, nextRole: newRole}
+	_ = dt.handleOneTask(task)
+}
+
+func (dt *DeadlockDetector) handleLocalTask(task *Task) error {
+	switch task.taskType {
+	case DetectTask:
+		err := dt.detector.Detect(task.txnTs, task.lockTs, task.keyHash)
+		if err != nil {
+			return err
+		}
+	case CleanUpWaitForTask:
+		dt.detector.CleanUpWaitFor(task.txnTs, task.lockTs, task.keyHash)
+	case CleanupTask:
+		dt.detector.CleanUp(task.txnTs)
+	case RoleChange:
+		dt.ChangeRole(task.nextRole)
+	}
+	return nil
+}
+
+func (dt *DeadlockDetector) sendReqToLeader(req *deadlockPb.DeadlockRequest) error {
+	for {
+		if dt.isLeader() {
+			// the role changed to leader while retrying, handle the task locally
+			localTask := Task{taskType: TaskType(req.Tp), txnTs: req.Entry.Txn,
+				lockTs: req.Entry.WaitForTxn, keyHash: req.Entry.KeyHash}
+			return dt.handleLocalTask(&localTask)
+		}
+		addr := dt.getLeaderStoreMeta().GetAddress()
+		err := dt.client.Send(addr, req)
+		if err != nil {
+			log.Errorf("send request failed to addr=%s", addr)
+			refreshErr := dt.refreshFirstRegionLeader()
+			if refreshErr != nil {
+				log.Errorf("cannot get leader from pd")
+				return refreshErr
+			}
+			continue
+		}
+		resp, err := dt.client.Recv(addr)
+		if err != nil {
+			log.Errorf("receive response failed from addr=%s", addr)
+			refreshErr := dt.refreshFirstRegionLeader()
+			if refreshErr != nil {
+				log.Errorf("cannot get leader from pd")
+				return refreshErr
+			}
+			continue
+		}
+		// MaxUint64 signals a non-deadlock error, the destination is possibly not the leader anymore
+		if resp.DeadlockKeyHash == math.MaxUint64 {
+			log.Errorf("destination is possibly not the leader anymore, need refresh")
+			refreshErr := dt.refreshFirstRegionLeader()
+			if refreshErr != nil {
+				log.Errorf("cannot get leader from pd")
+				return refreshErr
+			}
+			continue
+		}
+		log.Errorf("deadlock found, lock key hash=%v", resp.DeadlockKeyHash)
+		return &deadlock.ErrDeadlock{KeyHash: resp.DeadlockKeyHash}
+	}
+}
+
+func (dt *DeadlockDetector) handleRemoteTask(task *Task) error {
+	switch task.taskType {
+	case DetectTask, CleanUpWaitForTask, CleanupTask:
+		detectReq := &deadlockPb.DeadlockRequest{}
+		detectReq.Tp = deadlockPb.DeadlockRequestType(task.taskType)
+		detectReq.Entry.Txn = task.txnTs
+		detectReq.Entry.WaitForTxn = task.lockTs
+		detectReq.Entry.KeyHash = task.keyHash
+		err := dt.sendReqToLeader(detectReq)
+		if err != nil {
+			return err
+		}
+	case RoleChange:
+		dt.ChangeRole(task.nextRole)
+	}
+	return nil
+}
+
+func (dt *DeadlockDetector) handleOneTask(task *Task) error {
+	if dt.isLeader() {
+		return dt.handleLocalTask(task)
+	}
+	if task.isRpc {
+		return errors.New("not leader node")
+	}
+	return dt.handleRemoteTask(task)
+}
+
+func (dt *DeadlockDetector) isLeader() bool {
+	return atomic.LoadInt32(&dt.role) == Leader
+}
+
+func (dt *DeadlockDetector) ChangeRole(newRole int32) {
+	atomic.StoreInt32(&dt.role, newRole)
+}
diff --git a/tikv/mvcc.go b/tikv/mvcc.go
index a520c391..33825826 100644
--- a/tikv/mvcc.go
+++ b/tikv/mvcc.go
@@ -3,6 +3,7 @@ package tikv
 import (
 	"bufio"
 	"bytes"
+	"context"
 	"encoding/binary"
 	"math"
 	"os"
@@ -16,14 +17,15 @@ import (
 	"github.com/juju/errors"
 	"github.com/ngaut/log"
 	"github.com/ngaut/unistore/lockstore"
+	"github.com/ngaut/unistore/pd"
 	"github.com/ngaut/unistore/rowcodec"
 	"github.com/ngaut/unistore/tikv/dbreader"
 	"github.com/ngaut/unistore/tikv/mvcc"
+	"github.com/ngaut/unistore/tikv/raftstore"
 	"github.com/ngaut/unistore/util/lockwaiter"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/tidb/store/tikv/oracle"
 	"github.com/pingcap/tidb/util/codec"
-	"github.com/pingcap/tidb/util/deadlock"
 )
 
 // MVCCStore is a wrapper of badger.DB to provide MVCC functions.
@@ -38,8 +40,8 @@ type MVCCStore struct {
 	gcLock   sync.Mutex
 	latestTS uint64
 
-	deadlockDetector  *deadlock.Detector
 	lockWaiterManager *lockwaiter.Manager
+	deadlockDetector  *DeadlockDetector
 }
 
 // NewMVCCStore creates a new MVCCStore
@@ -51,8 +53,8 @@ func NewMVCCStore(bundle *mvcc.DBBundle, dataDir string, safePoint *SafePoint, w
 		rollbackStore: bundle.RollbackStore,
 		safePoint:     safePoint,
 		dbWriter:      writer,
-		deadlockDetector:  deadlock.NewDetector(),
 		lockWaiterManager: lockwaiter.NewManager(),
+		deadlockDetector:  NewDeadlockDetector(),
 	}
 	store.loadSafePoint()
 	writer.Open()
@@ -216,7 +218,7 @@ func (store *MVCCStore) PessimisticRollback(reqCtx *requestCtx, req *kvrpcpb.Pes
 		err = store.dbWriter.Write(batch)
 	}
 	store.lockWaiterManager.WakeUp(startTS, 0, hashVals)
-	store.deadlockDetector.CleanUp(startTS)
+	store.deadlockDetector.CleanUp(startTS, false)
 	return err
 }
 
@@ -292,12 +294,20 @@ func (store *MVCCStore) handleCheckPessimisticErr(startTS uint64, err error, isF
 	if lock, ok := err.(*ErrLocked); ok {
 		keyHash := farm.Fingerprint64(lock.Key)
 		if !isFirstLock { // No need to detect deadlock if it's first lock.
-			if err1 := store.deadlockDetector.Detect(startTS, lock.StartTS, keyHash); err1 != nil {
-				return nil, &ErrDeadlock{
-					LockKey:         lock.Key,
-					LockTS:          lock.StartTS,
-					DeadlockKeyHash: *(*uint64)(unsafe.Pointer(err1)), // TODO: update here when keyHash is exported.
+			if err1 := store.deadlockDetector.Detect(startTS, lock.StartTS, keyHash, false); err1 != nil {
+				deadLockErr, ok := err1.(*ErrDeadlock)
+				if ok {
+					log.Errorf("deadlock found LockKey=%v, LockTS=%v, DeadlockKeyHash=%v",
+						deadLockErr.LockKey, deadLockErr.LockTS, deadLockErr.DeadlockKeyHash)
+					return nil, &ErrDeadlock{
+						LockKey:         lock.Key,
+						LockTS:          lock.StartTS,
+						DeadlockKeyHash: deadLockErr.DeadlockKeyHash,
+					}
 				}
+				return nil, err1
 			}
 		}
 		log.Infof("%d blocked by %d on key %d", startTS, lock.StartTS, keyHash)
@@ -547,7 +557,7 @@ func (store *MVCCStore) Commit(req *requestCtx, keys [][]byte, startTS, commitTS
 	err := store.dbWriter.Write(batch)
 	store.lockWaiterManager.WakeUp(startTS, commitTS, hashVals)
 	if isPessimisticTxn {
-		store.deadlockDetector.CleanUp(startTS)
+		store.deadlockDetector.CleanUp(startTS, false)
 	}
 	return err
 }
@@ -608,7 +618,7 @@ func (store *MVCCStore) Rollback(reqCtx *requestCtx, keys [][]byte, startTS uint
 			return err
 		}
 	}
-	store.deadlockDetector.CleanUp(startTS)
+	store.deadlockDetector.CleanUp(startTS, false)
 	err := store.dbWriter.Write(batch)
 	return errors.Trace(err)
 }
@@ -859,6 +869,22 @@ func (store *MVCCStore) GC(reqCtx *requestCtx, safePoint uint64) error {
 	return nil
 }
 
+func (store *MVCCStore) StartDeadlockDetection(ctx context.Context, pdClient pd.Client,
+	innerSrv InnerServer, isRaft bool) error {
+	if !isRaft {
+		store.deadlockDetector.ChangeRole(Leader)
+	} else {
+		store.deadlockDetector.pdClient = pdClient
+		store.deadlockDetector.storeMeta = innerSrv.(*raftstore.RaftInnerServer).GetStoreMeta()
+		err := store.deadlockDetector.refreshFirstRegionLeader()
+		if err != nil {
+			return err
+		}
+	}
+	log.Infof("StartDeadlockDetection finished, started as role=%v", store.deadlockDetector.role)
+	return nil
+}
+
 type SafePoint struct {
 	timestamp uint64
 }
diff --git a/tikv/raftstore/fsm_peer.go b/tikv/raftstore/fsm_peer.go
index 470e3ca8..8f7729bf 100644
--- a/tikv/raftstore/fsm_peer.go
+++ b/tikv/raftstore/fsm_peer.go
@@ -41,6 +41,8 @@ type PeerEventObserver interface {
 	OnSplitRegion(derived *metapb.Region, regions []*metapb.Region, peers []*PeerEventContext)
 	// OnRegionConfChange will be invoked after conf change updated region's epoch.
 	OnRegionConfChange(ctx *PeerEventContext, epoch *metapb.RegionEpoch)
+	// OnRoleChange will be invoked after the peer's raft state has changed.
+	OnRoleChange(ctx *PeerEventContext, newState raft.StateType)
 }
 
 // If we create the peer actively, like bootstrap/split/merge region, we should
diff --git a/tikv/raftstore/peer.go b/tikv/raftstore/peer.go
index e82ccd70..6c28a196 100644
--- a/tikv/raftstore/peer.go
+++ b/tikv/raftstore/peer.go
@@ -719,6 +719,7 @@ func (p *Peer) OnRoleChanged(ctx *PollContext, ready *raft.Ready) {
 		} else if ss.RaftState == raft.StateFollower {
 			p.leaderLease.Expire()
 		}
+		ctx.peerEventObserver.OnRoleChange(p.getEventContext(), ss.RaftState)
 	}
 }
 
diff --git a/tikv/region.go b/tikv/region.go
index 0b5777e4..271c15ad 100644
--- a/tikv/region.go
+++ b/tikv/region.go
@@ -20,6 +20,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/tidb/util/codec"
+	"github.com/zhangjinpeng1987/raft"
 	"golang.org/x/net/context"
 )
 
@@ -258,15 +259,17 @@ func (rm *regionManager) isEpochStale(lhs, rhs *metapb.RegionEpoch) bool {
 
 type RaftRegionManager struct {
 	regionManager
 	router *raftstore.RaftstoreRouter
+	mvccStore *MVCCStore
 }
 
-func NewRaftRegionManager(store *metapb.Store, router *raftstore.RaftstoreRouter) *RaftRegionManager {
+func NewRaftRegionManager(store *metapb.Store, router *raftstore.RaftstoreRouter, mvccStore *MVCCStore) *RaftRegionManager {
 	return &RaftRegionManager{
-		router: router,
 		regionManager: regionManager{
 			storeMeta: store,
 			regions:   make(map[uint64]*regionCtx),
 		},
+		router:    router,
+		mvccStore: mvccStore,
 	}
 }
 
@@ -313,6 +316,25 @@ func (rm *RaftRegionManager) OnRegionConfChange(ctx *raftstore.PeerEventContext,
 	region.updateRegionEpoch(epoch)
 }
 
+func (rm *RaftRegionManager) OnRoleChange(ctx *raftstore.PeerEventContext, newState raft.StateType) {
+	rm.mu.RLock()
+	region, ok := rm.regions[ctx.RegionId]
+	rm.mu.RUnlock()
+	if ok {
+		// only the role of the first region's peer drives the deadlock detector role
+		if bytes.Equal(region.startKey, []byte{}) {
+			newRole := Follower
+			if newState == raft.StateLeader {
+				newRole = Leader
+			}
+			log.Infof("first region role changed, newRole=%v", newRole)
+			rm.mvccStore.deadlockDetector.RoleChange(int32(newRole))
+		}
+	} else {
+		// should not happen
+		log.Warnf("region id=%v not found on role change", ctx.RegionId)
+	}
+}
+
 func (rm *RaftRegionManager) GetRegionFromCtx(ctx *kvrpcpb.Context) (*regionCtx, *errorpb.Error) {
 	regionCtx, err := rm.regionManager.GetRegionFromCtx(ctx)
 	if err != nil {
diff --git a/tikv/server.go b/tikv/server.go
index a415eff3..dfb3f158 100644
--- a/tikv/server.go
+++ b/tikv/server.go
@@ -2,6 +2,8 @@ package tikv
 
 import (
 	"fmt"
+	"io"
+	"math"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -13,6 +15,7 @@ package tikv
 	"github.com/ngaut/unistore/tikv/raftstore"
 	"github.com/ngaut/unistore/util/lockwaiter"
 	"github.com/pingcap/kvproto/pkg/coprocessor"
+	"github.com/pingcap/kvproto/pkg/deadlock"
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/tikvpb"
@@ -206,7 +209,7 @@ func (svr *Server) KvPessimisticLock(ctx context.Context, req *kvrpcpb.Pessimist
 		return resp, nil
 	}
 	result := waiter.Wait()
-	svr.mvccStore.deadlockDetector.CleanUpWaitFor(req.StartVersion, waiter.LockTS, waiter.KeyHash)
+	svr.mvccStore.deadlockDetector.CleanUpWaitFor(req.StartVersion, waiter.LockTS, waiter.KeyHash, false)
 	if result.Position == lockwaiter.WaitTimeout {
 		svr.mvccStore.lockWaiterManager.CleanUp(waiter)
 		return resp, nil
@@ -563,6 +566,50 @@ func (svr *Server) UnsafeDestroyRange(context.Context, *kvrpcpb.UnsafeDestroyRan
 	return &kvrpcpb.UnsafeDestroyRangeResponse{}, nil
 }
 
+// Deadlock detection related services.
+
+// GetWaitForEntries tries to get the waitFor entries.
+func (svr *Server) GetWaitForEntries(ctx context.Context,
+	req *deadlock.WaitForEntriesRequest) (*deadlock.WaitForEntriesResponse, error) {
+	// TODO
+	return &deadlock.WaitForEntriesResponse{}, nil
+}
+
+// Detect handles detection RPCs from other nodes.
+func (svr *Server) Detect(stream deadlock.Deadlock_DetectServer) error {
+	for {
+		req, err := stream.Recv()
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return err
+		}
+		resp := &deadlock.DeadlockResponse{}
+		switch req.Tp {
+		case deadlock.DeadlockRequestType_Detect:
+			detectErr := svr.mvccStore.deadlockDetector.Detect(req.Entry.Txn, req.Entry.WaitForTxn, req.Entry.KeyHash, true)
+			if detectErr != nil {
+				if _, ok := detectErr.(*ErrDeadlock); ok {
+					resp.Entry = req.Entry
+					resp.DeadlockKeyHash = req.Entry.KeyHash
+				} else {
+					// detection failed, this node is possibly not the leader anymore
+					resp.DeadlockKeyHash = math.MaxUint64
+				}
+			}
+		case deadlock.DeadlockRequestType_CleanUpWaitFor:
+			svr.mvccStore.deadlockDetector.CleanUpWaitFor(req.Entry.Txn, req.Entry.WaitForTxn, req.Entry.KeyHash, true)
+		case deadlock.DeadlockRequestType_CleanUp:
+			svr.mvccStore.deadlockDetector.CleanUp(req.Entry.Txn, true)
+		}
+		err = stream.Send(resp)
+		if err != nil {
+			log.Errorf("send detection response error=%v", err)
+		}
+	}
+	return nil
+}
+
 func convertToKeyError(err error) *kvrpcpb.KeyError {
 	if err == nil {
 		return nil
diff --git a/unistore-server/main.go b/unistore-server/main.go
index b17f2d2c..e6efd0cd 100644
--- a/unistore-server/main.go
+++ b/unistore-server/main.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"encoding/binary"
 	"flag"
 	"net"
@@ -50,6 +51,7 @@ func main() {
 	log.SetLevelByString(conf.LogLevel)
 	log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds | log.Lshortfile)
 	safePoint := &tikv.SafePoint{}
+	log.Infof("conf %v", conf)
 	db := createDB(subPathKV, safePoint, &conf.Engine)
 	bundle := &mvcc.DBBundle{
 		DB: db,
@@ -72,6 +74,10 @@ func main() {
 	} else {
 		innerServer, store, regionManager = setupStandAlongInnerServer(bundle, safePoint, pdClient, conf)
 	}
+	err = store.StartDeadlockDetection(context.Background(), pdClient, innerServer, conf.Raft)
+	if err != nil {
+		log.Fatalf("StartDeadlockDetection error=%v", err)
+	}
 
 	tikvServer := tikv.NewServer(regionManager, store, innerServer)
 
@@ -166,7 +172,7 @@ func setupRaftInnerServer(bundle *mvcc.DBBundle, safePoint *tikv.SafePoint, pdCl
 	router := innerServer.GetRaftstoreRouter()
 	storeMeta := innerServer.GetStoreMeta()
 	store := tikv.NewMVCCStore(bundle, dbPath, safePoint, raftstore.NewDBWriter(router))
-	rm := tikv.NewRaftRegionManager(storeMeta, router)
+	rm := tikv.NewRaftRegionManager(storeMeta, router, store)
 	innerServer.SetPeerEventObserver(rm)
 
 	if err := innerServer.Start(pdClient); err != nil {
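
Context (not part of the patch): the leader-side detection path above delegates to deadlock.Detector from github.com/pingcap/tidb/util/deadlock, which tracks wait-for edges between transactions and reports an error when a new edge would close a cycle. The sketch below is a minimal, self-contained illustration of that idea only; the type and function names are invented for the example and do not mirror the actual tidb/util/deadlock implementation.

package main

import "fmt"

// waitForGraph maps a waiting txn's startTS to the set of txns it is blocked by.
type waitForGraph map[uint64]map[uint64]struct{}

func (g waitForGraph) addEdge(waiter, holder uint64) {
	if g[waiter] == nil {
		g[waiter] = make(map[uint64]struct{})
	}
	g[waiter][holder] = struct{}{}
}

// wouldDeadlock reports whether adding the edge waiter -> holder closes a cycle,
// i.e. whether holder already (transitively) waits for waiter.
func (g waitForGraph) wouldDeadlock(waiter, holder uint64) bool {
	visited := make(map[uint64]bool)
	var dfs func(cur uint64) bool
	dfs = func(cur uint64) bool {
		if cur == waiter {
			return true
		}
		if visited[cur] {
			return false
		}
		visited[cur] = true
		for next := range g[cur] {
			if dfs(next) {
				return true
			}
		}
		return false
	}
	return dfs(holder)
}

func main() {
	g := waitForGraph{}
	// txn 10 starts waiting for txn 20: no cycle yet, record the edge.
	fmt.Println("deadlock:", g.wouldDeadlock(10, 20)) // false
	g.addEdge(10, 20)
	// txn 20 now tries to wait for txn 10: this would close a cycle.
	fmt.Println("deadlock:", g.wouldDeadlock(20, 10)) // true
}

In the patch, followers never run this check themselves: Detect/CleanUpWaitFor/CleanUp calls are forwarded over the Deadlock gRPC stream to the store hosting the first region's leader, which owns the single authoritative wait-for graph.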