Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: when commit get a CommitTsExpired error, retry with a new commitTS #12980

Merged
merged 10 commits into from
Oct 30, 2019
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191025082927-f8adf1670b97
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1 h1:DNvxkdcjA0TBIIIF+K2w9KMlTzMZzLZ5JVF26kTCPhg=
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1 h1:qqGSXCFr9Uc5VIDBEt4zlmMcI8e4GlkWfDVzQ+dexRk=
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
15 changes: 14 additions & 1 deletion store/mockstore/mocktikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@

package mocktikv

import "fmt"
import (
"fmt"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

// ErrLocked is returned when trying to Read/Write on a locked key. Client should
// backoff or cleanup the lock then retry.
Expand Down Expand Up @@ -83,3 +87,12 @@ type ErrDeadlock struct {
func (e *ErrDeadlock) Error() string {
return "deadlock"
}

// ErrCommitTSExpired is returned when commit.CommitTS < lock.MinCommitTS.
// It embeds kvrpcpb.CommitTsExpired, so the fields of that message are
// available directly on the error value.
type ErrCommitTSExpired struct {
kvrpcpb.CommitTsExpired
}

// Error implements the error interface. The message is a fixed string; the
// details of the rejected commit are carried by the embedded CommitTsExpired.
func (e *ErrCommitTSExpired) Error() string {
	const msg = "commit ts expired"
	return msg
}
13 changes: 13 additions & 0 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

Expand Down Expand Up @@ -166,6 +167,7 @@ func (s *testMockTiKVSuite) mustPrewriteWithTTLOK(c *C, mutations []*kvrpcpb.Mut
PrimaryLock: []byte(primary),
StartVersion: startTS,
LockTtl: ttl,
MinCommitTs: startTS + 1,
}
errs := s.store.Prewrite(req)
for _, err := range errs {
Expand Down Expand Up @@ -682,6 +684,17 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
c.Assert(commitTS, Equals, uint64(0))
}

// TestRejectCommitTS prewrites key "x" at startTS 5, calls CheckTxnStatus, and
// then asserts that committing at commitTS 10 fails with ErrCommitTSExpired
// reporting a MinCommitTs of 101.
func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) {
	s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5)

	// Push the minCommitTS of the lock forward.
	_, _, checkErr := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100)
	c.Assert(checkErr, IsNil)

	// A commit at ts 10 must be rejected with the structured error.
	commitErr := s.store.Commit([][]byte{[]byte("x")}, 5, 10)
	expired, isExpired := errors.Cause(commitErr).(*ErrCommitTSExpired)
	c.Assert(isExpired, IsTrue)
	c.Assert(expired.MinCommitTs, Equals, uint64(101))
}

func (s *testMVCCLevelDB) TestMvccGetByKey(c *C) {
s.mustPrewriteOK(c, putMutations("q1", "v5"), "p1", 5)
debugger, ok := s.store.(MVCCDebugger)
Expand Down
10 changes: 10 additions & 0 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,16 @@ func commitKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS, commit
}
return ErrRetryable("txn not found")
}
// Reject the commit request whose commitTS is less than minCommitTS.
if dec.lock.minCommitTS > commitTS {
return &ErrCommitTSExpired{
kvrpcpb.CommitTsExpired{
StartTs: startTS,
AttemptedCommitTs: commitTS,
Key: key,
MinCommitTs: dec.lock.minCommitTS,
}}
}

if err = commitLock(batch, dec.lock, key, startTS, commitTS); err != nil {
return errors.Trace(err)
Expand Down
5 changes: 5 additions & 0 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
Retryable: retryable.Error(),
}
}
if expired, ok := errors.Cause(err).(*ErrCommitTSExpired); ok {
return &kvrpcpb.KeyError{
CommitTsExpired: &expired.CommitTsExpired,
}
}
return &kvrpcpb.KeyError{
Abort: err.Error(),
}
Expand Down
20 changes: 20 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,26 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
c.setUndeterminedErr(nil)
}
if keyErr := commitResp.GetError(); keyErr != nil {
if rejected := keyErr.GetCommitTsExpired(); rejected != nil {
logutil.Logger(bo.ctx).Info("2PC commitTS rejected by TiKV, retry with a newer commitTS",
zap.Uint64("txnStartTS", c.startTS),
zap.Stringer("info", logutil.Hex(rejected)))

// Update commit ts and retry.
commitTS, err := c.store.getTimestampWithRetry(bo)
if err != nil {
logutil.Logger(bo.ctx).Warn("2PC get commitTS failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}

c.mu.Lock()
c.commitTS = commitTS
c.mu.Unlock()
return c.commitKeys(bo, batch.keys)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm kinda afraid that recursion may cause troubles...

Copy link
Contributor Author

@tiancaiamao tiancaiamao Oct 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retry too many times and overflow the stack?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. The old code also takes this pattern
  2. If it's really a problem, we can refactor the code with another PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether we need a backoff here before the recursive retry. @tiancaiamao if so, I will add it in #16061 later.

}

c.mu.RLock()
defer c.mu.RUnlock()
err = extractKeyErr(keyErr)
Expand Down
84 changes: 67 additions & 17 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tikv

import (
"bytes"
"context"
"math"
"math/rand"
Expand Down Expand Up @@ -340,12 +341,12 @@ func (s *testCommitterSuite) TestCommitBeforePrewrite(c *C) {
txn := s.begin(c)
err := txn.Set([]byte("a"), []byte("a1"))
c.Assert(err, IsNil)
commiter, err := newTwoPhaseCommitterWithInit(txn, 0)
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
ctx := context.Background()
err = commiter.cleanupKeys(NewBackoffer(ctx, cleanupMaxBackoff), commiter.keys)
err = committer.cleanupKeys(NewBackoffer(ctx, cleanupMaxBackoff), committer.keys)
c.Assert(err, IsNil)
err = commiter.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), commiter.keys)
err = committer.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), committer.keys)
c.Assert(err, NotNil)
errMsgMustContain(c, err, "conflictCommitTS")
}
Expand Down Expand Up @@ -385,9 +386,9 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) {

// clean again; this shouldn't fail when a rollback already exists.
ctx := context.Background()
commiter, err := newTwoPhaseCommitterWithInit(txn2, 0)
committer, err := newTwoPhaseCommitterWithInit(txn2, 0)
c.Assert(err, IsNil)
err = commiter.cleanupKeys(NewBackoffer(ctx, cleanupMaxBackoff), commiter.keys)
err = committer.cleanupKeys(NewBackoffer(ctx, cleanupMaxBackoff), committer.keys)
c.Assert(err, IsNil)

// check the data after rollback twice.
Expand Down Expand Up @@ -420,17 +421,17 @@ func (s *testCommitterSuite) TestWrittenKeysOnConflict(c *C) {
txn1 := s.begin(c)
txn2 := s.begin(c)
txn2.Set([]byte("x1"), []byte("1"))
commiter2, err := newTwoPhaseCommitterWithInit(txn2, 2)
committer2, err := newTwoPhaseCommitterWithInit(txn2, 2)
c.Assert(err, IsNil)
err = commiter2.execute(context.Background())
err = committer2.execute(context.Background())
c.Assert(err, IsNil)
txn1.Set([]byte("x1"), []byte("1"))
txn1.Set([]byte("y1"), []byte("2"))
commiter1, err := newTwoPhaseCommitterWithInit(txn1, 2)
committer1, err := newTwoPhaseCommitterWithInit(txn1, 2)
c.Assert(err, IsNil)
err = commiter1.execute(context.Background())
err = committer1.execute(context.Background())
c.Assert(err, NotNil)
commiter1.cleanWg.Wait()
committer1.cleanWg.Wait()
txn3 := s.begin(c)
start := time.Now()
txn3.Get(context.TODO(), []byte("y1"))
Expand All @@ -454,11 +455,11 @@ func (s *testCommitterSuite) TestPrewriteTxnSize(c *C) {
c.Assert(err, IsNil)
}

commiter, err := newTwoPhaseCommitterWithInit(txn, 1)
committer, err := newTwoPhaseCommitterWithInit(txn, 1)
c.Assert(err, IsNil)

ctx := context.Background()
err = commiter.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), commiter.keys)
err = committer.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), committer.keys)
c.Assert(err, IsNil)

// Check the written locks in the first region (50 keys)
Expand All @@ -474,19 +475,68 @@ func (s *testCommitterSuite) TestPrewriteTxnSize(c *C) {
}
}

func (s *testCommitterSuite) TestRejectCommitTS(c *C) {
txn := s.begin(c)
c.Assert(txn.Set([]byte("x"), []byte("v")), IsNil)

committer, err := newTwoPhaseCommitterWithInit(txn, 1)
c.Assert(err, IsNil)
bo := NewBackoffer(context.Background(), getMaxBackoff)
loc, err := s.store.regionCache.LocateKey(bo, []byte("x"))
c.Assert(err, IsNil)
batch := batchKeys{region: loc.Region, keys: [][]byte{[]byte("x")}}
mutations := make([]*kvrpcpb.Mutation, len(batch.keys))
for i, k := range batch.keys {
tmp := committer.mutations[string(k)]
mutations[i] = &tmp.Mutation
}
prewrite := &kvrpcpb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: committer.primary(),
StartVersion: committer.startTS,
LockTtl: committer.lockTTL,
MinCommitTs: committer.startTS + 100, // Set minCommitTS
}
req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, prewrite)
_, err = s.store.SendReq(bo, req, loc.Region, readTimeoutShort)
c.Assert(err, IsNil)

// Make commitTS less than minCommitTS.
committer.commitTS = committer.startTS + 1
// Ensure that the new commit ts is greater than minCommitTS when retry
time.Sleep(3 * time.Millisecond)
err = committer.commitKeys(bo, committer.keys)
c.Assert(err, IsNil)

// Use startTS+2 to read the data and get nothing.
// Use max.Uint64 to read the data and success.
// That means the final commitTS > startTS+2, it's not the one we provide.
// So we cover the rety commitTS logic.
txn1, err := s.store.BeginWithStartTS(committer.startTS + 2)
c.Assert(err, IsNil)
_, err = txn1.Get(bo.ctx, []byte("x"))
c.Assert(kv.IsErrNotFound(err), IsTrue)

txn2, err := s.store.BeginWithStartTS(math.MaxUint64)
c.Assert(err, IsNil)
val, err := txn2.Get(bo.ctx, []byte("x"))
c.Assert(err, IsNil)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(bytes.Equal(val, []byte("v")), IsTrue)
}

// TestPessimisticPrewriteRequest builds a prewrite request for a transaction
// that has the kv.Pessimistic option set, and asserts that the request has a
// non-empty IsPessimisticLock and a ForUpdateTs of 100 (the value assigned to
// forUpdateTS on the committer).
func (s *testCommitterSuite) TestPessimisticPrewriteRequest(c *C) {
// This test checks that the isPessimisticLock field is set in the request even when no keys are pessimistic lock.
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
err := txn.Set([]byte("t1"), []byte("v1"))
c.Assert(err, IsNil)
commiter, err := newTwoPhaseCommitterWithInit(txn, 0)
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
commiter.forUpdateTS = 100
committer.forUpdateTS = 100
var batch batchKeys
batch.keys = append(batch.keys, []byte("t1"))
batch.region = RegionVerID{1, 1, 1}
req := commiter.buildPrewriteRequest(batch, 1)
req := committer.buildPrewriteRequest(batch, 1)
c.Assert(len(req.Prewrite().IsPessimisticLock), Greater, 0)
c.Assert(req.Prewrite().ForUpdateTs, Equals, uint64(100))
}
Expand Down Expand Up @@ -568,13 +618,13 @@ func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo {
txn := s.begin(c)
err := txn.Set(key, key)
c.Assert(err, IsNil)
commiter, err := newTwoPhaseCommitterWithInit(txn, 1)
committer, err := newTwoPhaseCommitterWithInit(txn, 1)
c.Assert(err, IsNil)
bo := NewBackoffer(context.Background(), getMaxBackoff)
loc, err := s.store.regionCache.LocateKey(bo, key)
c.Assert(err, IsNil)
batch := batchKeys{region: loc.Region, keys: [][]byte{key}}
req := commiter.buildPrewriteRequest(batch, 1)
req := committer.buildPrewriteRequest(batch, 1)
resp, err := s.store.SendReq(bo, req, loc.Region, readTimeoutShort)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
Expand Down