Skip to content

Commit

Permalink
store: when commit get a CommitTsExpired error, retry with a new co…
Browse files Browse the repository at this point in the history
…mmitTS (#12980)
  • Loading branch information
tiancaiamao authored and sre-bot committed Oct 30, 2019
1 parent 04a5c6a commit 7fe81b7
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 21 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191025082927-f8adf1670b97
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1 h1:DNvxkdcjA0TBIIIF+K2w9KMlTzMZzLZ5JVF26kTCPhg=
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1 h1:qqGSXCFr9Uc5VIDBEt4zlmMcI8e4GlkWfDVzQ+dexRk=
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
15 changes: 14 additions & 1 deletion store/mockstore/mocktikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@

package mocktikv

import "fmt"
import (
"fmt"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

// ErrLocked is returned when trying to Read/Write on a locked key. Client should
// backoff or cleanup the lock then retry.
Expand Down Expand Up @@ -83,3 +87,12 @@ type ErrDeadlock struct {
func (e *ErrDeadlock) Error() string {
return "deadlock"
}

// ErrCommitTSExpired is returned when commit.CommitTS < lock.MinCommitTS
type ErrCommitTSExpired struct {
kvrpcpb.CommitTsExpired
}

func (e *ErrCommitTSExpired) Error() string {
return "commit ts expired"
}
13 changes: 13 additions & 0 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

Expand Down Expand Up @@ -166,6 +167,7 @@ func (s *testMockTiKVSuite) mustPrewriteWithTTLOK(c *C, mutations []*kvrpcpb.Mut
PrimaryLock: []byte(primary),
StartVersion: startTS,
LockTtl: ttl,
MinCommitTs: startTS + 1,
}
errs := s.store.Prewrite(req)
for _, err := range errs {
Expand Down Expand Up @@ -682,6 +684,17 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
c.Assert(commitTS, Equals, uint64(0))
}

func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) {
s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5)
// Push the minCommitTS
_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100)
c.Assert(err, IsNil)
err = s.store.Commit([][]byte{[]byte("x")}, 5, 10)
e, ok := errors.Cause(err).(*ErrCommitTSExpired)
c.Assert(ok, IsTrue)
c.Assert(e.MinCommitTs, Equals, uint64(101))
}

func (s *testMVCCLevelDB) TestMvccGetByKey(c *C) {
s.mustPrewriteOK(c, putMutations("q1", "v5"), "p1", 5)
debugger, ok := s.store.(MVCCDebugger)
Expand Down
10 changes: 10 additions & 0 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,16 @@ func commitKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS, commit
}
return ErrRetryable("txn not found")
}
// Reject the commit request whose commitTS is less than minCommiTS.
if dec.lock.minCommitTS > commitTS {
return &ErrCommitTSExpired{
kvrpcpb.CommitTsExpired{
StartTs: startTS,
AttemptedCommitTs: commitTS,
Key: key,
MinCommitTs: dec.lock.minCommitTS,
}}
}

if err = commitLock(batch, dec.lock, key, startTS, commitTS); err != nil {
return errors.Trace(err)
Expand Down
5 changes: 5 additions & 0 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
Retryable: retryable.Error(),
}
}
if expired, ok := errors.Cause(err).(*ErrCommitTSExpired); ok {
return &kvrpcpb.KeyError{
CommitTsExpired: &expired.CommitTsExpired,
}
}
return &kvrpcpb.KeyError{
Abort: err.Error(),
}
Expand Down
20 changes: 20 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,26 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
c.setUndeterminedErr(nil)
}
if keyErr := commitResp.GetError(); keyErr != nil {
if rejected := keyErr.GetCommitTsExpired(); rejected != nil {
logutil.Logger(bo.ctx).Info("2PC commitTS rejected by TiKV, retry with a newer commitTS",
zap.Uint64("txnStartTS", c.startTS),
zap.Stringer("info", logutil.Hex(rejected)))

// Update commit ts and retry.
commitTS, err := c.store.getTimestampWithRetry(bo)
if err != nil {
logutil.Logger(bo.ctx).Warn("2PC get commitTS failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}

c.mu.Lock()
c.commitTS = commitTS
c.mu.Unlock()
return c.commitKeys(bo, batch.keys)
}

c.mu.RLock()
defer c.mu.RUnlock()
err = extractKeyErr(keyErr)
Expand Down
84 changes: 67 additions & 17 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tikv

import (
"bytes"
"context"
"math"
"math/rand"
Expand Down Expand Up @@ -340,12 +341,12 @@ func (s *testCommitterSuite) TestCommitBeforePrewrite(c *C) {
txn := s.begin(c)
err := txn.Set([]byte("a"), []byte("a1"))
c.Assert(err, IsNil)
commiter, err := newTwoPhaseCommitterWithInit(txn, 0)
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
ctx := context.Background()
err = commiter.cleanupKeys(NewBackoffer(ctx, cleanupMaxBackoff), commiter.keys)
err = committer.cleanupKeys(NewBackoffer(ctx, cleanupMaxBackoff), committer.keys)
c.Assert(err, IsNil)
err = commiter.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), commiter.keys)
err = committer.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), committer.keys)
c.Assert(err, NotNil)
errMsgMustContain(c, err, "conflictCommitTS")
}
Expand Down Expand Up @@ -385,9 +386,9 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) {

// clean again, shouldn't be failed when a rollback already exist.
ctx := context.Background()
commiter, err := newTwoPhaseCommitterWithInit(txn2, 0)
committer, err := newTwoPhaseCommitterWithInit(txn2, 0)
c.Assert(err, IsNil)
err = commiter.cleanupKeys(NewBackoffer(ctx, cleanupMaxBackoff), commiter.keys)
err = committer.cleanupKeys(NewBackoffer(ctx, cleanupMaxBackoff), committer.keys)
c.Assert(err, IsNil)

// check the data after rollback twice.
Expand Down Expand Up @@ -420,17 +421,17 @@ func (s *testCommitterSuite) TestWrittenKeysOnConflict(c *C) {
txn1 := s.begin(c)
txn2 := s.begin(c)
txn2.Set([]byte("x1"), []byte("1"))
commiter2, err := newTwoPhaseCommitterWithInit(txn2, 2)
committer2, err := newTwoPhaseCommitterWithInit(txn2, 2)
c.Assert(err, IsNil)
err = commiter2.execute(context.Background())
err = committer2.execute(context.Background())
c.Assert(err, IsNil)
txn1.Set([]byte("x1"), []byte("1"))
txn1.Set([]byte("y1"), []byte("2"))
commiter1, err := newTwoPhaseCommitterWithInit(txn1, 2)
committer1, err := newTwoPhaseCommitterWithInit(txn1, 2)
c.Assert(err, IsNil)
err = commiter1.execute(context.Background())
err = committer1.execute(context.Background())
c.Assert(err, NotNil)
commiter1.cleanWg.Wait()
committer1.cleanWg.Wait()
txn3 := s.begin(c)
start := time.Now()
txn3.Get(context.TODO(), []byte("y1"))
Expand All @@ -454,11 +455,11 @@ func (s *testCommitterSuite) TestPrewriteTxnSize(c *C) {
c.Assert(err, IsNil)
}

commiter, err := newTwoPhaseCommitterWithInit(txn, 1)
committer, err := newTwoPhaseCommitterWithInit(txn, 1)
c.Assert(err, IsNil)

ctx := context.Background()
err = commiter.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), commiter.keys)
err = committer.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), committer.keys)
c.Assert(err, IsNil)

// Check the written locks in the first region (50 keys)
Expand All @@ -474,19 +475,68 @@ func (s *testCommitterSuite) TestPrewriteTxnSize(c *C) {
}
}

func (s *testCommitterSuite) TestRejectCommitTS(c *C) {
txn := s.begin(c)
c.Assert(txn.Set([]byte("x"), []byte("v")), IsNil)

committer, err := newTwoPhaseCommitterWithInit(txn, 1)
c.Assert(err, IsNil)
bo := NewBackoffer(context.Background(), getMaxBackoff)
loc, err := s.store.regionCache.LocateKey(bo, []byte("x"))
c.Assert(err, IsNil)
batch := batchKeys{region: loc.Region, keys: [][]byte{[]byte("x")}}
mutations := make([]*kvrpcpb.Mutation, len(batch.keys))
for i, k := range batch.keys {
tmp := committer.mutations[string(k)]
mutations[i] = &tmp.Mutation
}
prewrite := &kvrpcpb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: committer.primary(),
StartVersion: committer.startTS,
LockTtl: committer.lockTTL,
MinCommitTs: committer.startTS + 100, // Set minCommitTS
}
req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, prewrite)
_, err = s.store.SendReq(bo, req, loc.Region, readTimeoutShort)
c.Assert(err, IsNil)

// Make commitTS less than minCommitTS.
committer.commitTS = committer.startTS + 1
// Ensure that the new commit ts is greater than minCommitTS when retry
time.Sleep(3 * time.Millisecond)
err = committer.commitKeys(bo, committer.keys)
c.Assert(err, IsNil)

// Use startTS+2 to read the data and get nothing.
// Use max.Uint64 to read the data and success.
// That means the final commitTS > startTS+2, it's not the one we provide.
// So we cover the rety commitTS logic.
txn1, err := s.store.BeginWithStartTS(committer.startTS + 2)
c.Assert(err, IsNil)
_, err = txn1.Get(bo.ctx, []byte("x"))
c.Assert(kv.IsErrNotFound(err), IsTrue)

txn2, err := s.store.BeginWithStartTS(math.MaxUint64)
c.Assert(err, IsNil)
val, err := txn2.Get(bo.ctx, []byte("x"))
c.Assert(err, IsNil)
c.Assert(bytes.Equal(val, []byte("v")), IsTrue)
}

func (s *testCommitterSuite) TestPessimisticPrewriteRequest(c *C) {
// This test checks that the isPessimisticLock field is set in the request even when no keys are pessimistic lock.
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
err := txn.Set([]byte("t1"), []byte("v1"))
c.Assert(err, IsNil)
commiter, err := newTwoPhaseCommitterWithInit(txn, 0)
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
commiter.forUpdateTS = 100
committer.forUpdateTS = 100
var batch batchKeys
batch.keys = append(batch.keys, []byte("t1"))
batch.region = RegionVerID{1, 1, 1}
req := commiter.buildPrewriteRequest(batch, 1)
req := committer.buildPrewriteRequest(batch, 1)
c.Assert(len(req.Prewrite().IsPessimisticLock), Greater, 0)
c.Assert(req.Prewrite().ForUpdateTs, Equals, uint64(100))
}
Expand Down Expand Up @@ -568,13 +618,13 @@ func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo {
txn := s.begin(c)
err := txn.Set(key, key)
c.Assert(err, IsNil)
commiter, err := newTwoPhaseCommitterWithInit(txn, 1)
committer, err := newTwoPhaseCommitterWithInit(txn, 1)
c.Assert(err, IsNil)
bo := NewBackoffer(context.Background(), getMaxBackoff)
loc, err := s.store.regionCache.LocateKey(bo, key)
c.Assert(err, IsNil)
batch := batchKeys{region: loc.Region, keys: [][]byte{key}}
req := commiter.buildPrewriteRequest(batch, 1)
req := committer.buildPrewriteRequest(batch, 1)
resp, err := s.store.SendReq(bo, req, loc.Region, readTimeoutShort)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
Expand Down

0 comments on commit 7fe81b7

Please sign in to comment.