Skip to content

Commit

Permalink
executor: pessimistic txn update forUpdateTS for concurrent insert (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored and jackysp committed Nov 19, 2019
1 parent e9165b9 commit 83630db
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 9 deletions.
41 changes: 32 additions & 9 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,24 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
}
}

// UpdateForUpdateTS updates the ForUpdateTS, if newForUpdateTS is 0, it obtain a new TS from PD.
func UpdateForUpdateTS(seCtx sessionctx.Context, newForUpdateTS uint64) error {
if newForUpdateTS == 0 {
version, err := seCtx.GetStore().CurrentVersion()
if err != nil {
return err
}
newForUpdateTS = version.Ver
}
txn, err := seCtx.Txn(true)
if err != nil {
return err
}
seCtx.GetSessionVars().TxnCtx.SetForUpdateTS(newForUpdateTS)
txn.SetOption(kv.SnapshotTS, newForUpdateTS)
return nil
}

// handlePessimisticLockError updates TS and rebuild executor if the err is write conflict.
func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (Executor, error) {
txnCtx := a.Ctx.GetSessionVars().TxnCtx
Expand All @@ -495,24 +513,29 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
newForUpdateTS = conflictCommitTS
}
} else {
// this branch if err not nil, always update forUpdateTS to avoid problem described below
// for nowait, when ErrLock happened, ErrLockAcquireFailAndNoWaitSet will be returned, and in the same txn
// the select for updateTs must be updated, otherwise there maybe rollback problem.
// begin; select for update key1(here ErrLocked or other errors(or max_execution_time like util),
// key1 lock not get and async rollback key1 is raised)
// select for update key1 again(this time lock succ(maybe lock released by others))
// the async rollback operation rollbacked the lock just acquired
if err != nil {
tsErr := UpdateForUpdateTS(a.Ctx, 0)
if tsErr != nil {
return nil, tsErr
}
}
return nil, err
}
if a.retryCount >= config.GetGlobalConfig().PessimisticTxn.MaxRetryCount {
return nil, errors.New("pessimistic lock retry limit reached")
}
a.retryCount++
if newForUpdateTS == 0 {
newForUpdateTS, err = a.Ctx.GetStore().GetOracle().GetTimestamp(ctx)
if err != nil {
return nil, err
}
}
txnCtx.SetForUpdateTS(newForUpdateTS)
txn, err := a.Ctx.Txn(true)
err = UpdateForUpdateTS(a.Ctx, newForUpdateTS)
if err != nil {
return nil, err
}
txn.SetOption(kv.SnapshotTS, newForUpdateTS)
e, err := a.buildExecutor()
if err != nil {
return nil, err
Expand Down
23 changes: 23 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,9 @@ func (b *executorBuilder) buildDeallocate(v *plannercore.Deallocate) Executor {

func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor {
b.isSelectForUpdate = true
if b.err = b.updateForUpdateTSIfNeeded(v.Children()[0]); b.err != nil {
return nil
}
// Build 'select for update' using the 'for update' ts.
b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()

Expand Down Expand Up @@ -1355,6 +1358,9 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
for id := range v.SelectPlan.Schema().TblID2Handle {
tblID2table[id], _ = b.is.TableByID(id)
}
if b.err = b.updateForUpdateTSIfNeeded(v.SelectPlan); b.err != nil {
return nil
}
b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
selExec := b.build(v.SelectPlan)
if b.err != nil {
Expand Down Expand Up @@ -1437,6 +1443,9 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
for id := range v.SelectPlan.Schema().TblID2Handle {
tblID2table[id], _ = b.is.TableByID(id)
}
if b.err = b.updateForUpdateTSIfNeeded(v.SelectPlan); b.err != nil {
return nil
}
b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
selExec := b.build(v.SelectPlan)
if b.err != nil {
Expand All @@ -1454,6 +1463,20 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
return deleteExec
}

// updateForUpdateTSIfNeeded updates the ForUpdateTS for a pessimistic transaction if needed.
// PointGet executor will get conflict error if the ForUpdateTS is older than the latest commitTS,
// so we don't need to update now for better latency.
func (b *executorBuilder) updateForUpdateTSIfNeeded(selectPlan plannercore.PhysicalPlan) error {
txnCtx := b.ctx.GetSessionVars().TxnCtx
if !txnCtx.IsPessimistic {
return nil
}
if _, ok := selectPlan.(*plannercore.PointGetPlan); ok {
return nil
}
return UpdateForUpdateTS(b.ctx, 0)
}

func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeIndexTask, maxNumBuckets uint64, autoAnalyze string) *analyzeTask {
_, offset := timeutil.Zone(b.ctx.GetSessionVars().Location())
e := &AnalyzeIndexExec{
Expand Down
28 changes: 28 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,34 @@ func (s *testPessimisticSuite) TestKillStopTTLManager(c *C) {
tk2.MustExec("update test_kill set c = c + 1 where id = 1")
}

func (s *testPessimisticSuite) TestConcurrentInsert(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists tk")
tk.MustExec("create table tk (c1 int primary key, c2 int)")
tk.MustExec("insert tk values (1, 1)")

tk1 := testkit.NewTestKitWithInit(c, s.store)
tk1.MustExec("begin pessimistic")
tk2 := testkit.NewTestKitWithInit(c, s.store)
forUpdateTsA := tk1.Se.GetSessionVars().TxnCtx.GetForUpdateTS()
tk1.MustQuery("select * from tk where c1 = 1 for update")
forUpdateTsB := tk1.Se.GetSessionVars().TxnCtx.GetForUpdateTS()
c.Assert(forUpdateTsA, Equals, forUpdateTsB)
tk1.MustQuery("select * from tk where c1 > 0 for update")
forUpdateTsC := tk1.Se.GetSessionVars().TxnCtx.GetForUpdateTS()
c.Assert(forUpdateTsC, Greater, forUpdateTsB)

tk2.MustExec("insert tk values (2, 2)")
tk1.MustQuery("select * from tk for update").Check(testkit.Rows("1 1", "2 2"))
tk2.MustExec("insert tk values (3, 3)")
tk1.MustExec("update tk set c2 = c2 + 1")
c.Assert(tk1.Se.AffectedRows(), Equals, uint64(3))
tk2.MustExec("insert tk values (4, 4)")
tk1.MustExec("delete from tk")
c.Assert(tk1.Se.AffectedRows(), Equals, uint64(4))
tk1.MustExec("commit")
}

func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists tk")
Expand Down

0 comments on commit 83630db

Please sign in to comment.