Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support innodb_lock_wait_timeout for pessimistic transaction #13103

Merged
merged 4 commits into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
return nil
}
forUpdateTS := txnCtx.GetForUpdateTS()
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, kv.LockAlwaysWait, keys...)
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, sctx.GetSessionVars().LockWaitTimeout, keys...)
if err == nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
return nil
}
var lockWaitTime = kv.LockAlwaysWait
var lockWaitTime = e.ctx.GetSessionVars().LockWaitTimeout
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var lockWaitTime = e.ctx.GetSessionVars().LockWaitTimeout
lockWaitTime := e.ctx.GetSessionVars().LockWaitTimeout

if e.Lock == ast.SelectLockForUpdateNoWait {
lockWaitTime = kv.LockNoWait
}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan {
if !sessVars.IsAutocommit() || sessVars.InTxn() {
fp.Lock = true
fp.IsForUpdate = true
fp.LockWaitTime = kv.LockAlwaysWait
fp.LockWaitTime = sessVars.LockWaitTimeout
if x.LockTp == ast.SelectLockForUpdateNoWait {
fp.LockWaitTime = kv.LockNoWait
}
Expand Down Expand Up @@ -621,7 +621,7 @@ func newPointGetPlan(ctx sessionctx.Context, dbName string, schema *expression.S
schema: schema,
TblInfo: tbl,
outputNames: names,
LockWaitTime: kv.LockAlwaysWait,
LockWaitTime: ctx.GetSessionVars().LockWaitTimeout,
}
ctx.GetSessionVars().StmtCtx.Tables = []stmtctx.TableEntry{{DB: ctx.GetSessionVars().CurrentDB, Table: tbl.Name.L}}
return p
Expand Down
88 changes: 88 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,3 +567,91 @@ func (s *testPessimisticSuite) TestKillStopTTLManager(c *C) {
// This query should success rather than returning a ResolveLock error.
tk2.MustExec("update test_kill set c = c + 1 where id = 1")
}

func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists tk")
tk.MustExec("create table tk (c1 int primary key, c2 int)")
tk.MustExec("insert into tk values(1,1),(2,2),(3,3),(4,4),(5,5)")
// tk set global
tk.MustExec("set global innodb_lock_wait_timeout = 3")
tk.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 50"))

tk2 := testkit.NewTestKitWithInit(c, s.store)
tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3"))
tk2.MustExec("set innodb_lock_wait_timeout = 2")
tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 2"))

tk3 := testkit.NewTestKitWithInit(c, s.store)
tk3.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3"))
tk3.MustExec("set innodb_lock_wait_timeout = 1")
tk3.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 1"))

tk2.MustExec("set @@autocommit = 0")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why set autocommit here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set this before next begin statement below

tk3.MustExec("set @@autocommit = 0")

tk4 := testkit.NewTestKitWithInit(c, s.store)
tk4.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3"))
tk4.MustExec("set @@autocommit = 0")

// tk2 lock c1 = 1
tk2.MustExec("begin pessimistic")
tk2.MustExec("select * from tk where c1 = 1 for update") // lock succ c1 = 1

// tk3 try lock c1 = 1 timeout 1sec
tk3.MustExec("begin pessimistic")
start := time.Now()
_, err := tk3.Exec("select * from tk where c1 = 1 for update")
c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond))
c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big
c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error())

tk4.MustExec("begin pessimistic")
tk4.MustExec("update tk set c2 = c2 + 1 where c1 = 2") // lock succ c1 = 2 by update
start = time.Now()
_, err = tk2.Exec("update tk set c2 = c2 - 1 where c1 = 2")
c.Check(time.Since(start), GreaterEqual, time.Duration(2000*time.Millisecond))
c.Check(time.Since(start), LessEqual, time.Duration(2100*time.Millisecond)) // unit test diff should not be too big
c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error())

tk2.MustExec("set innodb_lock_wait_timeout = 1")
tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 1"))
start = time.Now()
_, err = tk2.Exec("delete from tk where c1 = 2")
c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond))
c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big
c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error())

tk2.MustExec("commit")
tk3.MustExec("commit")
tk4.MustExec("commit")

tk.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 50"))
tk.MustQuery(`select * from tk where c1 = 2`).Check(testkit.Rows("2 3")) // tk4 update commit work, tk2 delete should be rollbacked

// test stmtRollBack caused by timeout but not the whole transaction
tk2.MustExec("begin pessimistic")
tk2.MustExec("update tk set c2 = c2 + 2 where c1 = 2") // tk2 lock succ c1 = 2 by update
tk2.MustQuery(`select * from tk where c1 = 2`).Check(testkit.Rows("2 5")) // tk2 update c2 succ

tk3.MustExec("begin pessimistic")
tk3.MustExec("select * from tk where c1 = 3 for update") // tk3 lock c1 = 3 succ

start = time.Now()
_, err = tk2.Exec("delete from tk where c1 = 3") // tk2 tries to lock c1 = 3 fail, this delete should be rollback, but previous update should be keeped
c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond))
c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big
c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error())

tk2.MustExec("commit")
tk3.MustExec("commit")

tk.MustQuery(`select * from tk where c1 = 1`).Check(testkit.Rows("1 1"))
tk.MustQuery(`select * from tk where c1 = 2`).Check(testkit.Rows("2 5")) // tk2 update succ
tk.MustQuery(`select * from tk where c1 = 3`).Check(testkit.Rows("3 3")) // tk2 delete should fail
tk.MustQuery(`select * from tk where c1 = 4`).Check(testkit.Rows("4 4"))
tk.MustQuery(`select * from tk where c1 = 5`).Check(testkit.Rows("5 5"))

// clean
tk.MustExec("drop table if exists tk")
}
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1793,6 +1793,7 @@ var builtinGlobalVariable = []string{
variable.CollationServer,
variable.NetWriteTimeout,
variable.MaxExecutionTime,
variable.InnodbLockWaitTimeout,

/* TiDB specific global variables: */
variable.TiDBSkipUTF8Check,
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,9 @@ type SessionVars struct {
isolationReadEngines map[kv.StoreType]struct{}

PlannerSelectBlockAsName []ast.HintTable

// Lock wait timeout for pessimistic transaction in milliseconds, `innodb_lock_wait_timeout` is in seconds
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put is in seconds at the last is too easy to be misunderstood as LockWaitTimeout is in seconds.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Lock wait timeout for pessimistic transaction in milliseconds, `innodb_lock_wait_timeout` is in seconds
// LockWaitTimeout is used for pessimistic transaction in milliseconds, `innodb_lock_wait_timeout` is in seconds.

LockWaitTimeout int64
}

// PreparedParams contains the parameters of the current prepared statement when executing it.
Expand Down Expand Up @@ -524,6 +527,7 @@ func NewSessionVars() *SessionVars {
AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc,
UsePlanBaselines: DefTiDBUsePlanBaselines,
isolationReadEngines: map[kv.StoreType]struct{}{kv.TiKV: {}, kv.TiFlash: {}},
LockWaitTimeout: DefInnodbLockWaitTimeout * 1000,
}
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
Expand Down Expand Up @@ -812,6 +816,9 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
case MaxExecutionTime:
timeoutMS := tidbOptPositiveInt32(val, 0)
s.MaxExecutionTime = uint64(timeoutMS)
case InnodbLockWaitTimeout:
lockWaitSec := tidbOptInt64(val, DefInnodbLockWaitTimeout)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to validate the range.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is done in function ValidateSetSystemVar

case InnodbLockWaitTimeout:
	return checkUInt64SystemVar(name, value, 1, 1073741824, vars)
mysql> set global innodb_lock_wait_timeout = 0;
Query OK, 0 rows affected, 1 warning (0.00 sec)

mysql> show warnings;
+---------+------+---------------------------------------------------------+
| Level   | Code | Message                                                 |
+---------+------+---------------------------------------------------------+
| Warning | 1292 | Truncated incorrect innodb_lock_wait_timeout value: '0' |
+---------+------+---------------------------------------------------------+
1 row in set (0.00 sec)

s.LockWaitTimeout = int64(lockWaitSec * 1000)
case TiDBSkipUTF8Check:
s.SkipUTF8Check = TiDBOptOn(val)
case TiDBOptAggPushDown:
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ var defaultSysVars = []*SysVar{
{ScopeNone, "basedir", "/usr/local/mysql"},
{ScopeGlobal, "innodb_old_blocks_time", "1000"},
{ScopeGlobal, "innodb_stats_method", "nulls_equal"},
{ScopeGlobal | ScopeSession, InnodbLockWaitTimeout, "50"},
{ScopeGlobal | ScopeSession, InnodbLockWaitTimeout, strconv.FormatInt(DefInnodbLockWaitTimeout, 10)},
{ScopeGlobal, LocalInFile, "1"},
{ScopeGlobal | ScopeSession, "myisam_stats_method", "nulls_unequal"},
{ScopeNone, "version_compile_os", "osx10.8"},
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ const (
DefTiDBEnableNoopFuncs = false
DefTiDBAllowRemoveAutoInc = false
DefTiDBUsePlanBaselines = true
DefInnodbLockWaitTimeout = 50 // 50s
)

// Process global variables.
Expand Down
22 changes: 18 additions & 4 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
IsFirstLock: c.isFirstLock,
WaitTimeout: action.lockWaitTime,
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog})
lockWaitStartTime := time.Now()
for {
Copy link
Member

@coocood coocood Nov 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to update the req.WaitTimeout . so if the wait timeout is less than 3s,we can return sooner.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

roger

resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
if err != nil {
Expand Down Expand Up @@ -726,15 +727,28 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
// if the lock left behind whose related txn is already committed or rollbacked,
// (eg secondary locks not committed or rollbacked yet)
// we cant return "nowait conflict" directly
if action.lockWaitTime == kv.LockNoWait && lock.LockType == pb.Op_PessimisticLock {
// the pessimistic lock found could be lock left behind(timeout but not recycled yet)
if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) {
return ErrLockAcquireFailAndNoWaitSet
if lock.LockType == pb.Op_PessimisticLock {
if action.lockWaitTime == kv.LockNoWait {
// the pessimistic lock found could be lock left behind(timeout but not recycled yet)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to improve the English of this comment.

if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) {
return ErrLockAcquireFailAndNoWaitSet
}
} else if action.lockWaitTime == kv.LockAlwaysWait {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to handle LockAlwaysWait, innodb_wait_timeout is always positive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coocood some inner usages which calls doLockKeys (like RecoverIndexExec, addIndexWorker) will set this param LockAlwaysWait

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about

Suggested change
} else if action.lockWaitTime == kv.LockAlwaysWait {
} else if action.lockWaitTime != kv.LockAlwaysWait {

then do something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

current we have 3 paths(negative, 0, positive value), I think it's more clear to make them like noodle style, and we just need to add/remove one more else if path if we want to modify the protocol

// do nothing but keep wait
} else {
// user has set the `InnodbLockWaitTimeout`, check timeout
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to improve the English of this comment.

// the pessimistic lock found could be lock left behind(timeout but not recycled yet)
if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) {
if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime {
return ErrLockWaitTimeout
}
}
}
}
locks = append(locks, lock)
}
// Because we already waited on tikv, no need to Backoff here.
// tikv default will wait 3s(also the maximum wait value) when lock error occurs
_, err = c.store.lockResolver.ResolveLocks(bo, c.startTS, locks)
if err != nil {
return errors.Trace(err)
Expand Down
2 changes: 2 additions & 0 deletions store/tikv/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (
ErrGCTooEarly = terror.ClassTiKV.New(mysql.ErrGCTooEarly, mysql.MySQLErrName[mysql.ErrGCTooEarly])
ErrQueryInterrupted = terror.ClassTiKV.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted])
ErrLockAcquireFailAndNoWaitSet = terror.ClassTiKV.New(mysql.ErrLockAcquireFailAndNoWaitSet, mysql.MySQLErrName[mysql.ErrLockAcquireFailAndNoWaitSet])
ErrLockWaitTimeout = terror.ClassTiKV.New(mysql.ErrLockWaitTimeout, mysql.MySQLErrName[mysql.ErrLockWaitTimeout])
)

// ErrDeadlock wraps *kvrpcpb.Deadlock to implement the error interface.
Expand All @@ -64,6 +65,7 @@ func init() {
mysql.ErrTruncatedWrongValue: mysql.ErrTruncatedWrongValue,
mysql.ErrQueryInterrupted: mysql.ErrQueryInterrupted,
mysql.ErrLockAcquireFailAndNoWaitSet: mysql.ErrLockAcquireFailAndNoWaitSet,
mysql.ErrLockWaitTimeout: mysql.ErrLockWaitTimeout,
}
terror.ErrClassToMySQLCodes[terror.ClassTiKV] = tikvMySQLErrCodes
}