From af93a3d42486faf355d7b9f72fd231865be44e27 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 29 Oct 2019 13:14:57 +0800 Subject: [PATCH] store/tikv: tiny refactoring, change `twoPhaseCommitAction` to interface (#12845) --- go.sum | 1 + store/tikv/2pc.go | 126 +++++++++++++++++++++++++--------------------- 2 files changed, 71 insertions(+), 56 deletions(-) diff --git a/go.sum b/go.sum index b7d5184fcc063..9b40b36d696e7 100644 --- a/go.sum +++ b/go.sum @@ -189,6 +189,7 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFd github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7 h1:FUL3b97ZY2EPqg2NbXKuMHs5pXJB9hjj1fDHnF2vl28= github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 h1:tB9NOR21++IjLyVx3/PCPhWMwqGNCMQEH96A6dMZ/gc= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil v2.18.10+incompatible h1:cy84jW6EVRPa5g9HAHrlbxMSIjBhDSX0OFYyMYminYs= github.com/shirou/gopsutil v2.18.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 88cd7a8722272..21ae183f14055 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -23,6 +23,7 @@ import ( "time" "unsafe" + "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" @@ -40,14 +41,23 @@ import ( "go.uber.org/zap" ) -type twoPhaseCommitAction int +type twoPhaseCommitAction interface { + handleSingleBatch(*twoPhaseCommitter, *Backoffer, batchKeys) error + String() string +} -const ( - actionPrewrite twoPhaseCommitAction = 1 + iota - actionCommit - actionCleanup - actionPessimisticLock - actionPessimisticRollback +type actionPrewrite struct{} +type actionCommit struct{} +type actionCleanup struct{} +type actionPessimisticLock struct{} +type actionPessimisticRollback struct{} + +var ( + _ twoPhaseCommitAction = actionPrewrite{} + _ twoPhaseCommitAction = actionCommit{} + _ twoPhaseCommitAction = actionCleanup{} + _ twoPhaseCommitAction = actionPessimisticLock{} + _ twoPhaseCommitAction = actionPessimisticRollback{} ) var ( @@ -62,24 +72,28 @@ var ( PessimisticLockTTL uint64 = 15000 // 15s ~ 40s ) -func (ca twoPhaseCommitAction) String() string { - switch ca { - case actionPrewrite: - return "prewrite" - case actionCommit: - return "commit" - case actionCleanup: - return "cleanup" - case actionPessimisticLock: - return "pessimistic_lock" - case actionPessimisticRollback: - return "pessimistic_rollback" - } - return "unknown" +func (actionPrewrite) String() string { + return "prewrite" +} + +func (actionCommit) String() string { + return "commit" +} + +func (actionCleanup) String() string { + return "cleanup" +} + +func (actionPessimisticLock) String() string { + return "pessimistic_lock" +} + +func (actionPessimisticRollback) String() string { + return "pessimistic_rollback" } -// MetricsTag returns detail tag for metrics. -func (ca twoPhaseCommitAction) MetricsTag() string { +// metricsTag returns detail tag for metrics. +func metricsTag(ca twoPhaseCommitAction) string { return "2pc_" + ca.String() } @@ -371,11 +385,11 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA return errors.Trace(err) } - metrics.TiKVTxnRegionsNumHistogram.WithLabelValues(action.MetricsTag()).Observe(float64(len(groups))) + metrics.TiKVTxnRegionsNumHistogram.WithLabelValues(metricsTag(action)).Observe(float64(len(groups))) var batches []batchKeys var sizeFunc = c.keySize - if action == actionPrewrite { + if _, ok := action.(actionPrewrite); ok { // Do not update regionTxnSize on retries. They are not used when building a PrewriteRequest. if len(bo.errors) == 0 { for region, keys := range groups { @@ -393,7 +407,9 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA } firstIsPrimary := bytes.Equal(keys[0], c.primary()) - if firstIsPrimary && (action == actionCommit || action == actionCleanup) { + _, actionIsCommit := action.(actionCommit) + _, actionIsCleanup := action.(actionCleanup) + if firstIsPrimary && (actionIsCommit || actionIsCleanup) { // primary should be committed/cleanup first err = c.doActionOnBatches(bo, action, batches[:1]) if err != nil { @@ -401,7 +417,7 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA } batches = batches[1:] } - if action == actionCommit { + if actionIsCommit { // Commit secondary batches in background goroutine to reduce latency. // The backoffer instance is created outside of the goroutine to avoid // potencial data race in unit test since `CommitMaxBackoff` will be updated @@ -428,23 +444,10 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm if len(batches) == 0 { return nil } - var singleBatchActionFunc func(bo *Backoffer, batch batchKeys) error - switch action { - case actionPrewrite: - singleBatchActionFunc = c.prewriteSingleBatch - case actionCommit: - singleBatchActionFunc = c.commitSingleBatch - case actionCleanup: - singleBatchActionFunc = c.cleanupSingleBatch - case actionPessimisticLock: - singleBatchActionFunc = c.pessimisticLockSingleBatch - case actionPessimisticRollback: - singleBatchActionFunc = c.pessimisticRollbackSingleBatch - } if len(batches) == 1 { - e := singleBatchActionFunc(bo, batches[0]) + e := action.handleSingleBatch(c, bo, batches[0]) if e != nil { - logutil.Logger(context.Background()).Debug("2PC doActionOnBatches failed", + logutil.Logger(bo.ctx).Debug("2PC doActionOnBatches failed", zap.Uint64("conn", c.connID), zap.Stringer("action type", action), zap.Error(e), @@ -456,7 +459,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm // For prewrite, stop sending other requests after receiving first error. backoffer := bo var cancel context.CancelFunc - if action == actionPrewrite { + if _, ok := action.(actionPrewrite); ok { backoffer, cancel = bo.Fork() defer cancel() } @@ -468,7 +471,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm batch := batch1 go func() { var singleBatchBackoffer *Backoffer - if action == actionCommit { + if _, ok := action.(actionCommit); ok { // Because the secondary batches of the commit actions are implemented to be // committed asynchronously in background goroutines, we should not // fork a child context and call cancel() while the foreground goroutine exits. @@ -483,7 +486,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm defer singleBatchCancel() } beforeSleep := singleBatchBackoffer.totalSleep - ch <- singleBatchActionFunc(singleBatchBackoffer, batch) + ch <- action.handleSingleBatch(c, singleBatchBackoffer, batch) commitDetail := c.getDetail() if commitDetail != nil { // lock operations of pessimistic-txn will let commitDetail be nil if delta := singleBatchBackoffer.totalSleep - beforeSleep; delta > 0 { @@ -562,7 +565,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys, txnSize uint64 } } -func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) error { +func (actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { txnSize := uint64(c.regionTxnSize[batch.region.id]) // When we retry because of a region miss, we don't know the transaction size. We set the transaction size here // to MaxUint64 to avoid unexpected "resolve lock lite". @@ -710,7 +713,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { } } -func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batchKeys) error { +func (actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { mutations := make([]*pb.Mutation, len(batch.keys)) for i, k := range batch.keys { mut := &pb.Mutation{ @@ -794,7 +797,7 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc } } -func (c *twoPhaseCommitter) pessimisticRollbackSingleBatch(bo *Backoffer, batch batchKeys) error { +func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { req := &tikvrpc.Request{ Type: tikvrpc.CmdPessimisticRollback, PessimisticRollback: &pb.PessimisticRollbackRequest{ @@ -868,7 +871,7 @@ func (c *twoPhaseCommitter) getUndeterminedErr() error { return c.mu.undeterminedErr } -func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error { +func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { req := &tikvrpc.Request{ Type: tikvrpc.CmdCommit, Commit: &pb.CommitRequest{ @@ -881,7 +884,6 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er SyncLog: c.syncLog, }, } - req.Context.Priority = c.priority sender := NewRegionRequestSender(c.store.regionCache, c.store.client) resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort) @@ -948,7 +950,7 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er return nil } -func (c *twoPhaseCommitter) cleanupSingleBatch(bo *Backoffer, batch batchKeys) error { +func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { req := &tikvrpc.Request{ Type: tikvrpc.CmdBatchRollback, BatchRollback: &pb.BatchRollbackRequest{ @@ -987,23 +989,35 @@ func (c *twoPhaseCommitter) cleanupSingleBatch(bo *Backoffer, batch batchKeys) e } func (c *twoPhaseCommitter) prewriteKeys(bo *Backoffer, keys [][]byte) error { - return c.doActionOnKeys(bo, actionPrewrite, keys) + if span := opentracing.SpanFromContext(bo.ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("twoPhaseCommitter.prewriteKeys", opentracing.ChildOf(span.Context())) + defer span1.Finish() + bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1) + } + + return c.doActionOnKeys(bo, actionPrewrite{}, keys) } func (c *twoPhaseCommitter) commitKeys(bo *Backoffer, keys [][]byte) error { - return c.doActionOnKeys(bo, actionCommit, keys) + if span := opentracing.SpanFromContext(bo.ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("twoPhaseCommitter.commitKeys", opentracing.ChildOf(span.Context())) + defer span1.Finish() + bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1) + } + + return c.doActionOnKeys(bo, actionCommit{}, keys) } func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error { - return c.doActionOnKeys(bo, actionCleanup, keys) + return c.doActionOnKeys(bo, actionCleanup{}, keys) } func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, keys [][]byte) error { - return c.doActionOnKeys(bo, actionPessimisticLock, keys) + return c.doActionOnKeys(bo, actionPessimisticLock{}, keys) } func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte) error { - return c.doActionOnKeys(bo, actionPessimisticRollback, keys) + return c.doActionOnKeys(bo, actionPessimisticRollback{}, keys) } func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error {