From af93a3d42486faf355d7b9f72fd231865be44e27 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 29 Oct 2019 13:14:57 +0800 Subject: [PATCH 1/2] store/tikv: tiny refactoring, change `twoPhaseCommitAction` to interface (#12845) --- go.sum | 1 + store/tikv/2pc.go | 126 +++++++++++++++++++++++++--------------------- 2 files changed, 71 insertions(+), 56 deletions(-) diff --git a/go.sum b/go.sum index b7d5184fcc063..9b40b36d696e7 100644 --- a/go.sum +++ b/go.sum @@ -189,6 +189,7 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFd github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7 h1:FUL3b97ZY2EPqg2NbXKuMHs5pXJB9hjj1fDHnF2vl28= github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 h1:tB9NOR21++IjLyVx3/PCPhWMwqGNCMQEH96A6dMZ/gc= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil v2.18.10+incompatible h1:cy84jW6EVRPa5g9HAHrlbxMSIjBhDSX0OFYyMYminYs= github.com/shirou/gopsutil v2.18.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 88cd7a8722272..21ae183f14055 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -23,6 +23,7 @@ import ( "time" "unsafe" + "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" @@ -40,14 +41,23 @@ import ( "go.uber.org/zap" ) -type twoPhaseCommitAction int +type twoPhaseCommitAction interface { + handleSingleBatch(*twoPhaseCommitter, *Backoffer, batchKeys) error + String() string +} -const ( - actionPrewrite twoPhaseCommitAction = 1 + iota - actionCommit - actionCleanup - actionPessimisticLock - actionPessimisticRollback +type actionPrewrite struct{} +type actionCommit struct{} +type actionCleanup struct{} +type actionPessimisticLock struct{} +type actionPessimisticRollback struct{} + +var ( + _ twoPhaseCommitAction = actionPrewrite{} + _ twoPhaseCommitAction = actionCommit{} + _ twoPhaseCommitAction = actionCleanup{} + _ twoPhaseCommitAction = actionPessimisticLock{} + _ twoPhaseCommitAction = actionPessimisticRollback{} ) var ( @@ -62,24 +72,28 @@ var ( PessimisticLockTTL uint64 = 15000 // 15s ~ 40s ) -func (ca twoPhaseCommitAction) String() string { - switch ca { - case actionPrewrite: - return "prewrite" - case actionCommit: - return "commit" - case actionCleanup: - return "cleanup" - case actionPessimisticLock: - return "pessimistic_lock" - case actionPessimisticRollback: - return "pessimistic_rollback" - } - return "unknown" +func (actionPrewrite) String() string { + return "prewrite" +} + +func (actionCommit) String() string { + return "commit" +} + +func (actionCleanup) String() string { + return "cleanup" +} + +func (actionPessimisticLock) String() string { + return "pessimistic_lock" +} + +func (actionPessimisticRollback) String() string { + return "pessimistic_rollback" } -// MetricsTag returns detail tag for metrics. -func (ca twoPhaseCommitAction) MetricsTag() string { +// metricsTag returns detail tag for metrics. 
+func metricsTag(ca twoPhaseCommitAction) string { return "2pc_" + ca.String() } @@ -371,11 +385,11 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA return errors.Trace(err) } - metrics.TiKVTxnRegionsNumHistogram.WithLabelValues(action.MetricsTag()).Observe(float64(len(groups))) + metrics.TiKVTxnRegionsNumHistogram.WithLabelValues(metricsTag(action)).Observe(float64(len(groups))) var batches []batchKeys var sizeFunc = c.keySize - if action == actionPrewrite { + if _, ok := action.(actionPrewrite); ok { // Do not update regionTxnSize on retries. They are not used when building a PrewriteRequest. if len(bo.errors) == 0 { for region, keys := range groups { @@ -393,7 +407,9 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA } firstIsPrimary := bytes.Equal(keys[0], c.primary()) - if firstIsPrimary && (action == actionCommit || action == actionCleanup) { + _, actionIsCommit := action.(actionCommit) + _, actionIsCleanup := action.(actionCleanup) + if firstIsPrimary && (actionIsCommit || actionIsCleanup) { // primary should be committed/cleanup first err = c.doActionOnBatches(bo, action, batches[:1]) if err != nil { @@ -401,7 +417,7 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA } batches = batches[1:] } - if action == actionCommit { + if actionIsCommit { // Commit secondary batches in background goroutine to reduce latency. // The backoffer instance is created outside of the goroutine to avoid // potencial data race in unit test since `CommitMaxBackoff` will be updated @@ -428,23 +444,10 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm if len(batches) == 0 { return nil } - var singleBatchActionFunc func(bo *Backoffer, batch batchKeys) error - switch action { - case actionPrewrite: - singleBatchActionFunc = c.prewriteSingleBatch - case actionCommit: - singleBatchActionFunc = c.commitSingleBatch - case actionCleanup: - singleBatchActionFunc = c.cleanupSingleBatch - case actionPessimisticLock: - singleBatchActionFunc = c.pessimisticLockSingleBatch - case actionPessimisticRollback: - singleBatchActionFunc = c.pessimisticRollbackSingleBatch - } if len(batches) == 1 { - e := singleBatchActionFunc(bo, batches[0]) + e := action.handleSingleBatch(c, bo, batches[0]) if e != nil { - logutil.Logger(context.Background()).Debug("2PC doActionOnBatches failed", + logutil.Logger(bo.ctx).Debug("2PC doActionOnBatches failed", zap.Uint64("conn", c.connID), zap.Stringer("action type", action), zap.Error(e), @@ -456,7 +459,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm // For prewrite, stop sending other requests after receiving first error. backoffer := bo var cancel context.CancelFunc - if action == actionPrewrite { + if _, ok := action.(actionPrewrite); ok { backoffer, cancel = bo.Fork() defer cancel() } @@ -468,7 +471,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm batch := batch1 go func() { var singleBatchBackoffer *Backoffer - if action == actionCommit { + if _, ok := action.(actionCommit); ok { // Because the secondary batches of the commit actions are implemented to be // committed asynchronously in background goroutines, we should not // fork a child context and call cancel() while the foreground goroutine exits. 
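
The hunks above are the heart of the refactoring named in the subject line: `twoPhaseCommitAction` stops being an int constant dispatched through switch statements and becomes an interface, so `doActionOnBatches` simply calls `action.handleSingleBatch` on whatever action value it was handed, and the `singleBatchActionFunc` switch can be deleted. A minimal, self-contained sketch of the same pattern follows; the names (`action`, `prewrite`, `commit`, the toy `doActionOnBatches`) are illustrative stand-ins, not TiDB's actual types.

package main

import "fmt"

// action is the analogue of twoPhaseCommitAction: behaviour plus a name.
type action interface {
	run(batch string) error
	String() string
}

type prewrite struct{}
type commit struct{}

// Compile-time assertions that both types satisfy the interface,
// mirroring the `var _ twoPhaseCommitAction = actionPrewrite{}` block
// the patch adds: losing a method becomes a build error, not a panic.
var (
	_ action = prewrite{}
	_ action = commit{}
)

func (prewrite) String() string { return "prewrite" }
func (commit) String() string   { return "commit" }

func (prewrite) run(batch string) error {
	fmt.Println("prewriting", batch)
	return nil
}

func (commit) run(batch string) error {
	fmt.Println("committing", batch)
	return nil
}

// doActionOnBatches no longer needs a switch over action constants;
// it calls the method supplied by whichever action it was given.
func doActionOnBatches(a action, batches []string) error {
	for _, b := range batches {
		if err := a.run(b); err != nil {
			return fmt.Errorf("%s failed on %s: %w", a, b, err)
		}
	}
	return nil
}

func main() {
	_ = doActionOnBatches(prewrite{}, []string{"batch-1", "batch-2"})
	_ = doActionOnBatches(commit{}, []string{"batch-1"})
}
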
@@ -483,7 +486,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm defer singleBatchCancel() } beforeSleep := singleBatchBackoffer.totalSleep - ch <- singleBatchActionFunc(singleBatchBackoffer, batch) + ch <- action.handleSingleBatch(c, singleBatchBackoffer, batch) commitDetail := c.getDetail() if commitDetail != nil { // lock operations of pessimistic-txn will let commitDetail be nil if delta := singleBatchBackoffer.totalSleep - beforeSleep; delta > 0 { @@ -562,7 +565,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys, txnSize uint64 } } -func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) error { +func (actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { txnSize := uint64(c.regionTxnSize[batch.region.id]) // When we retry because of a region miss, we don't know the transaction size. We set the transaction size here // to MaxUint64 to avoid unexpected "resolve lock lite". @@ -710,7 +713,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { } } -func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batchKeys) error { +func (actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { mutations := make([]*pb.Mutation, len(batch.keys)) for i, k := range batch.keys { mut := &pb.Mutation{ @@ -794,7 +797,7 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc } } -func (c *twoPhaseCommitter) pessimisticRollbackSingleBatch(bo *Backoffer, batch batchKeys) error { +func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { req := &tikvrpc.Request{ Type: tikvrpc.CmdPessimisticRollback, PessimisticRollback: &pb.PessimisticRollbackRequest{ @@ -868,7 +871,7 @@ func (c *twoPhaseCommitter) getUndeterminedErr() error { return c.mu.undeterminedErr } -func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error { +func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { req := &tikvrpc.Request{ Type: tikvrpc.CmdCommit, Commit: &pb.CommitRequest{ @@ -881,7 +884,6 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er SyncLog: c.syncLog, }, } - req.Context.Priority = c.priority sender := NewRegionRequestSender(c.store.regionCache, c.store.client) resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort) @@ -948,7 +950,7 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er return nil } -func (c *twoPhaseCommitter) cleanupSingleBatch(bo *Backoffer, batch batchKeys) error { +func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { req := &tikvrpc.Request{ Type: tikvrpc.CmdBatchRollback, BatchRollback: &pb.BatchRollbackRequest{ @@ -987,23 +989,35 @@ func (c *twoPhaseCommitter) cleanupSingleBatch(bo *Backoffer, batch batchKeys) e } func (c *twoPhaseCommitter) prewriteKeys(bo *Backoffer, keys [][]byte) error { - return c.doActionOnKeys(bo, actionPrewrite, keys) + if span := opentracing.SpanFromContext(bo.ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("twoPhaseCommitter.prewriteKeys", opentracing.ChildOf(span.Context())) + defer span1.Finish() + bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1) + } + + return c.doActionOnKeys(bo, actionPrewrite{}, keys) } func (c *twoPhaseCommitter) commitKeys(bo *Backoffer, keys [][]byte) error 
{ - return c.doActionOnKeys(bo, actionCommit, keys) + if span := opentracing.SpanFromContext(bo.ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("twoPhaseCommitter.commitKeys", opentracing.ChildOf(span.Context())) + defer span1.Finish() + bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1) + } + + return c.doActionOnKeys(bo, actionCommit{}, keys) } func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error { - return c.doActionOnKeys(bo, actionCleanup, keys) + return c.doActionOnKeys(bo, actionCleanup{}, keys) } func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, keys [][]byte) error { - return c.doActionOnKeys(bo, actionPessimisticLock, keys) + return c.doActionOnKeys(bo, actionPessimisticLock{}, keys) } func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte) error { - return c.doActionOnKeys(bo, actionPessimisticRollback, keys) + return c.doActionOnKeys(bo, actionPessimisticRollback{}, keys) } func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error { From 718253a9d5b84e94813d604cb7da84c9e875c168 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 29 Oct 2019 13:42:35 +0800 Subject: [PATCH 2/2] cherry pick --- ddl/index.go | 2 +- executor/adapter.go | 2 +- executor/admin.go | 2 +- executor/executor.go | 2 +- kv/kv.go | 2 +- kv/mock.go | 2 +- kv/mock_test.go | 2 +- session/pessimistic_test.go | 29 +++++++++++++++++++++++++++++ store/tikv/2pc.go | 28 +++++++++++++++++++++------- store/tikv/2pc_test.go | 10 +++++----- store/tikv/error.go | 2 ++ store/tikv/ticlient_test.go | 2 +- store/tikv/txn.go | 11 +++++++++-- 13 files changed, 74 insertions(+), 22 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index e3fc94fba1d18..3921499ade8ec 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -869,7 +869,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx // Lock the row key to notify us that someone delete or update the row, // then we should not backfill the index of it, otherwise the adding index is redundant. - err := txn.LockKeys(context.Background(), 0, idxRecord.key) + err := txn.LockKeys(context.Background(), nil, 0, idxRecord.key) if err != nil { return errors.Trace(err) } diff --git a/executor/adapter.go b/executor/adapter.go index a2eb59239570f..deb91247c48b2 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -460,7 +460,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { return nil } forUpdateTS := txnCtx.GetForUpdateTS() - err = txn.LockKeys(ctx, forUpdateTS, keys...) + err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, keys...) if err == nil { return nil } diff --git a/executor/admin.go b/executor/admin.go index fbbcc73b28f71..a177c9fff1e5f 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -431,7 +431,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa } recordKey := e.table.RecordKey(row.handle) - err := txn.LockKeys(ctx, 0, recordKey) + err := txn.LockKeys(ctx, nil, 0, recordKey) if err != nil { return result, err } diff --git a/executor/executor.go b/executor/executor.go index 1ba5ed502917d..92c903963931b 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -780,7 +780,7 @@ func doLockKeys(ctx context.Context, se sessionctx.Context, keys ...kv.Key) erro return err } forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS() - return txn.LockKeys(ctx, forUpdateTS, keys...) 
+ return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, keys...) } // LimitExec represents limit executor diff --git a/kv/kv.go b/kv/kv.go index 09d903581edf2..7ca90087377e2 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -136,7 +136,7 @@ type Transaction interface { // String implements fmt.Stringer interface. String() string // LockKeys tries to lock the entries with the keys in KV store. - LockKeys(ctx context.Context, forUpdateTS uint64, keys ...Key) error + LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, keys ...Key) error // SetOption sets an option with a value, when val is nil, uses the default // value of this option. SetOption(opt Option, val interface{}) diff --git a/kv/mock.go b/kv/mock.go index 8d007e64e6893..877ab09ff9344 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -39,7 +39,7 @@ func (t *mockTxn) String() string { return "" } -func (t *mockTxn) LockKeys(_ context.Context, _ uint64, _ ...Key) error { +func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ ...Key) error { return nil } diff --git a/kv/mock_test.go b/kv/mock_test.go index 67b4193f7ea9f..4cbe5631e0610 100644 --- a/kv/mock_test.go +++ b/kv/mock_test.go @@ -37,7 +37,7 @@ func (s testMockSuite) TestInterface(c *C) { transaction, err := storage.Begin() c.Check(err, IsNil) - err = transaction.LockKeys(context.Background(), 0, Key("lock")) + err = transaction.LockKeys(context.Background(), nil, 0, Key("lock")) c.Check(err, IsNil) transaction.SetOption(Option(23), struct{}{}) if mock, ok := transaction.(*mockTxn); ok { diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 98abee8433e2d..7ec471c148f11 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -15,6 +15,8 @@ package session_test import ( "fmt" + "sync" + "sync/atomic" "time" . "github.com/pingcap/check" @@ -391,3 +393,30 @@ func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) { tk.MustExec("commit") tk.MustQuery("select * from conflict").Check(testkit.Rows("1 3")) } + +func (s *testPessimisticSuite) TestWaitLockKill(c *C) { + // Test kill command works on waiting pessimistic lock. 
+ tk := testkit.NewTestKitWithInit(c, s.store) + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists test_kill") + tk.MustExec("create table test_kill (id int primary key, c int)") + tk.MustExec("insert test_kill values (1, 1)") + tk.MustExec("begin pessimistic") + tk2.MustExec("begin pessimistic") + tk.MustQuery("select * from test_kill where id = 1 for update") + + var wg sync.WaitGroup + wg.Add(1) + go func() { + time.Sleep(500 * time.Millisecond) + sessVars := tk2.Se.GetSessionVars() + succ := atomic.CompareAndSwapUint32(&sessVars.Killed, 0, 1) + c.Assert(succ, IsTrue) + wg.Wait() + }() + _, err := tk2.Exec("update test_kill set c = c + 1 where id = 1") + wg.Done() + c.Assert(err, NotNil) + c.Assert(terror.ErrorEqual(err, tikv.ErrQueryInterrupted), IsTrue) + tk.MustExec("rollback") +} diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 21ae183f14055..764daa5f2b770 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -49,7 +49,7 @@ type twoPhaseCommitAction interface { type actionPrewrite struct{} type actionCommit struct{} type actionCleanup struct{} -type actionPessimisticLock struct{} +type actionPessimisticLock struct{ killed *uint32 } type actionPessimisticRollback struct{} var ( @@ -713,7 +713,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { } } -func (actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { +func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { mutations := make([]*pb.Mutation, len(batch.keys)) for i, k := range batch.keys { mut := &pb.Mutation{ @@ -756,7 +756,7 @@ func (actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff if err != nil { return errors.Trace(err) } - err = c.pessimisticLockKeys(bo, batch.keys) + err = c.pessimisticLockKeys(bo, action.killed, batch.keys) return errors.Trace(err) } lockResp := resp.PessimisticLock @@ -789,11 +789,25 @@ func (actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff } locks = append(locks, lock) } - _, err = c.store.lockResolver.ResolveLocks(bo, locks) + expire, err := c.store.lockResolver.ResolveLocks(bo, locks) if err != nil { return errors.Trace(err) } - // Because we already waited on tikv, no need to Backoff here. + + if err1 := bo.BackoffWithMaxSleep(BoTxnLock, int(expire), errors.New(locks[0].String())); err1 != nil { + return err1 + } + // Handle the killed flag when waiting for the pessimistic lock. + // When a txn runs into LockKeys() and backoff here, it has no chance to call + // executor.Next() and check the killed flag. + if action.killed != nil { + // Do not reset the killed flag here! + // actionPessimisticLock runs on each region parallelly, we have to consider that + // the error may be dropped. 
+ if atomic.LoadUint32(action.killed) == 1 { + return ErrQueryInterrupted + } + } } } @@ -1012,8 +1026,8 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error { return c.doActionOnKeys(bo, actionCleanup{}, keys) } -func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, keys [][]byte) error { - return c.doActionOnKeys(bo, actionPessimisticLock{}, keys) +func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, killed *uint32, keys [][]byte) error { + return c.doActionOnKeys(bo, actionPessimisticLock{killed}, keys) } func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte) error { diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 05b932ff4562c..7ecb6bb1994ba 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -507,7 +507,7 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) { _, _ = txn.us.Get(key) c.Assert(txn.Set(key, key), IsNil) txn.DelOption(kv.PresumeKeyNotExists) - err := txn.LockKeys(context.Background(), txn.startTS, key) + err := txn.LockKeys(context.Background(), nil, txn.startTS, key) c.Assert(err, NotNil) c.Assert(txn.Delete(key), IsNil) key2 := kv.Key("key2") @@ -519,9 +519,9 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) { func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) { txn := s.begin(c) txn.SetOption(kv.Pessimistic, true) - err := txn.LockKeys(context.Background(), 100, kv.Key("abc"), kv.Key("def")) + err := txn.LockKeys(context.Background(), nil, 100, kv.Key("abc"), kv.Key("def")) c.Assert(err, IsNil) - err = txn.LockKeys(context.Background(), 100, kv.Key("abc"), kv.Key("def")) + err = txn.LockKeys(context.Background(), nil, 100, kv.Key("abc"), kv.Key("def")) c.Assert(err, IsNil) c.Assert(txn.lockKeys, HasLen, 2) } @@ -531,11 +531,11 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) { txn := s.begin(c) txn.SetOption(kv.Pessimistic, true) time.Sleep(time.Millisecond * 100) - err := txn.LockKeys(context.Background(), txn.startTS, key) + err := txn.LockKeys(context.Background(), nil, txn.startTS, key) c.Assert(err, IsNil) time.Sleep(time.Millisecond * 100) key2 := kv.Key("key2") - err = txn.LockKeys(context.Background(), txn.startTS, key2) + err = txn.LockKeys(context.Background(), nil, txn.startTS, key2) c.Assert(err, IsNil) lockInfo := s.getLockInfo(c, key) elapsedTTL := lockInfo.LockTtl - PessimisticLockTTL diff --git a/store/tikv/error.go b/store/tikv/error.go index 5e775218a9f5e..574e460454912 100644 --- a/store/tikv/error.go +++ b/store/tikv/error.go @@ -38,6 +38,7 @@ var ( ErrRegionUnavailable = terror.ClassTiKV.New(mysql.ErrRegionUnavailable, mysql.MySQLErrName[mysql.ErrRegionUnavailable]) ErrTiKVServerBusy = terror.ClassTiKV.New(mysql.ErrTiKVServerBusy, mysql.MySQLErrName[mysql.ErrTiKVServerBusy]) ErrGCTooEarly = terror.ClassTiKV.New(mysql.ErrGCTooEarly, mysql.MySQLErrName[mysql.ErrGCTooEarly]) + ErrQueryInterrupted = terror.ClassTiKV.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted]) ) // ErrDeadlock wraps *kvrpcpb.Deadlock to implement the error interface. 
@@ -60,6 +61,7 @@ func init() { mysql.ErrTiKVServerBusy: mysql.ErrTiKVServerBusy, mysql.ErrGCTooEarly: mysql.ErrGCTooEarly, mysql.ErrTruncatedWrongValue: mysql.ErrTruncatedWrongValue, + mysql.ErrQueryInterrupted: mysql.ErrQueryInterrupted, } terror.ErrClassToMySQLCodes[terror.ClassTiKV] = tikvMySQLErrCodes } diff --git a/store/tikv/ticlient_test.go b/store/tikv/ticlient_test.go index 44ca2d40f7f41..ddbbe3e610f0b 100644 --- a/store/tikv/ticlient_test.go +++ b/store/tikv/ticlient_test.go @@ -119,7 +119,7 @@ func (s *testTiclientSuite) TestSingleKey(c *C) { txn := s.beginTxn(c) err := txn.Set(encodeKey(s.prefix, "key"), []byte("value")) c.Assert(err, IsNil) - err = txn.LockKeys(context.Background(), 0, encodeKey(s.prefix, "key")) + err = txn.LockKeys(context.Background(), nil, 0, encodeKey(s.prefix, "key")) c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 01a32795437d0..7e3b6a86d142c 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/dgryski/go-farm" @@ -351,7 +352,7 @@ func (txn *tikvTxn) rollbackPessimisticLocks() error { return txn.committer.pessimisticRollbackKeys(NewBackoffer(context.Background(), cleanupMaxBackoff), txn.lockKeys) } -func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keysInput ...kv.Key) error { +func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, keysInput ...kv.Key) error { // Exclude keys that are already locked. keys := make([][]byte, 0, len(keysInput)) txn.mu.Lock() @@ -395,7 +396,13 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keysInput // If the number of keys greater than 1, it can be on different region, // concurrently execute on multiple regions may lead to deadlock. txn.committer.isFirstLock = len(txn.lockKeys) == 0 && len(keys) == 1 - err := txn.committer.pessimisticLockKeys(bo, keys) + err := txn.committer.pessimisticLockKeys(bo, killed, keys) + if killed != nil { + // If the kill signal is received during waiting for pessimisticLock, + // pessimisticLockKeys would handle the error but it doesn't reset the flag. + // We need to reset the killed flag here. + atomic.CompareAndSwapUint32(killed, 1, 0) + } if err != nil { for _, key := range keys { txn.us.DeleteConditionPair(key)
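
The `LockKeys` change above, together with the `actionPessimisticLock{killed}` plumbing in 2pc.go, implements the kill check: while the committer backs off on a lock conflict it polls the shared flag with `atomic.LoadUint32` and returns the new `ErrQueryInterrupted` once a KILL has set it, and `LockKeys` afterwards resets the flag with `atomic.CompareAndSwapUint32`. A minimal sketch of this cooperative-interrupt pattern, using hypothetical names rather than TiDB's API, is:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

var errQueryInterrupted = errors.New("query interrupted")

// waitForLock stands in for the pessimistic-lock retry loop: each
// iteration backs off, then checks whether the session was killed.
func waitForLock(killed *uint32, attempts int) error {
	for i := 0; i < attempts; i++ {
		time.Sleep(10 * time.Millisecond) // stand-in for the Backoffer sleep
		if killed != nil && atomic.LoadUint32(killed) == 1 {
			return errQueryInterrupted
		}
	}
	return nil
}

func main() {
	var killed uint32
	go func() {
		// Simulates the KILL command flipping the session's flag
		// from another goroutine while the lock wait is in progress.
		time.Sleep(25 * time.Millisecond)
		atomic.StoreUint32(&killed, 1)
	}()
	fmt.Println(waitForLock(&killed, 100)) // prints: query interrupted

	// Reset the flag once the interrupt has been consumed, mirroring the
	// CompareAndSwapUint32 call in the LockKeys hunk above.
	atomic.CompareAndSwapUint32(&killed, 1, 0)
}
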