Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

atomic PrewriteMaxBackoff #488

Merged
merged 4 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions integration_tests/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (s *testLockSuite) TestCheckTxnStatusTTL() {
txn.Set([]byte("key"), []byte("value"))
s.prewriteTxnWithTTL(txn, 1000)

bo := tikv.NewBackofferWithVars(context.Background(), transaction.PrewriteMaxBackoff, nil)
bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil)
lr := s.store.NewLockResolver()
callerStartTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
s.Nil(err)
Expand Down Expand Up @@ -315,7 +315,7 @@ func (s *testLockSuite) TestCheckTxnStatus() {
s.Nil(err)
s.Greater(currentTS, txn.StartTS())

bo := tikv.NewBackofferWithVars(context.Background(), transaction.PrewriteMaxBackoff, nil)
bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil)
resolver := s.store.NewLockResolver()
// Call getTxnStatus to check the lock status.
status, err := resolver.GetTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true, false, nil)
Expand Down Expand Up @@ -372,7 +372,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait() {
o := s.store.GetOracle()
currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
s.Nil(err)
bo := tikv.NewBackofferWithVars(context.Background(), transaction.PrewriteMaxBackoff, nil)
bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil)
resolver := s.store.NewLockResolver()

// Call getTxnStatus for the TxnNotFound case.
Expand Down Expand Up @@ -569,7 +569,7 @@ func (s *testLockSuite) TestZeroMinCommitTS() {
txn, err := s.store.Begin()
s.Nil(err)
txn.Set([]byte("key"), []byte("value"))
bo := tikv.NewBackofferWithVars(context.Background(), transaction.PrewriteMaxBackoff, nil)
bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil)

mockValue := fmt.Sprintf(`return(%d)`, txn.StartTS())
s.Nil(failpoint.Enable("tikvclient/mockZeroCommitTS", mockValue))
Expand Down
2 changes: 1 addition & 1 deletion tikv/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s StoreProbe) ClearTxnLatches() {

// SendTxnHeartbeat renews a txn's ttl.
func (s StoreProbe) SendTxnHeartbeat(ctx context.Context, key []byte, startTS uint64, ttl uint64) (uint64, error) {
bo := retry.NewBackofferWithVars(ctx, transaction.PrewriteMaxBackoff, nil)
bo := retry.NewBackofferWithVars(ctx, int(transaction.PrewriteMaxBackoff.Load()), nil)
newTTL, _, err := transaction.SendTxnHeartBeat(bo, s.KVStore, key, startTS, ttl)
return newTTL, err
}
Expand Down
7 changes: 4 additions & 3 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/util"
atomicutil "go.uber.org/atomic"
zap "go.uber.org/zap"
)

Expand All @@ -82,7 +83,7 @@ var (

var (
// PrewriteMaxBackoff is max sleep time of the `pre-write` command.
PrewriteMaxBackoff = 40000
PrewriteMaxBackoff = atomicutil.NewUint64(40000)
// CommitMaxBackoff is max sleep time of the 'commit' command
CommitMaxBackoff = uint64(40000)
)
Expand Down Expand Up @@ -1369,7 +1370,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
// The maxSleep should't be very long in this case.
// - If the region isn't found in PD, it's possible the reason is write-stall.
// The maxSleep can be long in this case.
bo := retry.NewBackofferWithVars(ctx, PrewriteMaxBackoff, c.txn.vars)
bo := retry.NewBackofferWithVars(ctx, int(PrewriteMaxBackoff.Load()), c.txn.vars)

// If we want to use async commit or 1PC and also want linearizability across
// all nodes, we have to make sure the commit TS of this transaction is greater
Expand Down Expand Up @@ -1733,7 +1734,7 @@ func (c *twoPhaseCommitter) tryAmendTxn(ctx context.Context, startInfoSchema Sch
return false, err
}
if c.prewriteStarted {
prewriteBo := retry.NewBackofferWithVars(ctx, PrewriteMaxBackoff, c.txn.vars)
prewriteBo := retry.NewBackofferWithVars(ctx, int(PrewriteMaxBackoff.Load()), c.txn.vars)
err = c.prewriteMutations(prewriteBo, addMutations)
if err != nil {
logutil.Logger(ctx).Warn("amend prewrite has failed", zap.Error(err), zap.Uint64("txnStartTS", c.startTS))
Expand Down
2 changes: 1 addition & 1 deletion txnkv/transaction/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (c CommitterProbe) PrewriteAllMutations(ctx context.Context) error {

// PrewriteMutations performs the first phase of commit for given keys.
func (c CommitterProbe) PrewriteMutations(ctx context.Context, mutations CommitterMutations) error {
return c.prewriteMutations(retry.NewBackofferWithVars(ctx, PrewriteMaxBackoff, nil), mutations)
return c.prewriteMutations(retry.NewBackofferWithVars(ctx, int(PrewriteMaxBackoff.Load()), nil), mutations)
}

// CommitMutations performs the second phase of commit.
Expand Down