Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: when commit get a CommitTsExpired error, retry with a new commitTS #12980

Merged
merged 10 commits into from
Oct 30, 2019
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191025082927-f8adf1670b97
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d/go.mo
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20171208011716-f6d7a1f6fbf3/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
Expand Down Expand Up @@ -157,8 +158,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1 h1:DNvxkdcjA0TBIIIF+K2w9KMlTzMZzLZ5JVF26kTCPhg=
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1 h1:qqGSXCFr9Uc5VIDBEt4zlmMcI8e4GlkWfDVzQ+dexRk=
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
15 changes: 14 additions & 1 deletion store/mockstore/mocktikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@

package mocktikv

import "fmt"
import (
"fmt"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

// ErrLocked is returned when trying to Read/Write on a locked key. Client should
// backoff or cleanup the lock then retry.
Expand Down Expand Up @@ -83,3 +87,12 @@ type ErrDeadlock struct {
func (e *ErrDeadlock) Error() string {
return "deadlock"
}

// ErrCommitTSExpired is returned when commit.CommitTS < lock.MinCommitTS
type ErrCommitTSExpired struct {
kvrpcpb.CommitTsExpired
}

func (e *ErrCommitTSExpired) Error() string {
return "commit ts expired"
}
13 changes: 13 additions & 0 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

Expand Down Expand Up @@ -166,6 +167,7 @@ func (s *testMockTiKVSuite) mustPrewriteWithTTLOK(c *C, mutations []*kvrpcpb.Mut
PrimaryLock: []byte(primary),
StartVersion: startTS,
LockTtl: ttl,
MinCommitTs: startTS + 1,
}
errs := s.store.Prewrite(req)
for _, err := range errs {
Expand Down Expand Up @@ -682,6 +684,17 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
c.Assert(commitTS, Equals, uint64(0))
}

func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) {
s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5)
// Push the minCommitTS
_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100)
c.Assert(err, IsNil)
err = s.store.Commit([][]byte{[]byte("x")}, 5, 10)
e, ok := errors.Cause(err).(*ErrCommitTSExpired)
c.Assert(ok, IsTrue)
c.Assert(e.MinCommitTs, Equals, uint64(101))
}

func (s *testMVCCLevelDB) TestMvccGetByKey(c *C) {
s.mustPrewriteOK(c, putMutations("q1", "v5"), "p1", 5)
debugger, ok := s.store.(MVCCDebugger)
Expand Down
10 changes: 10 additions & 0 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,16 @@ func commitKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS, commit
}
return ErrRetryable("txn not found")
}
// Reject the commit request whose commitTS is less than minCommiTS.
if dec.lock.minCommitTS > commitTS {
return &ErrCommitTSExpired{
kvrpcpb.CommitTsExpired{
StartTs: startTS,
AttemptedCommitTs: commitTS,
Key: key,
MinCommitTs: dec.lock.minCommitTS,
}}
}

if err = commitLock(batch, dec.lock, key, startTS, commitTS); err != nil {
return errors.Trace(err)
Expand Down
5 changes: 5 additions & 0 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
Retryable: retryable.Error(),
}
}
if expired, ok := errors.Cause(err).(*ErrCommitTSExpired); ok {
return &kvrpcpb.KeyError{
CommitTsExpired: &expired.CommitTsExpired,
}
}
return &kvrpcpb.KeyError{
Abort: err.Error(),
}
Expand Down
27 changes: 27 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,33 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
c.setUndeterminedErr(nil)
}
if keyErr := commitResp.GetError(); keyErr != nil {
if rejected := keyErr.GetCommitTsExpired(); rejected != nil {
logutil.Logger(bo.ctx).Info("2PC commitTS rejected by TiKV, retry with a newer commitTS",
zap.Uint64("txnStartTS", c.startTS),
zap.Stringer("info", logutil.Hex(rejected)))

// Update commit ts and retry.
commitTS, err := c.store.getTimestampWithRetry(bo)
if err != nil {
logutil.Logger(bo.ctx).Warn("2PC get commitTS failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}
if commitTS < c.startTS || commitTS < rejected.MinCommitTs {
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
logutil.Logger(bo.ctx).Error("2PC retry commit failed",
zap.Uint64("txnStartTS", c.startTS),
zap.Uint64("commitTS", commitTS),
zap.Uint64("minCommitTS", rejected.MinCommitTs))
return errors.New("2PC retry commit failed")
}

c.mu.Lock()
c.commitTS = commitTS
c.mu.Unlock()
return c.commitKeys(bo, batch.keys)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm kinda afraid that recursion may cause troubles...

Copy link
Contributor Author

@tiancaiamao tiancaiamao Oct 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retry two many times and stack overflow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. The old code also take this pattern
  2. If it's really a problem, we can refactor the code with another PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure to do we need a backoff here before recursion retry - -? @tiancaiamao if so I will add it at #16061 later

}

c.mu.RLock()
defer c.mu.RUnlock()
err = extractKeyErr(keyErr)
Expand Down
34 changes: 34 additions & 0 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,40 @@ func (s *testCommitterSuite) TestPrewriteTxnSize(c *C) {
}
}

func (s *testCommitterSuite) TestRejectCommitTS(c *C) {
txn := s.begin(c)
c.Assert(txn.Set([]byte("x"), []byte("v")), IsNil)

commiter, err := newTwoPhaseCommitterWithInit(txn, 1)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(err, IsNil)
bo := NewBackoffer(context.Background(), getMaxBackoff)
loc, err := s.store.regionCache.LocateKey(bo, []byte("x"))
c.Assert(err, IsNil)
batch := batchKeys{region: loc.Region, keys: [][]byte{[]byte("x")}}
mutations := make([]*kvrpcpb.Mutation, len(batch.keys))
for i, k := range batch.keys {
tmp := commiter.mutations[string(k)]
mutations[i] = &tmp.Mutation
}
prewrite := &kvrpcpb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: commiter.primary(),
StartVersion: commiter.startTS,
LockTtl: commiter.lockTTL,
MinCommitTs: commiter.startTS + 100, // Set minCommitTS
}
req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, prewrite)
_, err = s.store.SendReq(bo, req, loc.Region, readTimeoutShort)
c.Assert(err, IsNil)

// Make commitTS less than minCommitTS.
commiter.commitTS = commiter.startTS + 1
// Ensure that the new commit ts is greater than minCommitTS when retry
time.Sleep(3 * time.Millisecond)
err = commiter.commitKeys(bo, commiter.keys)
c.Assert(err, IsNil)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *testCommitterSuite) TestPessimisticPrewriteRequest(c *C) {
// This test checks that the isPessimisticLock field is set in the request even when no keys are pessimistic lock.
txn := s.begin(c)
Expand Down