Skip to content

Commit

Permalink
update kvproto
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <eke@fastmail.com>
  • Loading branch information
ekexium committed Aug 23, 2022
1 parent 136f817 commit 03942a5
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 69 deletions.
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a
github.com/pingcap/kvproto v0.0.0-20220818023518-a0f02b6efcee
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
Expand Down Expand Up @@ -58,5 +58,3 @@ require (
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
stathat.com/c/consistent v1.0.0 // indirect
)

replace github.com/pingcap/kvproto => github.com/ekexium/kvproto v0.0.0-20220725090049-ba7549963255
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/ekexium/kvproto v0.0.0-20220725090049-ba7549963255 h1:C1H3SBEtQEa3ObZPqe6C0njrvgtQGG+hoYjBDvUFk2A=
github.com/ekexium/kvproto v0.0.0-20220725090049-ba7549963255/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down Expand Up @@ -156,6 +154,9 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZ
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220818023518-a0f02b6efcee h1:s1al2ci3MEj5VnNuUCGAfeqpbCxcMeZibOyxw8ClHLE=
github.com/pingcap/kvproto v0.0.0-20220818023518-a0f02b6efcee/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
4 changes: 4 additions & 0 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,10 @@ func (s *mockTikvGrpcServer) Compact(ctx context.Context, request *kvrpcpb.Compa
return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) GetLockWaitHistory(ctx context.Context, request *kvrpcpb.GetLockWaitHistoryRequest) (*kvrpcpb.GetLockWaitHistoryResponse, error) {
return nil, errors.New("unreachable")
}

func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled() {
// prepare a mock tikv grpc server
addr := "localhost:56341"
Expand Down
14 changes: 7 additions & 7 deletions internal/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,11 +740,11 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error {
continue
}

isPessimisticLock := kvrpcpb.PessimisticLockType_NonPessimisticLocked
if len(req.IsPessimisticLock) > 0 {
isPessimisticLock = req.IsPessimisticLock[i]
pessimisticAction := kvrpcpb.PrewriteRequest_SKIP_PESSIMISTIC_CHECK
if len(req.PessimisticActions) > 0 {
pessimisticAction = req.PessimisticActions[i]
}
err = prewriteMutation(mvcc.getDB(""), batch, m, startTS, primary, ttl, txnSize, isPessimisticLock, minCommitTS, req.AssertionLevel)
err = prewriteMutation(mvcc.getDB(""), batch, m, startTS, primary, ttl, txnSize, pessimisticAction, minCommitTS, req.AssertionLevel)
errs = append(errs, err)
if err != nil {
anyError = true
Expand Down Expand Up @@ -875,7 +875,7 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64,
func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
mutation *kvrpcpb.Mutation, startTS uint64,
primary []byte, ttl uint64, txnSize uint64,
isPessimisticLock kvrpcpb.PessimisticLockType, minCommitTS uint64,
pessimisticAction kvrpcpb.PrewriteRequest_PessimisticAction, minCommitTS uint64,
assertionLevel kvrpcpb.AssertionLevel) error {
startKey := mvccEncode(mutation.Key, lockVer)
iter := newIterator(db, &util.Range{
Expand All @@ -892,7 +892,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
}
if ok {
if dec.lock.startTS != startTS {
if isPessimisticLock == kvrpcpb.PessimisticLockType_PessimisticLocked {
if pessimisticAction == kvrpcpb.PrewriteRequest_DO_PESSIMISTIC_CHECK {
// NOTE: A special handling.
// When pessimistic txn prewrite meets lock, set the TTL = 0 means
// telling TiDB to rollback the transaction **unconditionly**.
Expand All @@ -917,7 +917,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
return err
}
} else {
if isPessimisticLock == kvrpcpb.PessimisticLockType_PessimisticLocked {
if pessimisticAction == kvrpcpb.PrewriteRequest_DO_PESSIMISTIC_CHECK {
return ErrAbort("pessimistic lock not found")
}
_, err = checkConflictValue(iter, mutation, startTS, startTS, false, assertionLevel)
Expand Down
4 changes: 2 additions & 2 deletions internal/unionstore/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,10 +338,10 @@ func (db *MemDB) set(key []byte, value []byte, ops ...kv.FlagsOp) error {

if len(ops) != 0 {
originalFlags := x.getKeyFlags()
// the NeedConflictCheckInPrewrite flag is temporary,
// the NeedConstraintCheckInPrewrite flag is temporary,
// every access to the node removes it unless it's explicitly set.
// This set must be in the latest stage so no special processing is needed.
flags := kv.ApplyFlagsOps(originalFlags, kv.DelNeedConflictCheckInPrewrite)
flags := kv.ApplyFlagsOps(originalFlags, kv.DelNeedConstraintCheckInPrewrite)
flags = kv.ApplyFlagsOps(flags, ops...)
if flags.AndPersistent() != 0 {
db.dirty = true
Expand Down
10 changes: 5 additions & 5 deletions internal/unionstore/memdb_arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,18 +351,18 @@ func (l *memdbVlog) getValue(addr memdbArenaAddr) (value []byte) {
valueOff := hdrOffset - hdr.valueLen
value = block[valueOff:hdrOffset:hdrOffset]

l.processFlagNeedConflictCheckInPrewrite(addr, hdr.nodeAddr, value)
l.processFlagNeedConstraintCheckInPrewrite(addr, hdr.nodeAddr, value)

return value
}

// remove the temporary flag NeedConflictCheckInPrewrite
func (l *memdbVlog) processFlagNeedConflictCheckInPrewrite(valueAddr memdbArenaAddr, nodeAddr memdbArenaAddr, value []byte) {
// remove the temporary flag NeedConstraintCheckInPrewrite
func (l *memdbVlog) processFlagNeedConstraintCheckInPrewrite(valueAddr memdbArenaAddr, nodeAddr memdbArenaAddr, value []byte) {
node := l.memdb.getNode(nodeAddr)
flags := node.getKeyFlags()
// only process if current access is on the latest version, so that we do it at most once.
if flags.HasNeedConflictCheckInPrewrite() && node.vptr == valueAddr {
flags = kv.ApplyFlagsOps(flags, kv.DelNeedConflictCheckInPrewrite)
if flags.HasNeedConstraintCheckInPrewrite() && node.vptr == valueAddr {
flags = kv.ApplyFlagsOps(flags, kv.DelNeedConstraintCheckInPrewrite)
node.updateKeyFlags()

// if this is not in the latest stage, we need to copy the vlog entry to the latest stage, marking it as
Expand Down
2 changes: 1 addition & 1 deletion internal/unionstore/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ func TestUnsetTemporaryFlag(t *testing.T) {
db.Staging()
key := []byte{1}
value := []byte{2}
db.SetWithFlags(key, value, kv.SetNeedConflictCheckInPrewrite)
db.SetWithFlags(key, value, kv.SetNeedConstraintCheckInPrewrite)
h2 := db.Staging()

values := make([][]byte, 0, 0)
Expand Down
30 changes: 15 additions & 15 deletions kv/keyflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ const (
// When the key gets locked (and the existence is checked), the flag should be removed.
// It should only be applied to keys with PresumeNotExist, so that an in-place constraint check becomes
// (a conflict check + a constraint check) in prewrite.
flagNeedConflictCheckInPrewrite
flagNeedConstraintCheckInPrewrite

persistentFlags = flagKeyLocked | flagKeyLockedValExist | flagNeedConflictCheckInPrewrite
persistentFlags = flagKeyLocked | flagKeyLockedValExist | flagNeedConstraintCheckInPrewrite
)

// HasAssertExist returns whether the key need ensure exists in 2pc.
Expand Down Expand Up @@ -131,9 +131,9 @@ func (f KeyFlags) HasReadable() bool {
return f&flagReadable != 0
}

// HasNeedConflictCheckInPrewrite returns whether the key needs to check conflict in prewrite.
func (f KeyFlags) HasNeedConflictCheckInPrewrite() bool {
return f&flagNeedConflictCheckInPrewrite != 0
// HasNeedConstraintCheckInPrewrite returns whether the key needs to check conflict in prewrite.
func (f KeyFlags) HasNeedConstraintCheckInPrewrite() bool {
return f&flagNeedConstraintCheckInPrewrite != 0
}

// AndPersistent returns the value of current flags&persistentFlags
Expand Down Expand Up @@ -164,12 +164,12 @@ func ApplyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags {
origin &= ^flagNeedLocked
case SetKeyLockedValueExists:
origin |= flagKeyLockedValExist
origin &= ^flagNeedConflictCheckInPrewrite
origin &= ^flagNeedConstraintCheckInPrewrite
case DelNeedCheckExists:
origin &= ^flagNeedCheckExists
case SetKeyLockedValueNotExists:
origin &= ^flagKeyLockedValExist
origin &= ^flagNeedConflictCheckInPrewrite
origin &= ^flagNeedConstraintCheckInPrewrite
case SetPrewriteOnly:
origin |= flagPrewriteOnly
case SetIgnoredIn2PC:
Expand All @@ -190,10 +190,10 @@ func ApplyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags {
case SetAssertNone:
origin &= ^flagAssertExist
origin &= ^flagAssertNotExist
case SetNeedConflictCheckInPrewrite:
origin |= flagNeedConflictCheckInPrewrite
case DelNeedConflictCheckInPrewrite:
origin &= ^flagNeedConflictCheckInPrewrite
case SetNeedConstraintCheckInPrewrite:
origin |= flagNeedConstraintCheckInPrewrite
case DelNeedConstraintCheckInPrewrite:
origin &= ^flagNeedConstraintCheckInPrewrite
}
}
return origin
Expand Down Expand Up @@ -238,9 +238,9 @@ const (
SetAssertUnknown
// SetAssertNone cleans up the key's assert.
SetAssertNone
// SetNeedConflictCheckInPrewrite marks the key needs to check conflict in prewrite.
SetNeedConflictCheckInPrewrite
// DelNeedConflictCheckInPrewrite reverts SetNeedConflictCheckInPrewrite. This is required when we decide to
// SetNeedConstraintCheckInPrewrite marks the key needs to check conflict in prewrite.
SetNeedConstraintCheckInPrewrite
// DelNeedConstraintCheckInPrewrite reverts SetNeedConstraintCheckInPrewrite. This is required when we decide to
// make up the pessimistic lock.
DelNeedConflictCheckInPrewrite
DelNeedConstraintCheckInPrewrite
)
38 changes: 19 additions & 19 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ type memBufferMutations struct {

// The format to put to the UserData of the handles:
// MSB LSB
// [12 bits: Op][1 bit: needConflictCheckInPrewrite][1 bit: assertNotExist][1 bit: assertExist][1 bit: isPessimisticLock]
// [12 bits: Op][1 bit: NeedConstraintCheckInPrewrite][1 bit: assertNotExist][1 bit: assertExist][1 bit: isPessimisticLock]
handles []unionstore.MemKeyHandle
}

Expand Down Expand Up @@ -231,7 +231,7 @@ func (m *memBufferMutations) IsAssertNotExist(i int) bool {
return m.handles[i].UserData&(1<<2) != 0
}

func (m *memBufferMutations) NeedConflictCheckInPrewrite(i int) bool {
func (m *memBufferMutations) NeedConstraintCheckInPrewrite(i int) bool {
return m.handles[i].UserData&(1<<3) != 0
}

Expand All @@ -242,7 +242,7 @@ func (m *memBufferMutations) Slice(from, to int) CommitterMutations {
}
}

func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, assertNotExist, needConflictCheckInPrewrite bool,
func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, assertNotExist, NeedConstraintCheckInPrewrite bool,
handle unionstore.MemKeyHandle) {
// See comments of `m.handles` field about the format of the user data `aux`.
aux := uint16(op) << 4
Expand All @@ -255,7 +255,7 @@ func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist,
if assertNotExist {
aux |= 1 << 2
}
if needConflictCheckInPrewrite {
if NeedConstraintCheckInPrewrite {
aux |= 1 << 3
}
handle.UserData = aux
Expand All @@ -275,11 +275,11 @@ const (
// MutationFlagIsAssertNotExists is the flag that marks a mutation needs to be asserted to be not-existed when prewriting.
MutationFlagIsAssertNotExists

// MutationFlagNeedConflictCheckInPrewrite is the flag that marks a mutation needs to be checked for conflicts in prewrite.
MutationFlagNeedConflictCheckInPrewrite
// MutationFlagNeedConstraintCheckInPrewrite is the flag that marks a mutation needs to be checked for conflicts in prewrite.
MutationFlagNeedConstraintCheckInPrewrite
)

func makeMutationFlags(isPessimisticLock, assertExist, assertNotExist, needConflictCheckInPrewrite bool) CommitterMutationFlags {
func makeMutationFlags(isPessimisticLock, assertExist, assertNotExist, NeedConstraintCheckInPrewrite bool) CommitterMutationFlags {
var flags CommitterMutationFlags = 0
if isPessimisticLock {
flags |= MutationFlagIsPessimisticLock
Expand All @@ -290,8 +290,8 @@ func makeMutationFlags(isPessimisticLock, assertExist, assertNotExist, needConfl
if assertNotExist {
flags |= MutationFlagIsAssertNotExists
}
if needConflictCheckInPrewrite {
flags |= MutationFlagNeedConflictCheckInPrewrite
if NeedConstraintCheckInPrewrite {
flags |= MutationFlagNeedConstraintCheckInPrewrite
}
return flags
}
Expand All @@ -307,7 +307,7 @@ type CommitterMutations interface {
Slice(from, to int) CommitterMutations
IsAssertExists(i int) bool
IsAssertNotExist(i int) bool
NeedConflictCheckInPrewrite(i int) bool
NeedConstraintCheckInPrewrite(i int) bool
}

// PlainMutations contains transaction operations.
Expand Down Expand Up @@ -346,11 +346,11 @@ func (c *PlainMutations) Slice(from, to int) CommitterMutations {

// Push another mutation into mutations.
func (c *PlainMutations) Push(op kvrpcpb.Op, key []byte, value []byte, isPessimisticLock, assertExist,
assertNotExist, needConflictCheckInPrewrite bool) {
assertNotExist, NeedConstraintCheckInPrewrite bool) {
c.ops = append(c.ops, op)
c.keys = append(c.keys, key)
c.values = append(c.values, value)
c.flags = append(c.flags, makeMutationFlags(isPessimisticLock, assertExist, assertNotExist, needConflictCheckInPrewrite))
c.flags = append(c.flags, makeMutationFlags(isPessimisticLock, assertExist, assertNotExist, NeedConstraintCheckInPrewrite))
}

// Len returns the count of mutations.
Expand Down Expand Up @@ -393,9 +393,9 @@ func (c *PlainMutations) IsAssertNotExist(i int) bool {
return c.flags[i]&MutationFlagIsAssertNotExists != 0
}

// NeedConflictCheckInPrewrite returns the key needConflictCheckInPrewrite flag at index.
func (c *PlainMutations) NeedConflictCheckInPrewrite(i int) bool {
return c.flags[i]&MutationFlagNeedConflictCheckInPrewrite != 0
// NeedConstraintCheckInPrewrite returns the key NeedConstraintCheckInPrewrite flag at index.
func (c *PlainMutations) NeedConstraintCheckInPrewrite(i int) bool {
return c.flags[i]&MutationFlagNeedConstraintCheckInPrewrite != 0
}

// GetOp returns the key op at index.
Expand Down Expand Up @@ -602,7 +602,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
if c.txn.schemaAmender != nil || c.txn.assertionLevel == kvrpcpb.AssertionLevel_Off {
mustExist, mustNotExist, hasAssertUnknown = false, false, false
}
c.mutations.Push(op, isPessimistic, mustExist, mustNotExist, flags.HasNeedConflictCheckInPrewrite(), it.Handle())
c.mutations.Push(op, isPessimistic, mustExist, mustNotExist, flags.HasNeedConstraintCheckInPrewrite(), it.Handle())
size += len(key) + len(value)

if c.txn.assertionLevel != kvrpcpb.AssertionLevel_Off {
Expand Down Expand Up @@ -1701,7 +1701,7 @@ func (c *twoPhaseCommitter) amendPessimisticLock(ctx context.Context, addMutatio
for i := 0; i < addMutations.Len(); i++ {
if addMutations.IsPessimisticLock(i) {
keysNeedToLock.Push(addMutations.GetOp(i), addMutations.GetKey(i), addMutations.GetValue(i), addMutations.IsPessimisticLock(i),
addMutations.IsAssertExists(i), addMutations.IsAssertNotExist(i), addMutations.NeedConflictCheckInPrewrite(i))
addMutations.IsAssertExists(i), addMutations.IsAssertNotExist(i), addMutations.NeedConstraintCheckInPrewrite(i))
}
}
// For unique index amend, we need to pessimistic lock the generated new index keys first.
Expand Down Expand Up @@ -1786,7 +1786,7 @@ func (c *twoPhaseCommitter) tryAmendTxn(ctx context.Context, startInfoSchema Sch
}
handle := c.txn.GetMemBuffer().IterWithFlags(key, nil).Handle()
c.mutations.Push(op, addMutations.IsPessimisticLock(i), addMutations.IsAssertExists(i),
addMutations.IsAssertNotExist(i), addMutations.NeedConflictCheckInPrewrite(i), handle)
addMutations.IsAssertNotExist(i), addMutations.NeedConstraintCheckInPrewrite(i), handle)
}
}
return false, nil
Expand Down Expand Up @@ -2140,7 +2140,7 @@ func (c *twoPhaseCommitter) mutationsOfKeys(keys [][]byte) CommitterMutations {
for _, key := range keys {
if bytes.Equal(c.mutations.GetKey(i), key) {
res.Push(c.mutations.GetOp(i), c.mutations.GetKey(i), c.mutations.GetValue(i), c.mutations.IsPessimisticLock(i),
c.mutations.IsAssertExists(i), c.mutations.IsAssertNotExist(i), c.mutations.NeedConflictCheckInPrewrite(i))
c.mutations.IsAssertExists(i), c.mutations.IsAssertNotExist(i), c.mutations.NeedConstraintCheckInPrewrite(i))
break
}
}
Expand Down
30 changes: 15 additions & 15 deletions txnkv/transaction/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (actionPrewrite) tiKVTxnRegionsNumHistogram() prometheus.Observer {
func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize uint64) *tikvrpc.Request {
m := batch.mutations
mutations := make([]*kvrpcpb.Mutation, m.Len())
isPessimisticLock := make([]kvrpcpb.PessimisticLockType, m.Len())
pessimisticActions := make([]kvrpcpb.PrewriteRequest_PessimisticAction, m.Len())
for i := 0; i < m.Len(); i++ {
assertion := kvrpcpb.Assertion_None
if m.IsAssertExists(i) {
Expand All @@ -90,11 +90,11 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
Assertion: assertion,
}
if m.IsPessimisticLock(i) {
isPessimisticLock[i] = kvrpcpb.PessimisticLockType_PessimisticLocked
} else if m.NeedConflictCheckInPrewrite(i) {
isPessimisticLock[i] = kvrpcpb.PessimisticLockType_NeedConflictCheck
pessimisticActions[i] = kvrpcpb.PrewriteRequest_DO_PESSIMISTIC_CHECK
} else if m.NeedConstraintCheckInPrewrite(i) {
pessimisticActions[i] = kvrpcpb.PrewriteRequest_DO_CONSTRAINT_CHECK
} else {
isPessimisticLock[i] = kvrpcpb.PessimisticLockType_NonPessimisticLocked
pessimisticActions[i] = kvrpcpb.PrewriteRequest_SKIP_PESSIMISTIC_CHECK
}
}
c.mu.Lock()
Expand Down Expand Up @@ -147,16 +147,16 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
}

req := &kvrpcpb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
LockTtl: ttl,
IsPessimisticLock: isPessimisticLock,
ForUpdateTs: c.forUpdateTS,
TxnSize: txnSize,
MinCommitTs: minCommitTS,
MaxCommitTs: c.maxCommitTS,
AssertionLevel: assertionLevel,
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
LockTtl: ttl,
PessimisticActions: pessimisticActions,
ForUpdateTs: c.forUpdateTS,
TxnSize: txnSize,
MinCommitTs: minCommitTS,
MaxCommitTs: c.maxCommitTS,
AssertionLevel: assertionLevel,
}

if _, err := util.EvalFailpoint("invalidMaxCommitTS"); err == nil {
Expand Down

0 comments on commit 03942a5

Please sign in to comment.