Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support postponed conflict check #556

Merged
merged 18 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a
github.com/pingcap/kvproto v0.0.0-20220818023518-a0f02b6efcee
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220818023518-a0f02b6efcee h1:s1al2ci3MEj5VnNuUCGAfeqpbCxcMeZibOyxw8ClHLE=
github.com/pingcap/kvproto v0.0.0-20220818023518-a0f02b6efcee/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
6 changes: 3 additions & 3 deletions integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ func (s *testCommitterSuite) TestPessimisticPrewriteRequest() {
s.Nil(err)
committer.SetForUpdateTS(100)
req := committer.BuildPrewriteRequest(1, 1, 1, committer.GetMutations().Slice(0, 1), 1)
s.Greater(len(req.Prewrite().IsPessimisticLock), 0)
s.Greater(len(req.Prewrite().PessimisticActions), 0)
s.Equal(req.Prewrite().ForUpdateTs, uint64(100))
}

Expand Down Expand Up @@ -1261,7 +1261,7 @@ func (s *testCommitterSuite) TestResolveMixed() {
// stop txn ttl manager and remove primary key, make the other keys left behind
committer.CloseTTLManager()
muts := transaction.NewPlainMutations(1)
muts.Push(kvrpcpb.Op_Lock, pk, nil, true, false, false)
muts.Push(kvrpcpb.Op_Lock, pk, nil, true, false, false, false)
err = committer.PessimisticRollbackMutations(context.Background(), &muts)
s.Nil(err)

Expand Down Expand Up @@ -1731,7 +1731,7 @@ func (s *testCommitterSuite) TestFlagsInMemBufferMutations() {

forEachCase(func(op kvrpcpb.Op, key []byte, value []byte, i int, isPessimisticLock, assertExist, assertNotExist bool) {
handle := db.IterWithFlags(key, nil).Handle()
mutations.Push(op, isPessimisticLock, assertExist, assertNotExist, handle)
mutations.Push(op, isPessimisticLock, assertExist, assertNotExist, false, handle)
})

forEachCase(func(op kvrpcpb.Op, key []byte, value []byte, i int, isPessimisticLock, assertExist, assertNotExist bool) {
Expand Down
6 changes: 4 additions & 2 deletions integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/kvproto v0.0.0-20220804022843-f006036b1277
github.com/pingcap/kvproto v0.0.0-20220818023518-a0f02b6efcee
github.com/pingcap/tidb v1.1.0-beta.0.20220824151221-29b57e356929
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.0
Expand Down Expand Up @@ -49,7 +49,7 @@ require (
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
github.com/pingcap/log v1.1.0 // indirect
github.com/pingcap/tidb/parser v0.0.0-20220724090709-5484002f1963 // indirect
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect
github.com/pingcap/tipb v0.0.0-20220824081009-0714a57aff1d // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.12.2 // indirect
Expand Down Expand Up @@ -87,3 +87,5 @@ require (
)

replace github.com/tikv/client-go/v2 => ../

replace github.com/pingcap/tidb => github.com/ekexium/tidb v1.1.0-beta.0.20220829095709-87c0a7d42fbd
46 changes: 11 additions & 35 deletions integration_tests/go.sum

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,10 @@ func (s *mockTikvGrpcServer) Compact(ctx context.Context, request *kvrpcpb.Compa
return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) GetLockWaitHistory(ctx context.Context, request *kvrpcpb.GetLockWaitHistoryRequest) (*kvrpcpb.GetLockWaitHistoryResponse, error) {
return nil, errors.New("unreachable")
}

func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled() {
// prepare a mock tikv grpc server
addr := "localhost:56341"
Expand Down
14 changes: 9 additions & 5 deletions internal/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,8 +739,12 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error {
if op == kvrpcpb.Op_CheckNotExists {
continue
}
isPessimisticLock := len(req.IsPessimisticLock) > 0 && req.IsPessimisticLock[i]
err = prewriteMutation(mvcc.getDB(""), batch, m, startTS, primary, ttl, txnSize, isPessimisticLock, minCommitTS, req.AssertionLevel)

pessimisticAction := kvrpcpb.PrewriteRequest_SKIP_PESSIMISTIC_CHECK
if len(req.PessimisticActions) > 0 {
pessimisticAction = req.PessimisticActions[i]
}
err = prewriteMutation(mvcc.getDB(""), batch, m, startTS, primary, ttl, txnSize, pessimisticAction, minCommitTS, req.AssertionLevel)
errs = append(errs, err)
if err != nil {
anyError = true
Expand Down Expand Up @@ -871,7 +875,7 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64,
func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
mutation *kvrpcpb.Mutation, startTS uint64,
primary []byte, ttl uint64, txnSize uint64,
isPessimisticLock bool, minCommitTS uint64,
pessimisticAction kvrpcpb.PrewriteRequest_PessimisticAction, minCommitTS uint64,
assertionLevel kvrpcpb.AssertionLevel) error {
startKey := mvccEncode(mutation.Key, lockVer)
iter := newIterator(db, &util.Range{
Expand All @@ -888,7 +892,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
}
if ok {
if dec.lock.startTS != startTS {
if isPessimisticLock {
if pessimisticAction == kvrpcpb.PrewriteRequest_DO_PESSIMISTIC_CHECK {
// NOTE: A special handling.
// When pessimistic txn prewrite meets lock, set the TTL = 0 means
// telling TiDB to rollback the transaction **unconditionly**.
Expand All @@ -913,7 +917,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
return err
}
} else {
if isPessimisticLock {
if pessimisticAction == kvrpcpb.PrewriteRequest_DO_PESSIMISTIC_CHECK {
return ErrAbort("pessimistic lock not found")
}
_, err = checkConflictValue(iter, mutation, startTS, startTS, false, assertionLevel)
Expand Down
20 changes: 14 additions & 6 deletions internal/unionstore/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func newMemDB() *MemDB {
db.stages = make([]MemDBCheckpoint, 0, 2)
db.entrySizeLimit = math.MaxUint64
db.bufferSizeLimit = math.MaxUint64
db.vlog.memdb = db
return db
}

Expand Down Expand Up @@ -330,13 +331,20 @@ func (db *MemDB) set(key []byte, value []byte, ops ...kv.FlagsOp) error {
}
x := db.traverse(key, true)

if len(ops) != 0 {
flags := kv.ApplyFlagsOps(x.getKeyFlags(), ops...)
if flags.AndPersistent() != 0 {
db.dirty = true
}
x.setKeyFlags(flags)
// the NeedConstraintCheckInPrewrite flag is temporary,
// every write to the node removes the flag unless it's explicitly set.
// This set must be in the latest stage so no special processing is needed.
var flags kv.KeyFlags
if value != nil {
flags = kv.ApplyFlagsOps(x.getKeyFlags(), append([]kv.FlagsOp{kv.DelNeedConstraintCheckInPrewrite}, ops...)...)
} else {
// an UpdateFlag operation, do not delete the NeedConstraintCheckInPrewrite flag.
flags = kv.ApplyFlagsOps(x.getKeyFlags(), ops...)
}
if flags.AndPersistent() != 0 {
db.dirty = true
}
x.setKeyFlags(flags)

if value == nil {
return nil
Expand Down
2 changes: 2 additions & 0 deletions internal/unionstore/memdb_arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ func (a *nodeAllocator) reset() {

type memdbVlog struct {
memdbArena
memdb *MemDB
}

const memdbVlogHdrSize = 8 + 8 + 4
Expand Down Expand Up @@ -320,6 +321,7 @@ func (l *memdbVlog) appendValue(nodeAddr memdbArenaAddr, oldValue memdbArenaAddr
return addr
}

// A pure function that gets a value.
func (l *memdbVlog) getValue(addr memdbArenaAddr) []byte {
lenOff := addr.off - memdbVlogHdrSize
block := l.blocks[addr.idx].buf
Expand Down
12 changes: 12 additions & 0 deletions internal/unionstore/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,3 +828,15 @@ func TestBufferLimit(t *testing.T) {
err = buffer.Delete(make([]byte, 500))
assert.NotNil(err)
}

func TestUnsetTemporaryFlag(t *testing.T) {
require := require.New(t)
db := newMemDB()
key := []byte{1}
value := []byte{2}
db.SetWithFlags(key, value, kv.SetNeedConstraintCheckInPrewrite)
db.Set(key, value)
flags, err := db.GetFlags(key)
require.Nil(err)
require.False(flags.HasNeedConstraintCheckInPrewrite())
}
24 changes: 23 additions & 1 deletion kv/keyflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,13 @@ const (
flagAssertExist
flagAssertNotExist

persistentFlags = flagKeyLocked | flagKeyLockedValExist
// It marks the conflict check of the key is postponed to prewrite. This is only used in pessimistic transactions.
// When the key gets locked (and the existence is checked), the flag should be removed.
// It should only be applied to keys with PresumeNotExist, so that an in-place constraint check becomes
// (a conflict check + a constraint check) in prewrite.
flagNeedConstraintCheckInPrewrite
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe flagConstraintCheckInPrewrite is more consistency with flagKeyLocked and flagNeedConstraintCheckInPrewrite

Copy link
Contributor Author

@ekexium ekexium Aug 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I prefer need constraint check in prewrite. It clearly describes our intention.


persistentFlags = flagKeyLocked | flagKeyLockedValExist | flagNeedConstraintCheckInPrewrite
)

// HasAssertExist returns whether the key need ensure exists in 2pc.
Expand Down Expand Up @@ -125,6 +131,11 @@ func (f KeyFlags) HasReadable() bool {
return f&flagReadable != 0
}

// HasNeedConstraintCheckInPrewrite returns whether the key needs to check conflict in prewrite.
func (f KeyFlags) HasNeedConstraintCheckInPrewrite() bool {
return f&flagNeedConstraintCheckInPrewrite != 0
}

// AndPersistent returns the value of current flags&persistentFlags
func (f KeyFlags) AndPersistent() KeyFlags {
return f & persistentFlags
Expand Down Expand Up @@ -153,10 +164,12 @@ func ApplyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags {
origin &= ^flagNeedLocked
case SetKeyLockedValueExists:
origin |= flagKeyLockedValExist
origin &= ^flagNeedConstraintCheckInPrewrite
case DelNeedCheckExists:
origin &= ^flagNeedCheckExists
case SetKeyLockedValueNotExists:
origin &= ^flagKeyLockedValExist
origin &= ^flagNeedConstraintCheckInPrewrite
case SetPrewriteOnly:
origin |= flagPrewriteOnly
case SetIgnoredIn2PC:
Expand All @@ -177,6 +190,10 @@ func ApplyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags {
case SetAssertNone:
origin &= ^flagAssertExist
origin &= ^flagAssertNotExist
case SetNeedConstraintCheckInPrewrite:
origin |= flagNeedConstraintCheckInPrewrite
case DelNeedConstraintCheckInPrewrite:
origin &= ^flagNeedConstraintCheckInPrewrite
}
}
return origin
Expand Down Expand Up @@ -221,4 +238,9 @@ const (
SetAssertUnknown
// SetAssertNone cleans up the key's assert.
SetAssertNone
// SetNeedConstraintCheckInPrewrite marks the key needs to check conflict in prewrite.
SetNeedConstraintCheckInPrewrite
// DelNeedConstraintCheckInPrewrite reverts SetNeedConstraintCheckInPrewrite. This is required when we decide to
// make up the pessimistic lock.
DelNeedConstraintCheckInPrewrite
)
Loading