Skip to content

Commit

Permalink
*: implement the TxnHeartBeat API for the large transaction (#11979)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored and coocood committed Sep 9, 2019
1 parent d2e8bc6 commit 53a7cf6
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 3 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190821201150-798d27658fae
github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190903084634-0daf3f706c76
github.com/pingcap/pd v0.0.0-20190712044914-75a1f9f3062b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190821201150-798d27658fae h1:WR4d5ga8zXT+QDWYFzzyA+PJMMszR0kQxyYMh6dvHPg=
github.com/pingcap/kvproto v0.0.0-20190821201150-798d27658fae/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2 h1:wBORZD4gvEKK0tGP4g1Rv0Y7f2cNnObzI/ckPhsU11M=
github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
Expand Down
23 changes: 23 additions & 0 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ func (s *testMockTiKVSuite) mustRangeReverseScanOK(c *C, start, end string, limi
}

func (s *testMockTiKVSuite) mustPrewriteOK(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64) {
s.mustPrewriteWithTTLOK(c, mutations, primary, startTS, 0)
}

func (s *testMockTiKVSuite) mustPrewriteWithTTLOK(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64, ttl uint64) {
req := &kvrpcpb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: []byte(primary),
Expand Down Expand Up @@ -666,3 +670,22 @@ func (s *testMVCCLevelDB) TestMvccGetByKey(c *C) {
}
c.Assert(mvccInfo, DeepEquals, except)
}

func (s *testMVCCLevelDB) TestTxnHeartBeat(c *C) {
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666)

// Update the ttl
ttl, err := s.store.TxnHeartBeat([]byte("pk"), 5, 888)
c.Assert(err, IsNil)
c.Assert(ttl, Greater, uint64(666))

// Advise ttl is small
ttl, err = s.store.TxnHeartBeat([]byte("pk"), 5, 300)
c.Assert(err, IsNil)
c.Assert(ttl, Greater, uint64(300))

// The lock has already been clean up
c.Assert(s.store.Cleanup([]byte("pk"), 5), IsNil)
_, err = s.store.TxnHeartBeat([]byte("pk"), 5, 1000)
c.Assert(err, NotNil)
}
1 change: 1 addition & 0 deletions store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ type MVCCStore interface {
Rollback(keys [][]byte, startTS uint64) error
Cleanup(key []byte, startTS uint64) error
ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error)
TxnHeartBeat(primaryKey []byte, startTS uint64, adviseTTL uint64) (uint64, error)
ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error
BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
GC(startKey, endKey []byte, safePoint uint64) error
Expand Down
45 changes: 45 additions & 0 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,51 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error {
return mvcc.db.Write(batch, nil)
}

// TxnHeartBeat implements the MVCCStore interface.
func (mvcc *MVCCLevelDB) TxnHeartBeat(key []byte, startTS uint64, adviseTTL uint64) (uint64, error) {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

startKey := mvccEncode(key, lockVer)
iter := newIterator(mvcc.db, &util.Range{
Start: startKey,
})
defer iter.Release()

if iter.Valid() {
dec := lockDecoder{
expectKey: key,
}
ok, err := dec.Decode(iter)
if err != nil {
return 0, errors.Trace(err)
}
if ok && dec.lock.startTS == startTS {
if !bytes.Equal(dec.lock.primary, key) {
return 0, errors.New("txnHeartBeat on non-primary key, the code should not run here")
}

lock := dec.lock
batch := &leveldb.Batch{}
// Increase the ttl of this transaction.
if adviseTTL > lock.ttl {
lock.ttl = adviseTTL
writeKey := mvccEncode(key, lockVer)
writeValue, err := lock.MarshalBinary()
if err != nil {
return 0, errors.Trace(err)
}
batch.Put(writeKey, writeValue)
if err = mvcc.db.Write(batch, nil); err != nil {
return 0, errors.Trace(err)
}
}
return lock.ttl, nil
}
}
return 0, errors.New("lock doesn't exist")
}

// ScanLock implements the MVCCStore interface.
func (mvcc *MVCCLevelDB) ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error) {
mvcc.mu.RLock()
Expand Down
20 changes: 20 additions & 0 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,19 @@ func (h *rpcHandler) handleKvCleanup(req *kvrpcpb.CleanupRequest) *kvrpcpb.Clean
return &resp
}

func (h *rpcHandler) handleTxnHeartBeat(req *kvrpcpb.TxnHeartBeatRequest) *kvrpcpb.TxnHeartBeatResponse {
if !h.checkKeyInRegion(req.PrimaryLock) {
panic("KvTxnHeartBeat: key not in region")
}
var resp kvrpcpb.TxnHeartBeatResponse
ttl, err := h.mvccStore.TxnHeartBeat(req.PrimaryLock, req.StartVersion, req.AdviseLockTtl)
if err != nil {
resp.Error = convertToKeyError(err)
}
resp.LockTtl = ttl
return &resp
}

func (h *rpcHandler) handleKvBatchGet(req *kvrpcpb.BatchGetRequest) *kvrpcpb.BatchGetResponse {
for _, k := range req.Keys {
if !h.checkKeyInRegion(k) {
Expand Down Expand Up @@ -775,6 +788,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
return resp, nil
}
resp.Resp = handler.handleKvCleanup(r)
case tikvrpc.CmdTxnHeartBeat:
r := req.TxnHeartBeat()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.Resp = &kvrpcpb.TxnHeartBeatResponse{RegionError: err}
return resp, nil
}
resp.Resp = handler.handleTxnHeartBeat(r)
case tikvrpc.CmdBatchGet:
r := req.BatchGet()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
Expand Down
37 changes: 37 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,43 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
}, nil
}

func sendTxnHeartBeat(bo *Backoffer, store *tikvStore, primary []byte, startTS, ttl uint64) (uint64, error) {
req := tikvrpc.NewRequest(tikvrpc.CmdTxnHeartBeat, &pb.TxnHeartBeatRequest{
PrimaryLock: primary,
StartVersion: startTS,
AdviseLockTtl: ttl,
})
for {
loc, err := store.GetRegionCache().LocateKey(bo, primary)
if err != nil {
return 0, errors.Trace(err)
}
resp, err := store.SendReq(bo, req, loc.Region, readTimeoutShort)
if err != nil {
return 0, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return 0, errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return 0, errors.Trace(err)
}
continue
}
if resp.Resp == nil {
return 0, errors.Trace(ErrBodyMissing)
}
cmdResp := resp.Resp.(*pb.TxnHeartBeatResponse)
if keyErr := cmdResp.GetError(); keyErr != nil {
return 0, errors.Errorf("txn %d heartbeat fail, primary key = %v, err = %s", startTS, primary, keyErr.Abort)
}
return cmdResp.GetLockTtl(), nil
}
}

func (c *twoPhaseCommitter) initKeysAndMutations() error {
var (
keys [][]byte
Expand Down
25 changes: 25 additions & 0 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,31 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) {
c.Assert(status.IsCommitted(), IsFalse)
}

func (s *testLockSuite) TestTxnHeartBeat(c *C) {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.Set(kv.Key("key"), []byte("value"))
s.prewriteTxn(c, txn.(*tikvTxn))

bo := NewBackoffer(context.Background(), prewriteMaxBackoff)
newTTL, err := sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 666)
c.Assert(err, IsNil)
c.Assert(newTTL, Equals, uint64(666))

newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 555)
c.Assert(err, IsNil)
c.Assert(newTTL, Equals, uint64(666))

// The getTxnStatus API is confusing, it really means rollback!
status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"))
c.Assert(err, IsNil)
c.Assert(status, Equals, TxnStatus(0))

newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 666)
c.Assert(err, NotNil)
c.Assert(newTTL, Equals, uint64(0))
}

func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) {
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
Expand Down
8 changes: 8 additions & 0 deletions store/tikv/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,14 @@ func (s *mockTikvGrpcServer) ReadIndex(context.Context, *kvrpcpb.ReadIndexReques
return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) KvTxnHeartBeat(ctx context.Context, in *kvrpcpb.TxnHeartBeatRequest) (*kvrpcpb.TxnHeartBeatResponse, error) {
return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) KvCheckTxnStatus(ctx context.Context, in *kvrpcpb.CheckTxnStatusRequest) (*kvrpcpb.CheckTxnStatusResponse, error) {
return nil, errors.New("unreachable")
}

func (s *testRegionRequestSuite) TestNoReloadRegionForGrpcWhenCtxCanceled(c *C) {
// prepare a mock tikv grpc server
addr := "localhost:56341"
Expand Down
20 changes: 20 additions & 0 deletions store/tikv/tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
CmdDeleteRange
CmdPessimisticLock
CmdPessimisticRollback
CmdTxnHeartBeat

CmdRawGet CmdType = 256 + iota
CmdRawBatchGet
Expand Down Expand Up @@ -129,6 +130,8 @@ func (t CmdType) String() string {
return "SplitRegion"
case CmdDebugGetRegionProperties:
return "DebugGetRegionProperties"
case CmdTxnHeartBeat:
return "TxnHeartBeat"
}
return "Unknown"
}
Expand Down Expand Up @@ -304,6 +307,11 @@ func (req *Request) Empty() *tikvpb.BatchCommandsEmptyRequest {
return req.req.(*tikvpb.BatchCommandsEmptyRequest)
}

// TxnHeartBeat returns TxnHeartBeatRequest in request.
func (req *Request) TxnHeartBeat() *kvrpcpb.TxnHeartBeatRequest {
return req.req.(*kvrpcpb.TxnHeartBeatRequest)
}

// ToBatchCommandsRequest converts the request to an entry in BatchCommands request.
func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Request {
switch req.Type {
Expand Down Expand Up @@ -353,6 +361,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PessimisticRollback{PessimisticRollback: req.PessimisticRollback()}}
case CmdEmpty:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Empty{Empty: req.Empty()}}
case CmdTxnHeartBeat:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_TxnHeartBeat{TxnHeartBeat: req.TxnHeartBeat()}}
}
return nil
}
Expand Down Expand Up @@ -420,6 +430,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) *Resp
return &Response{Resp: res.PessimisticRollback}
case *tikvpb.BatchCommandsResponse_Response_Empty:
return &Response{Resp: res.Empty}
case *tikvpb.BatchCommandsResponse_Response_TxnHeartBeat:
return &Response{Resp: res.TxnHeartBeat}
}
return nil
}
Expand Down Expand Up @@ -498,6 +510,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
req.SplitRegion().Context = ctx
case CmdEmpty:
req.SplitRegion().Context = ctx
case CmdTxnHeartBeat:
req.TxnHeartBeat().Context = ctx
default:
return fmt.Errorf("invalid request type %v", req.Type)
}
Expand Down Expand Up @@ -621,6 +635,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
RegionError: e,
}
case CmdEmpty:
case CmdTxnHeartBeat:
p = &kvrpcpb.TxnHeartBeatResponse{
RegionError: e,
}
default:
return nil, fmt.Errorf("invalid request type %v", req.Type)
}
Expand Down Expand Up @@ -714,6 +732,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
resp.Resp, err = client.SplitRegion(ctx, req.SplitRegion())
case CmdEmpty:
resp.Resp, err = &tikvpb.BatchCommandsEmptyResponse{}, nil
case CmdTxnHeartBeat:
resp.Resp, err = client.KvTxnHeartBeat(ctx, req.TxnHeartBeat())
default:
return nil, errors.Errorf("invalid request type: %v", req.Type)
}
Expand Down

0 comments on commit 53a7cf6

Please sign in to comment.