store/tikv: tiny refactoring, change twoPhaseCommitAction to interface #12845

Merged Oct 21, 2019 · 2 commits · showing changes from 1 commit
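
Context for the diff below: the PR turns twoPhaseCommitAction from an integer enum, whose behavior lived in switch statements, into an interface implemented by empty struct types, so each action carries its own String and handleSingleBatch logic. The following is a minimal, standalone sketch of that pattern rather than the PR's actual code: action, prewrite, commit, handle, and do are simplified illustration-only names.

package main

import "fmt"

// twoPhaseCommitAction-style interface: each action carries its own behavior.
type action interface {
	handle(batch string) error
	String() string
}

// Empty struct types stand in for the integer constants the PR removes.
type prewrite struct{}
type commit struct{}

func (prewrite) String() string { return "prewrite" }
func (commit) String() string   { return "commit" }

// handle is a simplified stand-in for handleSingleBatch in the diff.
func (prewrite) handle(batch string) error {
	fmt.Println("prewriting", batch)
	return nil
}

func (commit) handle(batch string) error {
	fmt.Println("committing", batch)
	return nil
}

// Compile-time checks that both types satisfy the interface,
// mirroring the var block added in the diff.
var (
	_ action = prewrite{}
	_ action = commit{}
)

func do(a action, batch string) error {
	// A type assertion replaces equality checks such as `action == actionCommit`.
	if _, ok := a.(commit); ok {
		fmt.Println("commit-specific path for", batch)
	}
	return a.handle(batch)
}

func main() {
	_ = do(prewrite{}, "batch-1")
	_ = do(commit{}, "batch-1")
}

With this shape, adding a new action only requires a new struct that implements the interface; no central switch statement has to be updated.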
store/tikv/2pc.go: 132 changes (61 additions, 71 deletions)
@@ -38,14 +38,23 @@ import (
"go.uber.org/zap"
)

type twoPhaseCommitAction int
type twoPhaseCommitAction interface {
handleSingleBatch(*twoPhaseCommitter, *Backoffer, batchKeys) error
String() string
}

const (
actionPrewrite twoPhaseCommitAction = 1 + iota
actionCommit
actionCleanup
actionPessimisticLock
actionPessimisticRollback
type actionPrewrite struct{}
type actionCommit struct{}
type actionCleanup struct{}
type actionPessimisticLock struct{}
type actionPessimisticRollback struct{}
Contributor:
type (
   actionPrewrite struct{}
   actionCommit struct{}
   actionCleanup struct{}
   actionPessimisticLock struct{}
   actionPessimisticRollback struct{}
)

we can avoid repeating the `type` keyword~


var (
_ twoPhaseCommitAction = actionPrewrite{}
_ twoPhaseCommitAction = actionCommit{}
_ twoPhaseCommitAction = actionCleanup{}
_ twoPhaseCommitAction = actionPessimisticLock{}
_ twoPhaseCommitAction = actionPessimisticRollback{}
)

var (
@@ -60,24 +69,28 @@ var (
PessimisticLockTTL uint64 = 15000 // 15s ~ 40s
)

func (ca twoPhaseCommitAction) String() string {
switch ca {
case actionPrewrite:
return "prewrite"
case actionCommit:
return "commit"
case actionCleanup:
return "cleanup"
case actionPessimisticLock:
return "pessimistic_lock"
case actionPessimisticRollback:
return "pessimistic_rollback"
}
return "unknown"
func (actionPrewrite) String() string {
return "prewrite"
}

func (actionCommit) String() string {
return "commit"
}

func (actionCleanup) String() string {
return "cleanup"
}

func (actionPessimisticLock) String() string {
return "pessimistic_lock"
}

// MetricsTag returns detail tag for metrics.
func (ca twoPhaseCommitAction) MetricsTag() string {
func (actionPessimisticRollback) String() string {
return "pessimistic_rollback"
}

// metricsTag returns detail tag for metrics.
func metricsTag(ca twoPhaseCommitAction) string {
return "2pc_" + ca.String()
}

@@ -120,13 +133,10 @@ type batchExecutor struct {
rateLimiter *rateLimit // rate limiter for concurrency control, maybe more strategies
committer *twoPhaseCommitter // here maybe more different type committer in the future
action twoPhaseCommitAction // the work action type
procFn procOneBatchFn // injected proc batch function
backoffer *Backoffer // Backoffer
tokenWaitDuration time.Duration // get token wait time
}

type procOneBatchFn func(bo *Backoffer, batch batchKeys) error

type mutationEx struct {
pb.Mutation
asserted bool
@@ -374,11 +384,11 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
return errors.Trace(err)
}

metrics.TiKVTxnRegionsNumHistogram.WithLabelValues(action.MetricsTag()).Observe(float64(len(groups)))
metrics.TiKVTxnRegionsNumHistogram.WithLabelValues(metricsTag(action)).Observe(float64(len(groups)))

var batches []batchKeys
var sizeFunc = c.keySize
if action == actionPrewrite {
if _, ok := action.(actionPrewrite); ok {
// Do not update regionTxnSize on retries. They are not used when building a PrewriteRequest.
if len(bo.errors) == 0 {
for region, keys := range groups {
@@ -396,15 +406,17 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
}

firstIsPrimary := bytes.Equal(keys[0], c.primary())
if firstIsPrimary && (action == actionCommit || action == actionCleanup) {
_, actionIsCommit := action.(actionCommit)
_, actionIsCleanup := action.(actionCleanup)
if firstIsPrimary && (actionIsCommit || actionIsCleanup) {
// primary should be committed/cleanup first
err = c.doActionOnBatches(bo, action, batches[:1])
if err != nil {
return errors.Trace(err)
}
batches = batches[1:]
}
if action == actionCommit {
if actionIsCommit {
// Commit secondary batches in background goroutine to reduce latency.
// The backoffer instance is created outside of the goroutine to avoid
// potential data race in unit test since `CommitMaxBackoff` will be updated
@@ -428,15 +440,12 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA

// doActionOnBatches does action to batches in parallel.
func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseCommitAction, batches []batchKeys) error {
singleBatchActionFunc, err := c.getProcFuncByType(action)
if err != nil {
return err
}
if len(batches) == 0 {
return nil
}

if len(batches) == 1 {
e := singleBatchActionFunc(bo, batches[0])
e := action.handleSingleBatch(c, bo, batches[0])
if e != nil {
logutil.BgLogger().Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", c.connID),
@@ -447,30 +456,11 @@ func (c *twoPhaseComm
return errors.Trace(e)
}
rateLim := len(batches) // this will be used for LargeTxn, set rateLim here
batchExecutor := newBatchExecutor(rateLim, c, action, singleBatchActionFunc, bo)
err = batchExecutor.process(batches)
batchExecutor := newBatchExecutor(rateLim, c, action, bo)
err := batchExecutor.process(batches)
return errors.Trace(err)
}

func (c *twoPhaseCommitter) getProcFuncByType(action twoPhaseCommitAction) (procOneBatchFn, error) {
var singleBatchActionFunc procOneBatchFn
switch action {
case actionPrewrite:
singleBatchActionFunc = c.prewriteSingleBatch
case actionCommit:
singleBatchActionFunc = c.commitSingleBatch
case actionCleanup:
singleBatchActionFunc = c.cleanupSingleBatch
case actionPessimisticLock:
singleBatchActionFunc = c.pessimisticLockSingleBatch
case actionPessimisticRollback:
singleBatchActionFunc = c.pessimisticRollbackSingleBatch
default:
return nil, errors.Errorf("invalid action type=%v", action)
}
return singleBatchActionFunc, nil
}

func (c *twoPhaseCommitter) keyValueSize(key []byte) int {
size := len(key)
if mutation := c.mutations[string(key)]; mutation != nil {
@@ -508,7 +498,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys, txnSize uint64
return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, pb.Context{Priority: c.priority, SyncLog: c.syncLog})
}

func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) error {
func (actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
txnSize := uint64(c.regionTxnSize[batch.region.id])
// When we retry because of a region miss, we don't know the transaction size. We set the transaction size here
// to MaxUint64 to avoid unexpected "resolve lock lite".
@@ -653,7 +643,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
}
}

func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batchKeys) error {
func (actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
mutations := make([]*pb.Mutation, len(batch.keys))
for i, k := range batch.keys {
mut := &pb.Mutation{
@@ -730,7 +720,7 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc
}
}

func (c *twoPhaseCommitter) pessimisticRollbackSingleBatch(bo *Backoffer, batch batchKeys) error {
func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, &pb.PessimisticRollbackRequest{
StartVersion: c.startTS,
ForUpdateTs: c.forUpdateTS,
@@ -801,7 +791,7 @@ func (c *twoPhaseCommitter) getUndeterminedErr() error {
return c.mu.undeterminedErr
}

func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error {
func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &pb.CommitRequest{
StartVersion: c.startTS,
Keys: batch.keys,
@@ -873,7 +863,7 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er
return nil
}

func (c *twoPhaseCommitter) cleanupSingleBatch(bo *Backoffer, batch batchKeys) error {
func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &pb.BatchRollbackRequest{
Keys: batch.keys,
StartVersion: c.startTS,
@@ -911,7 +901,7 @@ func (c *twoPhaseCommitter) prewriteKeys(bo *Backoffer, keys [][]byte) error {
bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1)
}

return c.doActionOnKeys(bo, actionPrewrite, keys)
return c.doActionOnKeys(bo, actionPrewrite{}, keys)
}

func (c *twoPhaseCommitter) commitKeys(bo *Backoffer, keys [][]byte) error {
@@ -921,19 +911,19 @@ func (c *twoPhaseCommitter) commitKeys(bo *Backoffer, keys [][]byte) error {
bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1)
}

return c.doActionOnKeys(bo, actionCommit, keys)
return c.doActionOnKeys(bo, actionCommit{}, keys)
}

func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error {
return c.doActionOnKeys(bo, actionCleanup, keys)
return c.doActionOnKeys(bo, actionCleanup{}, keys)
}

func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, keys [][]byte) error {
return c.doActionOnKeys(bo, actionPessimisticLock, keys)
return c.doActionOnKeys(bo, actionPessimisticLock{}, keys)
}

func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte) error {
return c.doActionOnKeys(bo, actionPessimisticRollback, keys)
return c.doActionOnKeys(bo, actionPessimisticRollback{}, keys)
}

func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error {
@@ -1148,9 +1138,9 @@ func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn

// newBatchExecutor create processor to handle concurrent batch works(prewrite/commit etc)
func newBatchExecutor(rateLimit int, committer *twoPhaseCommitter,
action twoPhaseCommitAction, procFn procOneBatchFn, backoffer *Backoffer) *batchExecutor {
action twoPhaseCommitAction, backoffer *Backoffer) *batchExecutor {
return &batchExecutor{rateLimit, nil, committer,
action, procFn, backoffer, time.Duration(1 * time.Millisecond)}
action, backoffer, time.Duration(1 * time.Millisecond)}
}

// initUtils do initialize batchExecutor related policies like rateLimit util
@@ -1170,7 +1160,7 @@ func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error,
go func() {
defer batchExe.rateLimiter.putToken()
var singleBatchBackoffer *Backoffer
if batchExe.action == actionCommit {
if _, ok := batchExe.action.(actionCommit); ok {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not
// fork a child context and call cancel() while the foreground goroutine exits.
Expand All @@ -1185,7 +1175,7 @@ func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error,
defer singleBatchCancel()
}
beforeSleep := singleBatchBackoffer.totalSleep
ch <- batchExe.procFn(singleBatchBackoffer, batch)
ch <- batchExe.action.handleSingleBatch(batchExe.committer, singleBatchBackoffer, batch)
commitDetail := batchExe.committer.getDetail()
if commitDetail != nil { // lock operations of pessimistic-txn will let commitDetail be nil
if delta := singleBatchBackoffer.totalSleep - beforeSleep; delta > 0 {
@@ -1217,7 +1207,7 @@ func (batchExe *batchExecutor) process(batches []batchKeys) error {
// For prewrite, stop sending other requests after receiving first error.
backoffer := batchExe.backoffer
var cancel context.CancelFunc
if batchExe.action == actionPrewrite {
if _, ok := batchExe.action.(actionPrewrite); ok {
backoffer, cancel = batchExe.backoffer.Fork()
defer cancel()
}
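
One consequence of the change, visible in the batchExecutor and doActionOnBatches hunks above, is that the injected procOneBatchFn field and the getProcFuncByType lookup disappear: the executor now calls the action's own handleSingleBatch method directly. The sketch below contrasts the two shapes using simplified, illustration-only types (batch, procFn, executorBefore, and executorAfter are not the PR's real types; only the handleSingleBatch name mirrors the diff).

package main

import "fmt"

type batch struct{ id int }

// Before (sketch): the executor stored an injected per-batch function,
// looked up from the action value by a switch (getProcFuncByType in the diff).
type procFn func(b batch) error

type executorBefore struct {
	proc procFn
}

// After (sketch): the executor stores the action itself and dispatches
// through the interface, so the lookup table is no longer needed.
type action interface {
	handleSingleBatch(b batch) error
}

type executorAfter struct {
	act action
}

type commit struct{}

func (commit) handleSingleBatch(b batch) error {
	fmt.Println("commit batch", b.id)
	return nil
}

func main() {
	before := executorBefore{proc: commit{}.handleSingleBatch}
	_ = before.proc(batch{id: 1})

	after := executorAfter{act: commit{}}
	_ = after.act.handleSingleBatch(batch{id: 2})
}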