Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support postponed conflict check #556

Merged
merged 18 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a
github.com/pingcap/kvproto v0.0.0-20220818023518-a0f02b6efcee
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220818023518-a0f02b6efcee h1:s1al2ci3MEj5VnNuUCGAfeqpbCxcMeZibOyxw8ClHLE=
github.com/pingcap/kvproto v0.0.0-20220818023518-a0f02b6efcee/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
4 changes: 4 additions & 0 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,10 @@ func (s *mockTikvGrpcServer) Compact(ctx context.Context, request *kvrpcpb.Compa
return nil, errors.New("unreachable")
}

// GetLockWaitHistory is a stub handler on the mock TiKV gRPC server. Like the
// other stubs on mockTikvGrpcServer (e.g. Compact above), it always fails with
// "unreachable": tests that use this server are not expected to actually
// dispatch this RPC, only to satisfy the gRPC service interface.
func (s *mockTikvGrpcServer) GetLockWaitHistory(ctx context.Context, request *kvrpcpb.GetLockWaitHistoryRequest) (*kvrpcpb.GetLockWaitHistoryResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled() {
// prepare a mock tikv grpc server
addr := "localhost:56341"
Expand Down
14 changes: 9 additions & 5 deletions internal/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,8 +739,12 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error {
if op == kvrpcpb.Op_CheckNotExists {
continue
}
isPessimisticLock := len(req.IsPessimisticLock) > 0 && req.IsPessimisticLock[i]
err = prewriteMutation(mvcc.getDB(""), batch, m, startTS, primary, ttl, txnSize, isPessimisticLock, minCommitTS, req.AssertionLevel)

pessimisticAction := kvrpcpb.PrewriteRequest_SKIP_PESSIMISTIC_CHECK
if len(req.PessimisticActions) > 0 {
pessimisticAction = req.PessimisticActions[i]
}
err = prewriteMutation(mvcc.getDB(""), batch, m, startTS, primary, ttl, txnSize, pessimisticAction, minCommitTS, req.AssertionLevel)
errs = append(errs, err)
if err != nil {
anyError = true
Expand Down Expand Up @@ -871,7 +875,7 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64,
func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
mutation *kvrpcpb.Mutation, startTS uint64,
primary []byte, ttl uint64, txnSize uint64,
isPessimisticLock bool, minCommitTS uint64,
pessimisticAction kvrpcpb.PrewriteRequest_PessimisticAction, minCommitTS uint64,
assertionLevel kvrpcpb.AssertionLevel) error {
startKey := mvccEncode(mutation.Key, lockVer)
iter := newIterator(db, &util.Range{
Expand All @@ -888,7 +892,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
}
if ok {
if dec.lock.startTS != startTS {
if isPessimisticLock {
if pessimisticAction == kvrpcpb.PrewriteRequest_DO_PESSIMISTIC_CHECK {
// NOTE: A special handling.
// When pessimistic txn prewrite meets lock, set the TTL = 0 means
// telling TiDB to rollback the transaction **unconditionly**.
Expand All @@ -913,7 +917,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
return err
}
} else {
if isPessimisticLock {
if pessimisticAction == kvrpcpb.PrewriteRequest_DO_PESSIMISTIC_CHECK {
return ErrAbort("pessimistic lock not found")
}
_, err = checkConflictValue(iter, mutation, startTS, startTS, false, assertionLevel)
Expand Down
12 changes: 9 additions & 3 deletions internal/unionstore/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func newMemDB() *MemDB {
db.stages = make([]MemDBCheckpoint, 0, 2)
db.entrySizeLimit = math.MaxUint64
db.bufferSizeLimit = math.MaxUint64
db.vlog.memdb = db
return db
}

Expand Down Expand Up @@ -224,7 +225,7 @@ func (db *MemDB) SelectValueHistory(key []byte, predicate func(value []byte) boo
return nil, tikverr.ErrNotExist
}
result := db.vlog.selectValueHistory(x.vptr, func(addr memdbArenaAddr) bool {
return predicate(db.vlog.getValue(addr))
return predicate(db.vlog.pureGetValue(addr))
})
if result.isNull() {
return nil, nil
Expand Down Expand Up @@ -331,7 +332,12 @@ func (db *MemDB) set(key []byte, value []byte, ops ...kv.FlagsOp) error {
x := db.traverse(key, true)

if len(ops) != 0 {
flags := kv.ApplyFlagsOps(x.getKeyFlags(), ops...)
originalFlags := x.getKeyFlags()
// the NeedConstraintCheckInPrewrite flag is temporary,
// every access to the node removes it unless it's explicitly set.
// This set must be in the latest stage so no special processing is needed.
flags := kv.ApplyFlagsOps(originalFlags, kv.DelNeedConstraintCheckInPrewrite)
flags = kv.ApplyFlagsOps(flags, ops...)
if flags.AndPersistent() != 0 {
db.dirty = true
}
Expand All @@ -357,7 +363,7 @@ func (db *MemDB) setValue(x memdbNodeAddr, value []byte) {

var oldVal []byte
if !x.vptr.isNull() {
oldVal = db.vlog.getValue(x.vptr)
oldVal = db.vlog.pureGetValue(x.vptr)
}

if len(oldVal) > 0 && db.vlog.canModify(activeCp, x.vptr) {
Expand Down
48 changes: 46 additions & 2 deletions internal/unionstore/memdb_arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ func (a *nodeAllocator) reset() {

type memdbVlog struct {
memdbArena
memdb *MemDB
}

const memdbVlogHdrSize = 8 + 8 + 4
Expand Down Expand Up @@ -320,7 +321,12 @@ func (l *memdbVlog) appendValue(nodeAddr memdbArenaAddr, oldValue memdbArenaAddr
return addr
}

func (l *memdbVlog) getValue(addr memdbArenaAddr) []byte {
func (addr memdbArenaAddr) AfterCheckPoint(cp MemDBCheckpoint) bool {
return addr.idx > uint32(cp.blocks)-1 || (addr.idx == uint32(cp.blocks)-1 && addr.off > uint32(cp.offsetInBlock))
ekexium marked this conversation as resolved.
Show resolved Hide resolved
}

// A pure function that gets a value. It should be used where the read doesn't affect user's(upper level module's) view
func (l *memdbVlog) pureGetValue(addr memdbArenaAddr) []byte {
lenOff := addr.off - memdbVlogHdrSize
block := l.blocks[addr.idx].buf
valueLen := endian.Uint32(block[lenOff:])
Expand All @@ -331,6 +337,44 @@ func (l *memdbVlog) getValue(addr memdbArenaAddr) []byte {
return block[valueOff:lenOff:lenOff]
}

// Get value, and possibly write a new vlog entry in the latest stage to let the locking phase be aware of it via InspectStage.
// It should be used where the read result affect users or upper level modules. Temporary flags need to be unset in such cases.
func (l *memdbVlog) getValue(addr memdbArenaAddr) (value []byte) {
hdrOffset := addr.off - memdbVlogHdrSize
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
block := l.blocks[addr.idx].buf
var hdr memdbVlogHdr
hdr.load(block[hdrOffset:])

if hdr.valueLen == 0 {
value = tombstone
} else {
valueOff := hdrOffset - hdr.valueLen
value = block[valueOff:hdrOffset:hdrOffset]
}

l.processFlagNeedConstraintCheckInPrewrite(addr, hdr.nodeAddr, value)

return value
}

// remove the temporary flag NeedConstraintCheckInPrewrite
func (l *memdbVlog) processFlagNeedConstraintCheckInPrewrite(valueAddr memdbArenaAddr, nodeAddr memdbArenaAddr, value []byte) {
node := l.memdb.getNode(nodeAddr)
flags := node.getKeyFlags()
// only process if current access is on the latest version, so that we do it at most once.
if flags.HasNeedConstraintCheckInPrewrite() && node.vptr == valueAddr {
flags = kv.ApplyFlagsOps(flags, kv.DelNeedConstraintCheckInPrewrite)
node.setKeyFlags(flags)

// if this is not in the latest stage, we need to copy the vlog entry to the latest stage, marking it as
// a modification (of flag) so that the locking phase could find it via `InspectStage` and acquire the lock.
latestStagingHandle := len(l.memdb.stages)
if latestStagingHandle > 0 && !valueAddr.AfterCheckPoint(l.memdb.stages[latestStagingHandle-1]) {
l.memdb.setValue(node, value)
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

func (l *memdbVlog) getSnapshotValue(addr memdbArenaAddr, snap *MemDBCheckpoint) ([]byte, bool) {
result := l.selectValueHistory(addr, func(addr memdbArenaAddr) bool {
return !l.canModify(snap, addr)
Expand Down Expand Up @@ -375,7 +419,7 @@ func (l *memdbVlog) revertToCheckpoint(db *MemDB, cp *MemDBCheckpoint) {
db.dirty = true
}
} else {
db.size += len(l.getValue(hdr.oldValue))
db.size += len(l.pureGetValue(hdr.oldValue))
}

l.moveBackCursor(&cursor, &hdr)
Expand Down
23 changes: 23 additions & 0 deletions internal/unionstore/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,3 +828,26 @@ func TestBufferLimit(t *testing.T) {
err = buffer.Delete(make([]byte, 500))
assert.NotNil(err)
}

// TestUnsetTemporaryFlag verifies that the temporary
// NeedConstraintCheckInPrewrite flag is cleared on read: before the key is
// read, the latest stage sees no modification; after db.Get, the flag removal
// is surfaced as one entry visible to InspectStage.
func TestUnsetTemporaryFlag(t *testing.T) {
	require := require.New(t)
	db := newMemDB()
	db.Staging()
	key := []byte{1}
	value := []byte{2}
	db.SetWithFlags(key, value, kv.SetNeedConstraintCheckInPrewrite)
	h2 := db.Staging()

	// The flagged write happened before h2, so the new stage starts empty.
	values := make([][]byte, 0)
	db.InspectStage(h2, func(_ []byte, _ kv.KeyFlags, v []byte) {
		values = append(values, v)
	})
	// Fix: testify's Equal takes (expected, actual); use Len for clarity.
	require.Len(values, 0)

	// Reading the key unsets the temporary flag and copies the entry into the
	// latest stage, so InspectStage now observes exactly one modification.
	values = values[:0]
	db.Get(key)
	db.InspectStage(h2, func(_ []byte, _ kv.KeyFlags, v []byte) {
		values = append(values, v)
	})
	require.Len(values, 1)
}
24 changes: 23 additions & 1 deletion kv/keyflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,13 @@ const (
flagAssertExist
flagAssertNotExist

persistentFlags = flagKeyLocked | flagKeyLockedValExist
// It marks that the conflict check of the key is postponed to prewrite. This is only used in pessimistic transactions.
// When the key gets locked (and its existence is checked), the flag should be removed.
// It should only be applied to keys that also carry PresumeNotExist, so that an in-place constraint check becomes
// (a conflict check + a constraint check) in prewrite.
flagNeedConstraintCheckInPrewrite
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe flagConstraintCheckInPrewrite is more consistency with flagKeyLocked and flagNeedConstraintCheckInPrewrite

Copy link
Contributor Author

@ekexium ekexium Aug 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I prefer need constraint check in prewrite. It clearly describes our intention.


persistentFlags = flagKeyLocked | flagKeyLockedValExist | flagNeedConstraintCheckInPrewrite
)

// HasAssertExist returns whether the key need ensure exists in 2pc.
Expand Down Expand Up @@ -125,6 +131,11 @@ func (f KeyFlags) HasReadable() bool {
return f&flagReadable != 0
}

// HasNeedConstraintCheckInPrewrite returns whether the key's conflict check
// has been postponed to the prewrite phase (pessimistic transactions only).
func (f KeyFlags) HasNeedConstraintCheckInPrewrite() bool {
	return (f & flagNeedConstraintCheckInPrewrite) != 0
}

// AndPersistent returns the value of current flags&persistentFlags
func (f KeyFlags) AndPersistent() KeyFlags {
return f & persistentFlags
Expand Down Expand Up @@ -153,10 +164,12 @@ func ApplyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags {
origin &= ^flagNeedLocked
case SetKeyLockedValueExists:
origin |= flagKeyLockedValExist
origin &= ^flagNeedConstraintCheckInPrewrite
case DelNeedCheckExists:
origin &= ^flagNeedCheckExists
case SetKeyLockedValueNotExists:
origin &= ^flagKeyLockedValExist
origin &= ^flagNeedConstraintCheckInPrewrite
case SetPrewriteOnly:
origin |= flagPrewriteOnly
case SetIgnoredIn2PC:
Expand All @@ -177,6 +190,10 @@ func ApplyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags {
case SetAssertNone:
origin &= ^flagAssertExist
origin &= ^flagAssertNotExist
case SetNeedConstraintCheckInPrewrite:
origin |= flagNeedConstraintCheckInPrewrite
case DelNeedConstraintCheckInPrewrite:
origin &= ^flagNeedConstraintCheckInPrewrite
}
}
return origin
Expand Down Expand Up @@ -221,4 +238,9 @@ const (
SetAssertUnknown
// SetAssertNone cleans up the key's assert.
SetAssertNone
// SetNeedConstraintCheckInPrewrite marks the key needs to check conflict in prewrite.
SetNeedConstraintCheckInPrewrite
// DelNeedConstraintCheckInPrewrite reverts SetNeedConstraintCheckInPrewrite. This is required when we decide to
// make up the pessimistic lock.
DelNeedConstraintCheckInPrewrite
)
Loading