store/tikv: implement local latch for transaction #6268
store/tikv/latch/latch.go
Outdated
func (l *Latch) acquire(startTS uint64) (acquire, timeout, newWait bool) { | ||
l.Lock() | ||
defer l.Unlock() | ||
timeout = startTS <= l.lastCommitTs |
A latch may relate to multiple keys, so the timeout judgement is wrong here?
store/tikv/latch/latch.go
Outdated
} | ||
|
||
// acquire tries to get current key's lock for the transaction with startTS. | ||
// acquire is true when success |
Why not just name it success?
store/tikv/latch/latch.go
Outdated
if timeout { | ||
return | ||
} | ||
if len(l.waiting) == 0 || l.waiting[0] != startTS { |
I'm curious: when will l.waiting[0] == startTS happen?
store/tikv/latch/latch.go
Outdated
l.Lock() | ||
defer l.Unlock() | ||
if startTS != l.waiting[0] { | ||
panic(fmt.Sprintf("invalid front ts %d, latch:%+v", startTS, l)) |
%#v
store/tikv/latch/latch.go
Outdated
func (l *Latch) release(startTS uint64, commitTS uint64) (isEmpty bool, front uint64) { | ||
l.Lock() | ||
defer l.Unlock() | ||
if startTS != l.waiting[0] { |
if len(l.waiting) == 0 || startTS != l.waiting[0]
store/tikv/latch/latch.go
Outdated
|
||
// Latch stores a key's waiting transactions. | ||
type Latch struct { | ||
// A queue by startTs of those waiting transactions. |
Both acquired and waiting txns are put into the waiting queue.
If waiting[0] == T, we can't distinguish whether T has acquired the latch or is still waiting for it, which is not good.
I suggest adding a holder field, so the case would be much clearer.
store/tikv/latch/latch.go
Outdated
// The number of latches that the transaction has acquired. | ||
acquiredCount int | ||
// The number of latches whose waiting queue contains current transaction. | ||
waitedCount int |
Why do we need this field?
We need the field in Release for the cases where Acquired failed.
It can be changed to waiting bool.
store/tikv/latch/latch.go
Outdated
// NewLatches create a Latches with fixed length, | ||
// the size will be rounded up to the power of 2. | ||
func NewLatches(size int) Latches { | ||
powerOfTwoSize := 1 << uint(math.Ceil(math.Log2(float64(size)))) |
This line is ineffective and ugly.
Tell her how to do it @lamxTyler
sort.SearchInts([]int{1,2,4,8,16,32,64...}, size)? 😄
Actually, I do not know any bit magic to find the highest bit. But there is a function in math/bits called Len32, which returns the bit length after removing leading zeros. So one way to do it is:
if (size & (size - 1)) == 0 {
	powerOfTwoSize = size
} else {
	// Len32 takes a uint32, so size has to be converted first.
	powerOfTwoSize = 1 << uint(bits.Len32(uint32(size)))
}
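For reference, the rounding discussed in this thread can be folded into a single expression. The following is only a minimal sketch: roundUpPowerOfTwo is a hypothetical helper name that does not appear in the PR, and treating any size below 1 as 1 is an assumption. It relies on bits.Len from math/bits, applied to size-1 so that exact powers of two are returned unchanged.

```go
package main

import (
	"fmt"
	"math/bits"
)

// roundUpPowerOfTwo is a hypothetical helper (not part of the PR).
// It returns the smallest power of two that is >= size.
// Assumption: any size below 1 is treated as 1.
func roundUpPowerOfTwo(size int) int {
	if size <= 1 {
		return 1
	}
	// bits.Len(x) is the number of bits needed to represent x.
	// Using size-1 keeps exact powers of two unchanged.
	return 1 << uint(bits.Len(uint(size-1)))
}

func main() {
	for _, n := range []int{1, 2, 3, 8, 9, 1000} {
		fmt.Println(n, roundUpPowerOfTwo(n))
	}
}
```

Compared with the math.Log2 version, this avoids floating point entirely and needs no special branch for sizes that are already powers of two.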
store/tikv/latch/latch.go
Outdated
// the size will be rounded up to the power of 2. | ||
func NewLatches(size int) Latches { | ||
powerOfTwoSize := 1 << uint(math.Ceil(math.Log2(float64(size)))) | ||
latches := make([]Latch, powerOfTwoSize, powerOfTwoSize) |
make([]Latch, powerOfTwoSize)
store/tikv/latch/latch.go
Outdated
panic(fmt.Sprintf("invalid front ts %d, latch:%+v", startTS, l)) | ||
} | ||
if commitTS > l.lastCommitTs { | ||
l.lastCommitTs = commitTS |
maxCommitTS seems more appropriate than lastCommitTs.
store/tikv/latch/latch.go
Outdated
} | ||
l.waiting = l.waiting[1:] | ||
if len(l.waiting) == 0 { | ||
isEmpty = true |
I guess it's better to reset the slice with l.waiting = l.waiting[:0]? @tiancaiamao
If l.waiting == nil, l.waiting = l.waiting[:0] would panic.
If l.waiting == nil, it will panic at L65.
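As a side note on the nil-slice question in this thread: per the Go specification, slicing a nil slice with [:0] yields a nil slice and does not panic, whereas [1:] on a nil or empty slice does panic at run time because the low bound exceeds the length. The small sketch below checks both cases; panics is a hypothetical helper introduced only for this illustration.

```go
package main

import "fmt"

// panics is a hypothetical helper that reports whether calling f panics.
func panics(f func()) (didPanic bool) {
	defer func() {
		if r := recover(); r != nil {
			didPanic = true
		}
	}()
	f()
	return false
}

func main() {
	var waiting []uint64 // nil slice, as in a zero-valued Latch

	// Reslicing to [:0] is within bounds (0 <= 0 <= cap), so no panic.
	fmt.Println(panics(func() { _ = waiting[:0] })) // false

	// Dropping the head of an empty queue is out of range and panics.
	fmt.Println(panics(func() { _ = waiting[1:] })) // true
}
```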
store/tikv/latch/latch.go
Outdated
|
||
// GenLock generates Lock for the transaction with startTS and keys. | ||
func (latches Latches) GenLock(startTS uint64, keys [][]byte) Lock { | ||
hashes := make(map[int]bool) |
hashes is not necessary.
slots := make([]int, 0, len(keys))
for _, key := range keys {
	slots = append(slots, latches.hash(key))
}
sort.Ints(slots)
store/tikv/latch/latch.go
Outdated
lock.waitedCount++ | ||
} | ||
if timeout || !acquired { | ||
return |
Doesn't a timeout clear the lock?
I'd like to call Release outside this function.
store/tikv/latch/latch.go
Outdated
slotID := lock.requiredSlots[lock.acquiredCount] | ||
acquired, timeout, newWait = latches[slotID].acquire(lock.startTS) | ||
if newWait { | ||
lock.waitedCount++ |
The waitedCount always makes me confused...
store/tikv/latch/latch.go
Outdated
// Latch stores a key's waiting transactions. | ||
type Latch struct { | ||
// A queue by startTs of those waiting transactions. | ||
waiting []uint64 |
The number of latches with a waiting queue is much smaller than the total number of latches.
A slice contains a pointer, which has a big impact on GC when the number of latches is large.
We can move the waiting queues out of the latches and into a map to reduce GC pressure, like:
type Latch struct {
	Head       uint64
	HasWaiting bool
	LastCommit uint64
	sync.Mutex
}
type Latches struct {
	Slots        []Latch
	WaitingQueue map[int][]int // key is slotID, value is slice of txn startTS
	WaitingLock  sync.Mutex
}
There is no map used in the current code @coocood
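To make the proposed layout concrete, here is a rough sketch of how a slot array without slice fields could be combined with a shared map of waiting queues. It is an illustration only: the lowercase type names, newLatches, enqueue, and dequeue are all hypothetical and are not taken from the PR, and the sketch uses uint64 for startTS values as an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// latch holds only fixed-size fields, so a large []latch carries no
// per-slot slice header for the garbage collector to follow.
type latch struct {
	sync.Mutex
	head       uint64
	hasWaiting bool
	maxCommit  uint64
}

// latches keeps the (usually few) waiting queues in one shared map.
type latches struct {
	slots         []latch
	mu            sync.Mutex       // guards waitingQueues
	waitingQueues map[int][]uint64 // slotID => startTS values in arrival order
}

func newLatches(n int) *latches {
	return &latches{
		slots:         make([]latch, n),
		waitingQueues: make(map[int][]uint64),
	}
}

// enqueue appends startTS to the waiting queue of slotID.
func (l *latches) enqueue(slotID int, startTS uint64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.waitingQueues[slotID] = append(l.waitingQueues[slotID], startTS)
}

// dequeue pops the front of the queue; ok is false when it is empty.
func (l *latches) dequeue(slotID int) (startTS uint64, ok bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	q := l.waitingQueues[slotID]
	if len(q) == 0 {
		return 0, false
	}
	startTS = q[0]
	if len(q) == 1 {
		delete(l.waitingQueues, slotID) // drop empty queues from the map
	} else {
		l.waitingQueues[slotID] = q[1:]
	}
	return startTS, true
}

func main() {
	ls := newLatches(8)
	ls.enqueue(3, 100)
	ls.enqueue(3, 200)
	fmt.Println(ls.dequeue(3)) // 100 true
	fmt.Println(ls.dequeue(3)) // 200 true
	fmt.Println(ls.dequeue(3)) // 0 false
}
```

The trade-off is one global mutex around the map in exchange for pointer-free slots; whether that is a net win depends on how often transactions actually wait.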
store/tikv/latch/latch.go
Outdated
l.Lock() | ||
defer l.Unlock() | ||
|
||
if timeout = startTS <= l.maxCommitTS; timeout { |
s/timeout/stale
if startTS <= l.maxCommitTS {
timeout = true
return
}
store/tikv/latch/latch.go
Outdated
return | ||
} | ||
|
||
if l.hasWaiting == false { |
if !l.hasWaiting {
store/tikv/latch/latch.go
Outdated
} | ||
|
||
func (latches *Latches) acquireSlot(slotID int, startTS uint64) (success, timeout, new bool) { | ||
success, timeout, new = latches.slots[slotID].acquire(startTS) |
We never use the returned value new.
store/tikv/latch/latch.go
Outdated
return | ||
} | ||
|
||
if l.hasWaiting == false { |
!l.hasWaiting
store/tikv/latch/latch.go
Outdated
l.hasWaiting = true | ||
newWait = true | ||
} | ||
success = l.head == startTS |
It seems newWait and success will always be the same, as long as we don't acquire multiple times for the same ts.
I'm also confused by newWait
store/tikv/latch/latch.go
Outdated
latches.Lock() | ||
defer latches.Unlock() | ||
if waiting, ok := latches.waitingQueue[slotID]; ok { | ||
latches.waitingQueue[slotID] = append(waiting, startTS) |
Is it possible that we are already in the waiting queue?
store/tikv/latch/latch.go
Outdated
} | ||
|
||
func (latches *Latches) releaseSlot(slotID int, startTS, commitTS uint64) (hasNext bool, nextStartTS uint64) { | ||
latches.Lock() |
We can avoid the lock here.
Check the slot first; if the slot doesn't have a waiting queue, we don't need to lock.
No, in Latch we do not know whether the slot has a waiting queue.
store/tikv/latch/latch.go
Outdated
slots[size] = v | ||
size++ | ||
} | ||
} |
This may be easier to read.
dedup := slots[:1]
for i := 1; i < len(slots); i++ {
	if slots[i] != slots[i-1] {
		dedup = append(dedup, slots[i])
	}
}
@tiancaiamao What do you think?
Well, @coocood is right! It's easier to read, so you can change it this way.
if len(slots) == 0 {
	return NewLock(startTS, nil)
}
As for me, I always prefer the C style, for example:
- don't let append do the implicit work
- manually malloc when resizing a slice
- prefer the i := 0; i < len(); i++ style instead of range
It seems clumsy but is less likely to go wrong.
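Putting the two suggestions from this thread together (the slots[:1] loop and the empty-input guard), the dedup step might look like the sketch below. dedupSorted is a hypothetical standalone helper written for illustration; in the PR this logic lives inside GenLock. Note that the result shares the backing array of the input.

```go
package main

import (
	"fmt"
	"sort"
)

// dedupSorted is a hypothetical helper (not from the PR). It sorts slots
// in place and removes adjacent duplicates, reusing the backing array.
func dedupSorted(slots []int) []int {
	// Guard first: slots[:1] below would panic on an empty slice.
	if len(slots) == 0 {
		return slots
	}
	sort.Ints(slots)
	dedup := slots[:1]
	for i := 1; i < len(slots); i++ {
		if slots[i] != slots[i-1] {
			dedup = append(dedup, slots[i])
		}
	}
	return dedup
}

func main() {
	fmt.Println(dedupSorted([]int{3, 1, 3, 2, 1})) // [1 2 3]
	fmt.Println(dedupSorted(nil))                  // []
}
```

Sorting matters beyond deduplication here: acquiring slots in a fixed global order is what keeps two transactions from deadlocking on each other's latches.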
store/tikv/latch/latch.go
Outdated
slots = append(slots, latches.hash(key)) | ||
} | ||
sort.Ints(slots) | ||
if len(slots) == 0 { |
if len(slots) <= 1?
|
||
// Release releases all latches owned by the `lock` and returns the wakeup list. | ||
// Preconditions: the caller must ensure the transaction is at the front of the latches. | ||
func (latches *Latches) Release(lock *Lock, commitTS uint64) (wakeupList []uint64) { |
What will the commitTS be when releasing an uncommitted txn?
It will be zero.
store/tikv/latch/latch.go
Outdated
} | ||
|
||
latches.Lock() | ||
if waiting, ok := latches.waitingQueue[slotID]; ok { |
If latch.hasWaiting is true, it means ok must be true.
store/tikv/latch/latch.go
Outdated
// Whether there is any transaction in waitingQueue except head. | ||
hasWaiting bool | ||
// The startTS of the transaction which is the head of waiting transactions. | ||
head uint64 |
s/head/waitingQueueHead
I think it is better to add a function occupied for Latch to judge whether the Latch is available. Lock should use occupied to implement acquire.
return | ||
} | ||
// Empty latch | ||
if !latch.occupied() { |
acquired := latch.acquire(start_ts)
if acquired {
	return
}
...
So we check stale outside the latch while doing acquire inside the latch? I do not think that is good style.
If we do this, I think we should also implement latch.isStale(startTS), latch.setHasWaiting(), and so on.
store/tikv/latch/latch.go
Outdated
// Preconditions: the caller must ensure the transaction is at the front of the latches. | ||
func (latches *Latches) Release(lock *Lock, commitTS uint64) (wakeupList []uint64) { | ||
wakeupCount := lock.acquiredCount | ||
if lock.waiting { |
In Release, waiting is always false.
nop
store/tikv/latch/latch.go
Outdated
if success { | ||
lock.acquiredCount++ | ||
lock.waiting = false | ||
continue |
Should not continue here.
success and stale may both be true.
if !stale {
continue
} else {
return
}
According to the definition of acquiredCount and success, it's ok to continue here.
store/tikv/latch/latch.go
Outdated
type Latches struct { | ||
slots []Latch | ||
// The waiting queue for each slot(slotID => slice of startTS). | ||
waitingQueue map[int][]uint64 |
waitingQueues
store/tikv/latch/latch.go
Outdated
// The number of latches that the transaction has acquired. | ||
acquiredCount int | ||
// Whether current transaction is waiting | ||
waiting bool |
isWaiting
store/tikv/latch/latch.go
Outdated
// Latch stores a key's waiting transactions information. | ||
type Latch struct { | ||
// Whether there is any transaction in waitingQueue except head. | ||
hasWaiting bool |
hasMoreWaiting
store/tikv/latch/latch.go
Outdated
} | ||
|
||
// hash return hash int for current key. | ||
func (latches *Latches) hash(key []byte) int { |
slotID
store/tikv/latch/latch.go
Outdated
// Release releases all latches owned by the `lock` and returns the wakeup list. | ||
// Preconditions: the caller must ensure the transaction is at the front of the latches. | ||
func (latches *Latches) Release(lock *Lock, commitTS uint64) (wakeupList []uint64) { | ||
wakeupCount := lock.acquiredCount |
I think releaseCount is better.
store/tikv/latch/latch.go
Outdated
if startTS != latch.waitingQueueHead { | ||
panic(fmt.Sprintf("invalid front ts %d, latch:%#v", startTS, latch)) | ||
} | ||
if latch.maxCommitTS < commitTS { |
latch.maxCommitTS = mathutil.Max(latch.maxCommitTS, commitTS)
LGTM
friendly ping @zhangjinpeng1987 @disksing
latch := &latches.slots[slotID] | ||
latch.Lock() | ||
defer latch.Unlock() | ||
if startTS != latch.waitingQueueHead { |
Releasing a Lock which is in waiting status will cause a panic here.
We panic here since it should never happen.
store/tikv/latch/latch.go
Outdated
return NewLock(startTS, dedup) | ||
} | ||
|
||
// hash return slotID for current key. |
slotID returns ...
store/tikv/latch/latch.go
Outdated
latches.waitingQueues[slotID] = append(waitingQueue, startTS) | ||
} else { | ||
latches.waitingQueues[slotID] = []uint64{startTS} | ||
} |
I think we can simplify it to latches.waitingQueues[slotID] = append(latches.waitingQueues[slotID], startTS)
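The simplification works because indexing a map with a missing key yields the zero value, which for a slice type is nil, and append accepts a nil slice. A minimal sketch follows, with enqueue as a hypothetical free function standing in for the method under review:

```go
package main

import "fmt"

// enqueue is a hypothetical stand-in for the code under review.
// A missing slotID yields a nil slice, and append allocates as needed,
// so no separate "first element" branch is required.
func enqueue(queues map[int][]uint64, slotID int, startTS uint64) {
	queues[slotID] = append(queues[slotID], startTS)
}

func main() {
	queues := make(map[int][]uint64)
	enqueue(queues, 7, 100) // key 7 did not exist yet
	enqueue(queues, 7, 200)
	fmt.Println(queues[7]) // [100 200]
}
```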
store/tikv/latch/latch.go
Outdated
releaseCount++ | ||
} | ||
wakeupList = make([]uint64, 0, releaseCount) | ||
for id := 0; id < releaseCount; id++ { |
id -> i
store/tikv/latch/latch.go
Outdated
} else { | ||
latches.waitingQueues[slotID] = waiting[1:] | ||
} | ||
latches.Unlock() |
How about extracting L162-173 into a method?
store/tikv/latch/latch.go
Outdated
} else { | ||
latches.waitingQueues[slotID] = waiting[1:] | ||
} | ||
latches.Unlock() |
How about extracting L162-173 as a method?
store/tikv/latch/latch.go
Outdated
waiting := latches.waitingQueues[slotID] | ||
hasNext = true | ||
nextStartTS = waiting[0] | ||
front = waiting[0] |
What if the waiting queue is empty?
latches.Lock() | ||
defer latches.Unlock() | ||
waiting := latches.waitingQueues[slotID] | ||
front = waiting[0] |
What if waiting is empty?
It will return on L158
LGTM.
/run-all-tests
@coocood @tiancaiamao @disksing @zhangjinpeng1987 PTAL