support distributed dead lock detection
cfzjywxk committed Nov 14, 2019
1 parent 467a86f commit 46f79e4
Showing 7 changed files with 445 additions and 15 deletions.
326 changes: 326 additions & 0 deletions tikv/deadlock.go
@@ -0,0 +1,326 @@
package tikv

import (
	"context"
	"errors"
	"math"
	"sync"
	"sync/atomic"

	"github.com/ngaut/log"
	"github.com/ngaut/unistore/pd"
	deadlockPb "github.com/pingcap/kvproto/pkg/deadlock"
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/tidb/util/deadlock"
	"google.golang.org/grpc"
)

// Roles of a deadlock detector node: a Follower forwards detection RPCs to
// the Leader, which runs the actual detection.
const (
	Follower = iota
	Leader
)

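// TaskType identifies the kind of work submitted to the deadlock detector.
// The numeric values line up with deadlockPb.DeadlockRequestType, so remote
// tasks can be converted with a plain cast (see sendReqToLeader and
// handleRemoteTask below).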
type TaskType int32

const (
	DetectTask         TaskType = 0
	CleanUpWaitForTask TaskType = 1
	CleanupTask        TaskType = 2
	RoleChange         TaskType = 3
)

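// Task is a single detection request. isRpc marks tasks that arrived from a
// remote follower; such tasks must be handled on the leader and are rejected
// otherwise (see handleOneTask).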
type Task struct {
	taskType TaskType
	txnTs    uint64
	lockTs   uint64
	keyHash  uint64
	nextRole int32
	result   chan error
	isRpc    bool
}

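// detectionConn wraps one streaming Detect RPC to a remote store; streamMu
// serializes concurrent Send calls on the stream.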
type detectionConn struct {
	streamMu sync.Mutex
	stream   deadlockPb.Deadlock_DetectClient
	ctx      context.Context
	cancel   context.CancelFunc
}

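// detectionClient caches detection streams by store address, creating them
// lazily on first use.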
type detectionClient struct {
	sync.RWMutex
	conns map[string]*detectionConn
}

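// DeadlockDetector either runs deadlock detection locally (when this store
// holds the Leader role) or forwards requests to the leader store discovered
// through PD.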
type DeadlockDetector struct {
	detector        *deadlock.Detector
	role            int32
	client          *detectionClient
	pdClient        pd.Client
	storeMeta       *metapb.Store
	leaderStoreMeta *metapb.Store
	mu              sync.RWMutex
}

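// NewDetectionConn dials addr with an insecure gRPC connection and opens a
// Detect stream on it.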
func NewDetectionConn(addr string) (*detectionConn, error) {
	cc, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return nil, err
	}
	ctx, cancel := context.WithCancel(context.Background())
	stream, err := deadlockPb.NewDeadlockClient(cc).Detect(ctx)
	if err != nil {
		cancel()
		return nil, err
	}
	dtConn := &detectionConn{
		stream: stream,
		ctx:    ctx,
		cancel: cancel,
	}
	return dtConn, nil
}

func (c *detectionConn) Stop() {
	c.cancel()
}

func (c *detectionConn) Send(req *deadlockPb.DeadlockRequest) error {
	c.streamMu.Lock()
	defer c.streamMu.Unlock()
	return c.stream.Send(req)
}

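// getConn returns the cached connection for addr, creating one if missing.
// It re-checks the cache under the write lock so a connection raced in by
// another goroutine wins and the redundant one is stopped.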
func (c *detectionClient) getConn(addr string) (*detectionConn, error) {
	c.RLock()
	conn, ok := c.conns[addr]
	if ok {
		c.RUnlock()
		return conn, nil
	}
	c.RUnlock()
	newConn, err := NewDetectionConn(addr)
	if err != nil {
		return nil, err
	}
	c.Lock()
	defer c.Unlock()
	if conn, ok := c.conns[addr]; ok {
		newConn.Stop()
		return conn, nil
	}
	c.conns[addr] = newConn
	return newConn, nil
}

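// Send pushes a request down the stream for addr. On failure the broken
// connection is dropped from the cache so the next call redials.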
func (c *detectionClient) Send(addr string, req *deadlockPb.DeadlockRequest) error {
	conn, err := c.getConn(addr)
	if err != nil {
		return err
	}
	err = conn.Send(req)
	if err == nil {
		return nil
	}
	log.Errorf("detectionClient failed to send to %s", addr)
	// drop the broken connection so the next call redials
	c.Lock()
	defer c.Unlock()
	conn.Stop()
	delete(c.conns, addr)
	return err
}

func (c *detectionClient) Recv(addr string) (*deadlockPb.DeadlockResponse, error) {
	conn, err := c.getConn(addr)
	if err != nil {
		return nil, err
	}
	return conn.stream.Recv()
}

func NewDetectionClient() *detectionClient {
	return &detectionClient{conns: make(map[string]*detectionConn)}
}

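// NewDeadlockDetector creates a detector that starts in the Follower role.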
func NewDeadlockDetector() *DeadlockDetector {
	newDetector := &DeadlockDetector{
		detector: deadlock.NewDetector(),
		role:     Follower,
		client:   NewDetectionClient(),
	}
	return newDetector
}

func (dt *DeadlockDetector) setLeaderStoreMeta(newMeta *metapb.Store) {
	dt.mu.Lock()
	defer dt.mu.Unlock()
	dt.leaderStoreMeta = newMeta
}

func (dt *DeadlockDetector) getLeaderStoreMeta() *metapb.Store {
	dt.mu.RLock()
	defer dt.mu.RUnlock()
	return dt.leaderStoreMeta
}

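// refreshFirstRegionLeader asks PD for the first region, treats the store of
// its first voter (non-learner) peer as the detection leader, and promotes
// the local node if that store is this one.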
func (dt *DeadlockDetector) refreshFirstRegionLeader() error {
	// fetch the first region from pd and use its first voter peer's store
	// as the detection leader
	ctx := context.Background()
	region, err := dt.pdClient.GetRegion(ctx, []byte{})
	if err != nil {
		log.Errorf("get first region failed, err: %v", err)
		return err
	}
	var peerLeader *metapb.Peer
	for _, peer := range region.GetPeers() {
		if !peer.GetIsLearner() {
			peerLeader = peer
			break
		}
	}
	if peerLeader == nil {
		return errors.New("get first region leader failed")
	}
	log.Infof("deadlock detector leader StoreId=%v", peerLeader.GetStoreId())
	leaderStoreMeta, err := dt.pdClient.GetStore(ctx, peerLeader.GetStoreId())
	if err != nil {
		return err
	}
	dt.setLeaderStoreMeta(leaderStoreMeta)
	log.Infof("deadlock detector leader addr=%s", leaderStoreMeta.GetAddress())
	if leaderStoreMeta.GetId() == dt.storeMeta.GetId() {
		log.Infof("refreshFirstRegionLeader: local store is the leader")
		dt.RoleChange(Leader)
	}
	return nil
}

func (dt *DeadlockDetector) CleanUp(startTs uint64, rpc bool) {
	task := &Task{taskType: CleanupTask, txnTs: startTs, result: make(chan error, 1), isRpc: rpc}
	_ = dt.handleOneTask(task)
}

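// Detect submits a detect task and converts a detector-level deadlock error
// into this package's ErrDeadlock carrying the conflicting key hash.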
func (dt *DeadlockDetector) Detect(txnTs uint64, lockTs uint64, keyHash uint64, rpc bool) error {
	task := &Task{taskType: DetectTask, txnTs: txnTs, lockTs: lockTs,
		keyHash: keyHash, result: make(chan error, 1), isRpc: rpc}
	err := dt.handleOneTask(task)
	if err != nil {
		// convert the detector's error into this package's ErrDeadlock
		if _, ok := err.(*deadlock.ErrDeadlock); ok {
			log.Errorf("deadlock found, returning ErrDeadlock")
			return &ErrDeadlock{DeadlockKeyHash: keyHash}
		}
		return err
	}
	return nil
}

func (dt *DeadlockDetector) CleanUpWaitFor(txn, waitForTxn, keyHash uint64, rpc bool) {
	task := &Task{taskType: CleanUpWaitForTask, txnTs: txn, lockTs: waitForTxn,
		keyHash: keyHash, result: make(chan error, 1), isRpc: rpc}
	_ = dt.handleOneTask(task)
}

func (dt *DeadlockDetector) RoleChange(newRole int32) {
	task := &Task{taskType: RoleChange, nextRole: newRole}
	_ = dt.handleOneTask(task)
}

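// handleLocalTask runs a task against the embedded local detector; only the
// leader should reach this path for detection tasks.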
func (dt *DeadlockDetector) handleLocalTask(task *Task) error {
	switch task.taskType {
	case DetectTask:
		err := dt.detector.Detect(task.txnTs, task.lockTs, task.keyHash)
		if err != nil {
			return err
		}
	case CleanUpWaitForTask:
		dt.detector.CleanUpWaitFor(task.txnTs, task.lockTs, task.keyHash)
	case CleanupTask:
		dt.detector.CleanUp(task.txnTs)
	case RoleChange:
		dt.ChangeRole(task.nextRole)
	}
	return nil
}

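// sendReqToLeader forwards a request to the leader store, refreshing the
// leader from PD and retrying whenever the send, the receive, or the response
// indicates the destination is no longer the leader. If the local node has
// become the leader in the meantime, the task is handled locally instead.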
func (dt *DeadlockDetector) sendReqToLeader(req *deadlockPb.DeadlockRequest) error {
	for {
		if !dt.isLeader() {
			addr := dt.getLeaderStoreMeta().GetAddress()
			err := dt.client.Send(addr, req)
			if err != nil {
				log.Errorf("send request failed to addr=%s", addr)
				refreshErr := dt.refreshFirstRegionLeader()
				if refreshErr != nil {
					log.Errorf("cannot get leader from pd")
					return refreshErr
				}
				continue
			}
			resp, err := dt.client.Recv(addr)
			if err != nil {
				log.Errorf("receive response failed from addr=%s", addr)
				refreshErr := dt.refreshFirstRegionLeader()
				if refreshErr != nil {
					log.Errorf("cannot get leader from pd")
					return refreshErr
				}
				continue
			}
			// MaxUint64 signals a non-deadlock error: the destination store is
			// probably no longer the leader, so refresh and retry
			if resp.DeadlockKeyHash == math.MaxUint64 {
				log.Errorf("dest possibly not leader anymore, refreshing")
				refreshErr := dt.refreshFirstRegionLeader()
				if refreshErr != nil {
					log.Errorf("cannot get leader from pd")
					return refreshErr
				}
				continue
			}
			log.Errorf("deadlock found, lock key hash=%v", resp.DeadlockKeyHash)
			return &deadlock.ErrDeadlock{KeyHash: resp.DeadlockKeyHash}
		}
		// the local node became leader while retrying; handle the task locally
		localTask := Task{taskType: TaskType(req.Tp), txnTs: req.Entry.Txn,
			lockTs: req.Entry.WaitForTxn, keyHash: req.Entry.KeyHash}
		return dt.handleLocalTask(&localTask)
	}
}

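// handleRemoteTask serializes a task into a DeadlockRequest and ships it to
// the leader; role changes are still applied locally.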
func (dt *DeadlockDetector) handleRemoteTask(task *Task) error {
	switch task.taskType {
	case DetectTask, CleanUpWaitForTask, CleanupTask:
		detectReq := &deadlockPb.DeadlockRequest{}
		detectReq.Tp = deadlockPb.DeadlockRequestType(task.taskType)
		detectReq.Entry.Txn = task.txnTs
		detectReq.Entry.WaitForTxn = task.lockTs
		detectReq.Entry.KeyHash = task.keyHash
		err := dt.sendReqToLeader(detectReq)
		if err != nil {
			return err
		}
	case RoleChange:
		dt.ChangeRole(task.nextRole)
	}
	return nil
}

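// handleOneTask dispatches a task: leaders handle it locally, followers
// forward it, and tasks that arrived over RPC on a non-leader are rejected.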
func (dt *DeadlockDetector) handleOneTask(task *Task) error {
	if dt.isLeader() {
		return dt.handleLocalTask(task)
	}
	if task.isRpc {
		return errors.New("not leader node")
	}
	return dt.handleRemoteTask(task)
}

func (dt *DeadlockDetector) isLeader() bool {
	return atomic.LoadInt32(&dt.role) == Leader
}

func (dt *DeadlockDetector) ChangeRole(newRole int32) {
	atomic.StoreInt32(&dt.role, newRole)
}
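
For orientation, a minimal sketch of how a lock-wait path might drive this API (hypothetical wiring, not part of this commit; the detector's pdClient/storeMeta setup and the keyHash value are assumed to come from the surrounding store code):

	detector := NewDeadlockDetector()
	// the raft layer promotes this node once it leads the first region
	detector.RoleChange(Leader)
	// txn 100 begins waiting for a lock held by txn 99 on some key
	if err := detector.Detect(100, 99, keyHash, false); err != nil {
		// on ErrDeadlock a wait cycle was found: abort the waiting transaction
	}
	// the wait finished without a deadlock: remove this wait-for edge...
	detector.CleanUpWaitFor(100, 99, keyHash, false)
	// ...and drop all of txn 100's wait entries when it commits or rolls back
	detector.CleanUp(100, false)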