Skip to content

Commit

Permalink
store/tikv: revert optimization for optimistic conflicts pessimistic (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored and jackysp committed Jul 3, 2019
1 parent 3acb645 commit aafe172
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 31 deletions.
33 changes: 32 additions & 1 deletion session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ type testPessimisticSuite struct {
func (s *testPessimisticSuite) SetUpSuite(c *C) {
testleak.BeforeTest()
config.GetGlobalConfig().PessimisticTxn.Enable = true
// Set it to 300ms for testing lock resolve.
tikv.PessimisticLockTTL = 300
s.cluster = mocktikv.NewCluster()
mocktikv.BootstrapWithSingleStore(s.cluster)
s.mvccStore = mocktikv.MustNewMVCCStore()
store, err := mockstore.NewMockTikvStore(
mockstore.WithCluster(s.cluster),
mockstore.WithMVCCStore(s.mvccStore),
)
tikv.PessimisticLockTTL = uint64(config.MinPessimisticTTL / time.Millisecond)
c.Assert(err, IsNil)
s.store = store
session.SetSchemaLease(0)
Expand Down Expand Up @@ -351,3 +352,33 @@ func (s *testPessimisticSuite) TestBankTransfer(c *C) {
syncCh <- struct{}{}
tk.MustQuery("select sum(c) from accounts").Check(testkit.Rows("300"))
}

func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists conflict")
tk.MustExec("create table conflict (id int primary key, c int)")
tk.MustExec("insert conflict values (1, 1)")
tk.MustExec("begin pessimistic")
tk.MustQuery("select * from conflict where id = 1 for update")
syncCh := make(chan struct{})
go func() {
tk2.MustExec("update conflict set c = 3 where id = 1")
<-syncCh
}()
time.Sleep(time.Millisecond * 10)
tk.MustExec("update conflict set c = 2 where id = 1")
tk.MustExec("commit")
syncCh <- struct{}{}
tk.MustQuery("select c from conflict where id = 1").Check(testkit.Rows("3"))

// Check outdated pessimistic lock is resolved.
tk.MustExec("begin pessimistic")
tk.MustExec("update conflict set c = 4 where id = 1")
time.Sleep(300 * time.Millisecond)
tk2.MustExec("begin optimistic")
tk2.MustExec("update conflict set c = 5 where id = 1")
tk2.MustExec("commit")
_, err := tk.Exec("commit")
c.Check(err, NotNil)
}
13 changes: 0 additions & 13 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,19 +538,6 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
if err1 != nil {
return errors.Trace(err1)
}
if !c.isPessimistic && c.lockTTL < lock.TTL && lock.TTL >= uint64(config.MinPessimisticTTL/time.Millisecond) {
// An optimistic prewrite meets a pessimistic or large transaction lock.
// If we wait for the lock, other written optimistic locks would block reads for long time.
// And it is very unlikely this transaction would succeed after wait for the long TTL lock.
// Return write conflict error to cleanup locks.
return newWriteConflictError(&pb.WriteConflict{
StartTs: c.startTS,
ConflictTs: lock.TxnID,
ConflictCommitTs: 0,
Key: lock.Key,
Primary: lock.Primary,
})
}
logutil.Logger(context.Background()).Debug("prewrite encounters lock",
zap.Uint64("conn", c.connID),
zap.Stringer("lock", lock))
Expand Down
17 changes: 0 additions & 17 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
Expand All @@ -39,7 +38,6 @@ var _ = Suite(&testCommitterSuite{})

func (s *testCommitterSuite) SetUpTest(c *C) {
s.cluster = mocktikv.NewCluster()
PessimisticLockTTL = uint64(config.MinPessimisticTTL / time.Millisecond)
mocktikv.BootstrapWithMultiRegions(s.cluster, []byte("a"), []byte("b"), []byte("c"))
mvccStore, err := mocktikv.NewMVCCLevelDB("")
c.Assert(err, IsNil)
Expand Down Expand Up @@ -492,18 +490,3 @@ func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) {
c.Assert(err, IsNil)
c.Assert(txn.lockKeys, HasLen, 2)
}

func (s *testCommitterSuite) TestPessimistOptimisticConflict(c *C) {
txnPes := s.begin(c)
txnPes.SetOption(kv.Pessimistic, true)
err := txnPes.LockKeys(context.Background(), txnPes.startTS, kv.Key("pes"))
c.Assert(err, IsNil)
c.Assert(txnPes.IsPessimistic(), IsTrue)
c.Assert(txnPes.lockKeys, HasLen, 1)
txnOpt := s.begin(c)
err = txnOpt.Set(kv.Key("pes"), []byte("v"))
c.Assert(err, IsNil)
err = txnOpt.Commit(context.Background())
c.Assert(kv.ErrWriteConflict.Equal(err), IsTrue)
c.Assert(txnPes.Commit(context.Background()), IsNil)
}

0 comments on commit aafe172

Please sign in to comment.