Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tikv: fix infinitely respawned secondary-key commit retry goroutine during TiKV errors #16061

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 45 additions & 13 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import (
"go.uber.org/zap"
)

// knobSwitch is the failpoint name used to enable the twoPhaseCommitter
// testing knobs (testingKnobs.sendCommitReq and secondaryCommitDoneWg).
const knobSwitch = "2pcKnobs"

type twoPhaseCommitAction interface {
handleSingleBatch(*twoPhaseCommitter, *Backoffer, batchMutations) error
tiKVTxnRegionsNumHistogram() prometheus.Observer
Expand Down Expand Up @@ -159,6 +161,11 @@ type twoPhaseCommitter struct {
regionTxnSize map[uint64]int
// Used by pessimistic transaction and large transaction.
ttlManager

testingKnobs struct {
secondaryCommitDoneWg sync.WaitGroup
sendCommitReq func(bo *Backoffer, req *tikvrpc.Request, batch batchMutations) (*tikvrpc.Response, error, bool)
}
}

type committerMutations struct {
Expand Down Expand Up @@ -504,7 +511,16 @@ func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCo
// The backoffer instance is created outside of the goroutine to avoid
// potential data race in unit test since `CommitMaxBackoff` will be updated
// by test suites.
secondaryBo := NewBackoffer(context.Background(), CommitMaxBackoff).WithVars(c.txn.vars)
failpoint.Inject(knobSwitch, func() {
c.testingKnobs.secondaryCommitDoneWg.Add(1)
})
secondaryBo := &Backoffer{
ctx: context.Background(),
maxSleep: bo.maxSleep,
totalSleep: bo.totalSleep,
errors: bo.errors,
vars: bo.vars,
}
go func() {
e := c.doActionOnBatches(secondaryBo, action, batches)
if e != nil {
Expand All @@ -514,6 +530,9 @@ func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCo
zap.Error(e))
tikvSecondaryLockCleanupFailureCounterCommit.Inc()
}
failpoint.Inject(knobSwitch, func() {
c.testingKnobs.secondaryCommitDoneWg.Done()
})
}()
} else {
err = c.doActionOnBatches(bo, action, batches)
Expand Down Expand Up @@ -973,18 +992,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
CommitVersion: c.commitTS,
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog})

sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort)

// If we fail to receive response for the request that commits primary key, it will be undetermined whether this
// transaction has been successfully committed.
// Under this circumstance, we can not declare the commit is complete (may lead to data lost), nor can we throw
// an error (may lead to the duplicated key error when upper level restarts the transaction). Currently the best
// solution is to populate this error and let upper layer drop the connection to the corresponding mysql client.
if batch.isPrimary && sender.rpcError != nil {
c.setUndeterminedErr(errors.Trace(sender.rpcError))
}

resp, err := c.sendCommitRequestReq(bo, req, batch)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1066,6 +1074,30 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
return nil
}

func (c *twoPhaseCommitter) sendCommitRequestReq(bo *Backoffer, req *tikvrpc.Request, batch batchMutations) (*tikvrpc.Response, error) {
failpoint.Inject(knobSwitch, func() {
if c.testingKnobs.sendCommitReq != nil {
resp, err, skip := c.testingKnobs.sendCommitReq(bo, req, batch)
if !skip {
failpoint.Return(resp, err)
}
}
})

sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort)

// If we fail to receive response for the request that commits primary key, it will be undetermined whether this
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, extracting this function is not a good idea, because this logic belongs to commit, it's not a general one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the reason for extracting this function is mockable, make a piece of code(send request) can be replaced by a test logic a good reason to extract a method in the practice.

if not extract this function we need a new "if else" block to the parent method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember I had read some test-driven books about the question: "when to extract a method? or when to use interface?", they said "extract method or interface when you need to replace it" and when logic need replaced in a test, it has high probability be replaced in future requirement :D

// transaction has been successfully committed.
// Under this circumstance, we can not declare the commit is complete (may lead to data lost), nor can we throw
// an error (may lead to the duplicated key error when upper level restarts the transaction). Currently the best
// solution is to populate this error and let upper layer drop the connection to the corresponding mysql client.
if batch.isPrimary && sender.rpcError != nil {
c.setUndeterminedErr(errors.Trace(sender.rpcError))
}
return resp, err
}

func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) error {
req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &pb.BatchRollbackRequest{
Keys: batch.mutations.keys,
Expand Down
54 changes: 54 additions & 0 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"math"
"math/rand"
"reflect"
"strings"
"sync"
"sync/atomic"
Expand All @@ -27,13 +28,21 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
)

// testKnobSwitch is the fully-qualified failpoint name ("<package path>/" +
// knobSwitch) that tests pass to failpoint.Enable/Disable to toggle the
// twoPhaseCommitter testing knobs.
var testKnobSwitch string

func init() {
	testKnobSwitch = reflect.TypeOf(twoPhaseCommitter{}).PkgPath() + "/" + knobSwitch
}

type testCommitterSuite struct {
OneByOneSuite
cluster *mocktikv.Cluster
Expand Down Expand Up @@ -485,6 +494,51 @@ func (s *testCommitterSuite) TestPrewriteTxnSize(c *C) {
}
}

// TestCommitRetryLimit checks that the secondary-key commit goroutine stops
// retrying (rather than being respawned forever) when TiKV keeps answering
// with region errors, and that the goroutine eventually terminates.
func (s *testCommitterSuite) TestCommitRetryLimit(c *C) {
	c.Assert(failpoint.Enable(testKnobSwitch, `return(true)`), IsNil)
	defer func() {
		c.Assert(failpoint.Disable(testKnobSwitch), IsNil)
	}()

	// Split the keyspace into many regions so the commit has secondary batches.
	region, _ := s.cluster.GetRegionByKey([]byte{50})
	prevRegionID := region.Id
	for k := byte(50); k < 70; k++ {
		regionID := s.cluster.AllocID()
		peerID := s.cluster.AllocID()
		s.cluster.Split(prevRegionID, regionID, []byte{k}, []uint64{peerID}, peerID)
		prevRegionID = regionID
	}
	txn := s.begin(c)
	var payload [1024]byte
	for k := byte(50); k < 70; k++ {
		c.Assert(txn.Set([]byte{k}, payload[:]), IsNil)
	}

	committer, err := newTwoPhaseCommitterWithInit(txn, 1)
	c.Assert(err, IsNil)
	ctx := context.Background()
	err = committer.prewriteMutations(NewBackoffer(ctx, PrewriteMaxBackoff), committer.mutations)
	c.Assert(err, IsNil)
	committer.commitTS = txn.StartTS() + 1

	// Simulate a down/busy kv store for every secondary batch: always answer
	// with a region error so the caller keeps retrying from the outside.
	// Primary-key requests take the real path (skip = true).
	committer.testingKnobs.sendCommitReq = func(bo *Backoffer, req *tikvrpc.Request, batch batchMutations) (*tikvrpc.Response, error, bool) {
		if batch.isPrimary {
			return nil, nil, true
		}
		resp, respErr := tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})
		return resp, respErr, false
	}
	c.Assert(committer.commitMutations(NewBackoffer(ctx, CommitMaxBackoff), committer.mutations), IsNil)

	// Block until every secondary-key commit goroutine (including retries) exits.
	committer.testingKnobs.secondaryCommitDoneWg.Wait()
}

func (s *testCommitterSuite) TestRejectCommitTS(c *C) {
txn := s.begin(c)
c.Assert(txn.Set([]byte("x"), []byte("v")), IsNil)
Expand Down