diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index c70f11521eca4..af3939d4a19cc 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -38,14 +38,23 @@ import ( "go.uber.org/zap" ) -type twoPhaseCommitAction int +type twoPhaseCommitAction interface { + handleSingleBatch(*twoPhaseCommitter, *Backoffer, batchKeys) error + String() string +} -const ( - actionPrewrite twoPhaseCommitAction = 1 + iota - actionCommit - actionCleanup - actionPessimisticLock - actionPessimisticRollback +type actionPrewrite struct{} +type actionCommit struct{} +type actionCleanup struct{} +type actionPessimisticLock struct{} +type actionPessimisticRollback struct{} + +var ( + _ twoPhaseCommitAction = actionPrewrite{} + _ twoPhaseCommitAction = actionCommit{} + _ twoPhaseCommitAction = actionCleanup{} + _ twoPhaseCommitAction = actionPessimisticLock{} + _ twoPhaseCommitAction = actionPessimisticRollback{} ) var ( @@ -60,24 +69,28 @@ var ( PessimisticLockTTL uint64 = 15000 // 15s ~ 40s ) -func (ca twoPhaseCommitAction) String() string { - switch ca { - case actionPrewrite: - return "prewrite" - case actionCommit: - return "commit" - case actionCleanup: - return "cleanup" - case actionPessimisticLock: - return "pessimistic_lock" - case actionPessimisticRollback: - return "pessimistic_rollback" - } - return "unknown" +func (actionPrewrite) String() string { + return "prewrite" +} + +func (actionCommit) String() string { + return "commit" +} + +func (actionCleanup) String() string { + return "cleanup" +} + +func (actionPessimisticLock) String() string { + return "pessimistic_lock" } -// MetricsTag returns detail tag for metrics. -func (ca twoPhaseCommitAction) MetricsTag() string { +func (actionPessimisticRollback) String() string { + return "pessimistic_rollback" +} + +// metricsTag returns detail tag for metrics. +func metricsTag(ca twoPhaseCommitAction) string { return "2pc_" + ca.String() } @@ -120,13 +133,10 @@ type batchExecutor struct { rateLimiter *rateLimit // rate limiter for concurrency control, maybe more strategies committer *twoPhaseCommitter // here maybe more different type committer in the future action twoPhaseCommitAction // the work action type - procFn procOneBatchFn // injected proc batch function backoffer *Backoffer // Backoffer tokenWaitDuration time.Duration // get token wait time } -type procOneBatchFn func(bo *Backoffer, batch batchKeys) error - type mutationEx struct { pb.Mutation asserted bool @@ -374,11 +384,11 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA return errors.Trace(err) } - metrics.TiKVTxnRegionsNumHistogram.WithLabelValues(action.MetricsTag()).Observe(float64(len(groups))) + metrics.TiKVTxnRegionsNumHistogram.WithLabelValues(metricsTag(action)).Observe(float64(len(groups))) var batches []batchKeys var sizeFunc = c.keySize - if action == actionPrewrite { + if _, ok := action.(actionPrewrite); ok { // Do not update regionTxnSize on retries. They are not used when building a PrewriteRequest. if len(bo.errors) == 0 { for region, keys := range groups { @@ -396,7 +406,9 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA } firstIsPrimary := bytes.Equal(keys[0], c.primary()) - if firstIsPrimary && (action == actionCommit || action == actionCleanup) { + _, actionIsCommit := action.(actionCommit) + _, actionIsCleanup := action.(actionCleanup) + if firstIsPrimary && (actionIsCommit || actionIsCleanup) { // primary should be committed/cleanup first err = c.doActionOnBatches(bo, action, batches[:1]) if err != nil { @@ -404,7 +416,7 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA } batches = batches[1:] } - if action == actionCommit { + if actionIsCommit { // Commit secondary batches in background goroutine to reduce latency. // The backoffer instance is created outside of the goroutine to avoid // potential data race in unit test since `CommitMaxBackoff` will be updated @@ -428,15 +440,12 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA // doActionOnBatches does action to batches in parallel. func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseCommitAction, batches []batchKeys) error { - singleBatchActionFunc, err := c.getProcFuncByType(action) - if err != nil { - return err - } if len(batches) == 0 { return nil } + if len(batches) == 1 { - e := singleBatchActionFunc(bo, batches[0]) + e := action.handleSingleBatch(c, bo, batches[0]) if e != nil { logutil.BgLogger().Debug("2PC doActionOnBatches failed", zap.Uint64("conn", c.connID), @@ -447,30 +456,11 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm return errors.Trace(e) } rateLim := len(batches) // this will be used for LargeTxn, set rateLim here - batchExecutor := newBatchExecutor(rateLim, c, action, singleBatchActionFunc, bo) - err = batchExecutor.process(batches) + batchExecutor := newBatchExecutor(rateLim, c, action, bo) + err := batchExecutor.process(batches) return errors.Trace(err) } -func (c *twoPhaseCommitter) getProcFuncByType(action twoPhaseCommitAction) (procOneBatchFn, error) { - var singleBatchActionFunc procOneBatchFn - switch action { - case actionPrewrite: - singleBatchActionFunc = c.prewriteSingleBatch - case actionCommit: - singleBatchActionFunc = c.commitSingleBatch - case actionCleanup: - singleBatchActionFunc = c.cleanupSingleBatch - case actionPessimisticLock: - singleBatchActionFunc = c.pessimisticLockSingleBatch - case actionPessimisticRollback: - singleBatchActionFunc = c.pessimisticRollbackSingleBatch - default: - return nil, errors.Errorf("invalid action type=%v", action) - } - return singleBatchActionFunc, nil -} - func (c *twoPhaseCommitter) keyValueSize(key []byte) int { size := len(key) if mutation := c.mutations[string(key)]; mutation != nil { @@ -508,7 +498,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys, txnSize uint64 return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, pb.Context{Priority: c.priority, SyncLog: c.syncLog}) } -func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) error { +func (actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { txnSize := uint64(c.regionTxnSize[batch.region.id]) // When we retry because of a region miss, we don't know the transaction size. We set the transaction size here // to MaxUint64 to avoid unexpected "resolve lock lite". @@ -653,7 +643,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { } } -func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batchKeys) error { +func (actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { mutations := make([]*pb.Mutation, len(batch.keys)) for i, k := range batch.keys { mut := &pb.Mutation{ @@ -730,7 +720,7 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc } } -func (c *twoPhaseCommitter) pessimisticRollbackSingleBatch(bo *Backoffer, batch batchKeys) error { +func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, &pb.PessimisticRollbackRequest{ StartVersion: c.startTS, ForUpdateTs: c.forUpdateTS, @@ -801,7 +791,7 @@ func (c *twoPhaseCommitter) getUndeterminedErr() error { return c.mu.undeterminedErr } -func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error { +func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &pb.CommitRequest{ StartVersion: c.startTS, Keys: batch.keys, @@ -873,7 +863,7 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er return nil } -func (c *twoPhaseCommitter) cleanupSingleBatch(bo *Backoffer, batch batchKeys) error { +func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &pb.BatchRollbackRequest{ Keys: batch.keys, StartVersion: c.startTS, @@ -911,7 +901,7 @@ func (c *twoPhaseCommitter) prewriteKeys(bo *Backoffer, keys [][]byte) error { bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1) } - return c.doActionOnKeys(bo, actionPrewrite, keys) + return c.doActionOnKeys(bo, actionPrewrite{}, keys) } func (c *twoPhaseCommitter) commitKeys(bo *Backoffer, keys [][]byte) error { @@ -921,19 +911,19 @@ func (c *twoPhaseCommitter) commitKeys(bo *Backoffer, keys [][]byte) error { bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1) } - return c.doActionOnKeys(bo, actionCommit, keys) + return c.doActionOnKeys(bo, actionCommit{}, keys) } func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error { - return c.doActionOnKeys(bo, actionCleanup, keys) + return c.doActionOnKeys(bo, actionCleanup{}, keys) } func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, keys [][]byte) error { - return c.doActionOnKeys(bo, actionPessimisticLock, keys) + return c.doActionOnKeys(bo, actionPessimisticLock{}, keys) } func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte) error { - return c.doActionOnKeys(bo, actionPessimisticRollback, keys) + return c.doActionOnKeys(bo, actionPessimisticRollback{}, keys) } func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error { @@ -1148,9 +1138,9 @@ func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn // newBatchExecutor create processor to handle concurrent batch works(prewrite/commit etc) func newBatchExecutor(rateLimit int, committer *twoPhaseCommitter, - action twoPhaseCommitAction, procFn procOneBatchFn, backoffer *Backoffer) *batchExecutor { + action twoPhaseCommitAction, backoffer *Backoffer) *batchExecutor { return &batchExecutor{rateLimit, nil, committer, - action, procFn, backoffer, time.Duration(1 * time.Millisecond)} + action, backoffer, time.Duration(1 * time.Millisecond)} } // initUtils do initialize batchExecutor related policies like rateLimit util @@ -1170,7 +1160,7 @@ func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error, go func() { defer batchExe.rateLimiter.putToken() var singleBatchBackoffer *Backoffer - if batchExe.action == actionCommit { + if _, ok := batchExe.action.(actionCommit); ok { // Because the secondary batches of the commit actions are implemented to be // committed asynchronously in background goroutines, we should not // fork a child context and call cancel() while the foreground goroutine exits. @@ -1185,7 +1175,7 @@ func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error, defer singleBatchCancel() } beforeSleep := singleBatchBackoffer.totalSleep - ch <- batchExe.procFn(singleBatchBackoffer, batch) + ch <- batchExe.action.handleSingleBatch(batchExe.committer, singleBatchBackoffer, batch) commitDetail := batchExe.committer.getDetail() if commitDetail != nil { // lock operations of pessimistic-txn will let commitDetail be nil if delta := singleBatchBackoffer.totalSleep - beforeSleep; delta > 0 { @@ -1217,7 +1207,7 @@ func (batchExe *batchExecutor) process(batches []batchKeys) error { // For prewrite, stop sending other requests after receiving first error. backoffer := batchExe.backoffer var cancel context.CancelFunc - if batchExe.action == actionPrewrite { + if _, ok := batchExe.action.(actionPrewrite); ok { backoffer, cancel = batchExe.backoffer.Fork() defer cancel() }