Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support select for update no wait #12775

Merged
merged 29 commits into from
Nov 4, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f242af2
support select for update no wait
cfzjywxk Oct 16, 2019
9e8d6cf
refactor, handle select for update error will update "updateTS"
cfzjywxk Oct 17, 2019
70dc1eb
move const1
cfzjywxk Oct 17, 2019
d2bc394
add LockType in LockInfo
cfzjywxk Oct 17, 2019
c103a77
lockType use kvrpc.Op
cfzjywxk Oct 18, 2019
b69efa6
gomod, refactor some logic
cfzjywxk Oct 18, 2019
3f4205b
typo
cfzjywxk Oct 18, 2019
d5ef7bf
Merge branch 'master' into nowait
cfzjywxk Oct 18, 2019
38c5caf
format
cfzjywxk Oct 18, 2019
16616e0
Merge branch 'master' into nowait
cfzjywxk Oct 24, 2019
99935c3
go sum
cfzjywxk Oct 24, 2019
34dccb9
go sum
cfzjywxk Oct 24, 2019
a70b11f
Merge branch 'master' into nowait
cfzjywxk Oct 24, 2019
5c52dbd
format
cfzjywxk Oct 24, 2019
1a4d0b3
tidy
cfzjywxk Oct 24, 2019
4afc621
tidy sum
cfzjywxk Oct 24, 2019
de469ca
tidy
cfzjywxk Oct 24, 2019
31f2c67
Merge branch 'master' into nowait
cfzjywxk Oct 25, 2019
fddc2e8
refactoration, add case
cfzjywxk Oct 25, 2019
119a307
add missing
cfzjywxk Oct 25, 2019
09bdcb2
import
cfzjywxk Oct 25, 2019
4565b05
Merge branch 'master' into nowait
cfzjywxk Oct 25, 2019
7f73591
Merge branch 'master' into nowait
crazycs520 Oct 25, 2019
99bada6
Merge branch 'master' into nowait
cfzjywxk Nov 1, 2019
afa7bea
format
cfzjywxk Nov 1, 2019
4b3dfce
change nowait signal to int64
cfzjywxk Nov 1, 2019
edcaf66
Revert "change nowait signal to int64"
cfzjywxk Nov 4, 2019
16b26c4
change nowait signal to int64
cfzjywxk Nov 1, 2019
cd7151a
Merge branch 'master' into nowait
cfzjywxk Nov 4, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx

// Lock the row key to notify us that someone delete or update the row,
// then we should not backfill the index of it, otherwise the adding index is redundant.
err := txn.LockKeys(context.Background(), 0, idxRecord.key)
err := txn.LockKeys(context.Background(), 0, false, idxRecord.key)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
return nil
}
forUpdateTS := txnCtx.GetForUpdateTS()
err = txn.LockKeys(ctx, forUpdateTS, keys...)
err = txn.LockKeys(ctx, forUpdateTS, false, keys...)
if err == nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
}

recordKey := e.table.RecordKey(row.handle)
err := txn.LockKeys(ctx, 0, recordKey)
err := txn.LockKeys(ctx, 0, false, recordKey)
if err != nil {
return result, err
}
Expand Down
17 changes: 13 additions & 4 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
// If there's no handle or it's not a `SELECT FOR UPDATE` statement.
if len(e.tblID2Handle) == 0 || e.Lock != ast.SelectLockForUpdate {
if len(e.tblID2Handle) == 0 || (e.Lock != ast.SelectLockForUpdate && e.Lock != ast.SelectLockForUpdateNoWait) {
return nil
}
if req.NumRows() != 0 {
Expand All @@ -793,18 +793,27 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
return nil
}
return doLockKeys(ctx, e.ctx, e.keys...)
return doLockKeys(ctx, e.ctx, e.Lock == ast.SelectLockForUpdateNoWait, e.keys...)
}

func doLockKeys(ctx context.Context, se sessionctx.Context, keys ...kv.Key) error {
// doLockKeys is the main entry for pessimistic lock keys
// lockNoWait means the lock operation will report error immediately if target key is already
// locked by others. used for (select for update nowait) situation
func doLockKeys(ctx context.Context, se sessionctx.Context, lockNoWait bool, keys ...kv.Key) error {
se.GetSessionVars().TxnCtx.ForUpdate = true
// Lock keys only once when finished fetching all results.
txn, err := se.Txn(true)
if err != nil {
return err
}
// TODO this is a problem, maybe we need to update forUpdateTS every time to avoid such case
// begin; select for update key1(here ErrLocked or other errors(or max_execution_time like util),
// key1 lock not get and async rollback key1 is raised)
// select for update key1 again(this time lock succ(maybe lock released by others))
// the async rollback operation rollbacked the lock just acquired from
se.GetSessionVars().TxnCtx.SetForUpdateTS(se.GetSessionVars().TxnCtx.GetForUpdateTS() + 1)
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS()
return txn.LockKeys(ctx, forUpdateTS, keys...)
return txn.LockKeys(ctx, forUpdateTS, lockNoWait, keys...)
}

// LimitExec represents limit executor
Expand Down
4 changes: 3 additions & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
handle: p.Handle,
startTS: startTS,
lock: p.Lock,
nowait: p.ForUpdateNoWait,
}
b.isSelectForUpdate = p.IsForUpdate
e.base().initCap = 1
Expand All @@ -63,6 +64,7 @@ type PointGetExecutor struct {
snapshot kv.Snapshot
done bool
lock bool
nowait bool
}

// Open implements the Executor interface.
Expand Down Expand Up @@ -148,7 +150,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {

func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error {
if e.lock {
return doLockKeys(ctx, e.ctx, key)
return doLockKeys(ctx, e.ctx, e.nowait, key)
}
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,7 @@ require (
)

go 1.13

replace github.com/pingcap/parser => github.com/cfzjywxk/parser v0.0.0-20191016072124-6c2853d8b1ae
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved

replace github.com/pingcap/kvproto => github.com/youjiali1995/kvproto v0.0.0-20191016061501-6883cfe78a90
9 changes: 4 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d h1:rQlvB2AYWme2bIB18r/SipGiMEVJYE9U0z+MGoU/LtQ=
github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d/go.mod h1:VKt7CNAQxpFpSDz3sXyj9hY/GbVsQCr0sB3w59nE7lU=
github.com/cfzjywxk/parser v0.0.0-20191016072124-6c2853d8b1ae h1:EaLRRp9CPpDZQ3PCe3PtLA1BqY5e9i48RHtBLgMW/cI=
github.com/cfzjywxk/parser v0.0.0-20191016072124-6c2853d8b1ae/go.mod h1:LoM8lSJ7POQOA8WIpI6Eti5BHzZtUJhu1csFCjGkQVs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20171208011716-f6d7a1f6fbf3/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
Expand Down Expand Up @@ -159,15 +161,10 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c h1:hvQd3aOLKLF7x
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1 h1:DNvxkdcjA0TBIIIF+K2w9KMlTzMZzLZ5JVF26kTCPhg=
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/parser v0.0.0-20191012071233-32876040fefb h1:okeNsbftvzQ8I9DseKukhZURRYJUHOpRSHwlSZC0g0g=
github.com/pingcap/parser v0.0.0-20191012071233-32876040fefb/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0 h1:GIEq+wZfrl2bcJxpuSrEH4H7/nlf5YdmpS+dU9lNIt8=
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0/go.mod h1:G/6rJpnYwM0LKMec2rI82/5Kg6GaZMvlfB+e6/tvYmI=
github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible h1:MkWCxgZpJBgY2f4HtwWMMFzSBb3+JPzeJgF3VrXE/bU=
Expand Down Expand Up @@ -244,6 +241,8 @@ github.com/urfave/negroni v0.3.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKn
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yookoala/realpath v1.0.0/go.mod h1:gJJMA9wuX7AcqLy1+ffPatSCySA1FQ2S8Ya9AIoYBpE=
github.com/youjiali1995/kvproto v0.0.0-20191016061501-6883cfe78a90 h1:rzQ+EdmLdge8CqTHGhmnEi1xSZ/ATaa7cPCGrOg7yWM=
github.com/youjiali1995/kvproto v0.0.0-20191016061501-6883cfe78a90/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
Expand Down
2 changes: 1 addition & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ type Transaction interface {
// String implements fmt.Stringer interface.
String() string
// LockKeys tries to lock the entries with the keys in KV store.
LockKeys(ctx context.Context, forUpdateTS uint64, keys ...Key) error
LockKeys(ctx context.Context, forUpdateTS uint64, lockNoWait bool, keys ...Key) error
// SetOption sets an option with a value, when val is nil, uses the default
// value of this option.
SetOption(opt Option, val interface{})
Expand Down
2 changes: 1 addition & 1 deletion kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (t *mockTxn) String() string {
return ""
}

func (t *mockTxn) LockKeys(_ context.Context, _ uint64, _ ...Key) error {
func (t *mockTxn) LockKeys(_ context.Context, _ uint64, _ bool, _ ...Key) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion kv/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s testMockSuite) TestInterface(c *C) {

transaction, err := storage.Begin()
c.Check(err, IsNil)
err = transaction.LockKeys(context.Background(), 0, Key("lock"))
err = transaction.LockKeys(context.Background(), 0, false, Key("lock"))
c.Check(err, IsNil)
transaction.SetOption(Option(23), struct{}{})
if mock, ok := transaction.(*mockTxn); ok {
Expand Down
4 changes: 3 additions & 1 deletion planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type PointGetPlan struct {
Lock bool
IsForUpdate bool
outputNames []*types.FieldName
ForUpdateNoWait bool
}

type nameValuePair struct {
Expand Down Expand Up @@ -240,7 +241,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan {
tableDual.SetSchema(fp.Schema())
return tableDual.Init(ctx, &property.StatsInfo{}, 0)
}
if x.LockTp == ast.SelectLockForUpdate {
if x.LockTp == ast.SelectLockForUpdate || x.LockTp == ast.SelectLockForUpdateNoWait {
// Locking of rows for update using SELECT FOR UPDATE only applies when autocommit
// is disabled (either by beginning transaction with START TRANSACTION or by setting
// autocommit to 0. If autocommit is enabled, the rows matching the specification are not locked.
Expand All @@ -249,6 +250,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan {
if !sessVars.IsAutocommit() || sessVars.InTxn() {
fp.Lock = true
fp.IsForUpdate = true
fp.ForUpdateNoWait = x.LockTp == ast.SelectLockForUpdateNoWait
}
}
return fp
Expand Down
2 changes: 1 addition & 1 deletion planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (la *LogicalApply) PruneColumns(parentUsedCols []*expression.Column) error

// PruneColumns implements LogicalPlan interface.
func (p *LogicalLock) PruneColumns(parentUsedCols []*expression.Column) error {
if p.Lock != ast.SelectLockForUpdate {
if p.Lock != ast.SelectLockForUpdate && p.Lock != ast.SelectLockForUpdateNoWait {
return p.baseLogicalPlan.PruneColumns(parentUsedCols)
}

Expand Down
86 changes: 86 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,3 +391,89 @@ func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) {
tk.MustExec("commit")
tk.MustQuery("select * from conflict").Check(testkit.Rows("1 3"))
}

func (s *testPessimisticSuite) TestSelectForUpdateNoWait(c *C) {
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk3 := testkit.NewTestKitWithInit(c, s.store)

tk.MustExec("drop table if exists tk")
tk.MustExec("create table tk (c1 int primary key, c2 int)")
tk.MustExec("insert into tk values(1,1),(2,2),(3,3),(4,4),(5,5)")

tk.MustExec("set @@autocommit = 0")
tk2.MustExec("set @@autocommit = 0")
tk3.MustExec("set @@autocommit = 0")

// point get with no autocommit
tk.MustExec("begin pessimistic")
tk.MustExec("select * from tk where c1 = 2 for update") // lock succ

tk2.MustExec("begin pessimistic")
_, err := tk2.Exec("select * from tk where c1 = 2 for update nowait")
c.Check(err, NotNil)
tk.MustExec("commit")
tk2.MustExec("select * from tk where c1 = 2 for update nowait") // lock succ

tk3.MustExec("begin pessimistic")
_, err = tk3.Exec("select * from tk where c1 = 2 for update nowait")
c.Check(err, NotNil)

tk2.MustExec("commit")
tk3.MustExec("select * from tk where c1 = 2 for update")
tk3.MustExec("commit")
tk.MustExec("commit")

tk3.MustExec("begin pessimistic")
tk3.MustExec("update tk set c2 = c2 + 1 where c1 = 3")
tk2.MustExec("begin pessimistic")
_, err = tk2.Exec("select * from tk where c1 = 3 for update nowait")
c.Check(err, NotNil)
tk3.MustExec("commit")
tk2.MustExec("select * from tk where c1 = 3 for update nowait")
tk2.MustExec("commit")

tk.MustExec("commit")
tk2.MustExec("commit")
tk3.MustExec("commit")

// scan with no autocommit
tk.MustExec("begin pessimistic")
tk.MustExec("select * from tk where c1 >= 2 for update")
tk2.MustExec("begin pessimistic")
_, err = tk2.Exec("select * from tk where c1 = 2 for update nowait")
c.Check(err, NotNil)
_, err = tk2.Exec("select * from tk where c1 > 3 for update nowait")
c.Check(err, NotNil)
tk2.MustExec("select * from tk where c1 = 1 for update nowait")
tk2.MustExec("commit")
tk.MustQuery("select * from tk where c1 >= 2 for update").Check(testkit.Rows("2 2", "3 4", "4 4", "5 5"))
tk.MustExec("commit")
tk.MustExec("begin pessimistic")
tk.MustExec("update tk set c2 = c2 + 10 where c1 > 3")
tk3.MustExec("begin pessimistic")
_, err = tk3.Exec("select * from tk where c1 = 5 for update nowait")
c.Check(err, NotNil)
tk3.MustExec("select * from tk where c1 = 1 for update nowait")
tk.MustExec("commit")
tk3.MustQuery("select * from tk where c1 > 3 for update nowait").Check(testkit.Rows("4 14", "5 15"))
tk3.MustExec("commit")

//delete
tk3.MustExec("begin pessimistic")
tk3.MustExec("delete from tk where c1 <= 2")
tk.MustExec("begin pessimistic")
_, err = tk.Exec("select * from tk where c1 = 1 for update nowait")
c.Check(err, NotNil)
tk3.MustExec("commit")
tk.MustQuery("select * from tk where c1 > 1 for update nowait").Check(testkit.Rows("3 4", "4 14", "5 15"))
tk.MustExec("update tk set c2 = c2 + 1 where c1 = 5")
tk2.MustExec("begin pessimistic")
_, err = tk2.Exec("select * from tk where c1 = 5 for update nowait")
c.Check(err, NotNil)
tk.MustExec("commit")
tk2.MustQuery("select * from tk where c1 = 5 for update nowait").Check(testkit.Rows("5 16"))
tk2.MustExec("update tk set c2 = c2 + 1 where c1 = 5")
tk2.MustQuery("select * from tk where c1 = 5 for update nowait").Check(testkit.Rows("5 17"))
tk2.MustExec("commit")
}
3 changes: 2 additions & 1 deletion store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ type MVCCStore interface {
Scan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
ReverseScan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS, forUpdateTS uint64, ttl uint64) []error
PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS,
forUpdateTS uint64, ttl uint64, lockNoWait bool) []error
PessimisticRollback(keys [][]byte, startTS, forUpdateTS uint64) []error
Prewrite(req *kvrpcpb.PrewriteRequest) []error
Commit(keys [][]byte, startTS, commitTS uint64) error
Expand Down
8 changes: 7 additions & 1 deletion store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,8 @@ func reverse(values []mvccValue) {
}

// PessimisticLock writes the pessimistic lock.
func (mvcc *MVCCLevelDB) PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS, forUpdateTS uint64, ttl uint64) []error {
func (mvcc *MVCCLevelDB) PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS,
forUpdateTS uint64, ttl uint64, lockNoWait bool) []error {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

Expand All @@ -477,6 +478,11 @@ func (mvcc *MVCCLevelDB) PessimisticLock(mutations []*kvrpcpb.Mutation, primary
if err != nil {
anyError = true
}
if lockNoWait {
if _, ok := err.(*ErrLocked); ok {
break
}
}
}
if anyError {
return errs
Expand Down
10 changes: 6 additions & 4 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,12 @@ func (h *rpcHandler) handleKvPessimisticLock(req *kvrpcpb.PessimisticLockRequest
startTS := req.StartVersion
regionID := req.Context.RegionId
h.cluster.handleDelay(startTS, regionID)
errs := h.mvccStore.PessimisticLock(req.Mutations, req.PrimaryLock, req.GetStartVersion(), req.GetForUpdateTs(), req.GetLockTtl())

// TODO: remove this when implement sever side wait.
h.simulateServerSideWaitLock(errs)
errs := h.mvccStore.PessimisticLock(req.Mutations, req.PrimaryLock, req.GetStartVersion(), req.GetForUpdateTs(),
req.GetLockTtl(), req.Nowait)
if !req.Nowait {
// TODO: remove this when implement sever side wait.
h.simulateServerSideWaitLock(errs)
}
return &kvrpcpb.PessimisticLockResponse{
Errors: convertToKeyErrors(errs),
}
Expand Down
10 changes: 10 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ type twoPhaseCommitter struct {
regionTxnSize map[uint64]int
// Used by pessimistic transaction and large transaction.
ttlManager
// Used for select for update nowait
lockNoWait bool
}

// batchExecutor is txn controller providing rate control like utils
Expand Down Expand Up @@ -674,6 +676,7 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc
ForUpdateTs: c.forUpdateTS,
LockTtl: c.pessimisticTTL,
IsFirstLock: c.isFirstLock,
Nowait: c.lockNoWait,
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog})
for {
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
Expand Down Expand Up @@ -702,6 +705,13 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc
}
var locks []*Lock
for _, keyErr := range keyErrs {
// Check lock conflict error for nowait, if nowait set and key locked by others
// report error immediately and do no resolve locks any more
if c.lockNoWait {
if keyErr.GetLocked() != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this lock be a committed or rollbacked lock in pessimistic txn and need check lock state? I'm not familiar with this :D

return ErrLockFailNoWait
}
}
// Check already exists error
if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
key := alreadyExist.GetKey()
Expand Down
10 changes: 5 additions & 5 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) {
c.Assert(txn.Set(key, key), IsNil)
txn.DelOption(kv.PresumeKeyNotExistsError)
txn.DelOption(kv.PresumeKeyNotExists)
err := txn.LockKeys(context.Background(), txn.startTS, key)
err := txn.LockKeys(context.Background(), txn.startTS, false, key)
c.Assert(err, NotNil)
c.Assert(txn.Delete(key), IsNil)
key2 := kv.Key("key2")
Expand All @@ -518,9 +518,9 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) {
func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) {
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
err := txn.LockKeys(context.Background(), 100, kv.Key("abc"), kv.Key("def"))
err := txn.LockKeys(context.Background(), 100, false, kv.Key("abc"), kv.Key("def"))
c.Assert(err, IsNil)
err = txn.LockKeys(context.Background(), 100, kv.Key("abc"), kv.Key("def"))
err = txn.LockKeys(context.Background(), 100, false, kv.Key("abc"), kv.Key("def"))
c.Assert(err, IsNil)
c.Assert(txn.lockKeys, HasLen, 2)
}
Expand All @@ -530,11 +530,11 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
time.Sleep(time.Millisecond * 100)
err := txn.LockKeys(context.Background(), txn.startTS, key)
err := txn.LockKeys(context.Background(), txn.startTS, false, key)
c.Assert(err, IsNil)
time.Sleep(time.Millisecond * 100)
key2 := kv.Key("key2")
err = txn.LockKeys(context.Background(), txn.startTS, key2)
err = txn.LockKeys(context.Background(), txn.startTS, false, key2)
c.Assert(err, IsNil)
lockInfo := s.getLockInfo(c, key)
elapsedTTL := lockInfo.LockTtl - PessimisticLockTTL
Expand Down
Loading