store/tikv: implement local latch for transaction #6268
Changes from 15 commits
958eed5
13fd547
a3c8d2c
9c0f9d2
2ae2922
b87033b
fa10a54
af981cc
ed700f4
5488231
e71ffe2
c84c8de
58cabb8
d0b2200
65bfe3f
d00fa95
@@ -0,0 +1,199 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package latch

import (
    "fmt"
    "math/bits"
    "sort"
    "sync"

    "github.com/cznic/mathutil"
    "github.com/spaolacci/murmur3"
)

// Latch stores a key's waiting transactions information.
type Latch struct {
    // Whether there is any transaction in waitingQueue except the head.
    hasMoreWaiting bool
    // The startTS of the transaction which is the head of waiting transactions.
    waitingQueueHead uint64
    maxCommitTS      uint64
    sync.Mutex
}

func (l *Latch) occupied() bool {
    return l.waitingQueueHead != 0
}

func (l *Latch) free() {
    l.waitingQueueHead = 0
}

// Lock is the locks' information required for a transaction.
type Lock struct {
    // The slot IDs of the latches (keys) that a startTS must acquire before it can be processed.
    requiredSlots []int
    // The number of latches that the transaction has acquired.
    acquiredCount int
    // Whether the current transaction is waiting.
    isWaiting bool
    // The current transaction's startTS.
    startTS uint64
}

// NewLock creates a new lock.
func NewLock(startTS uint64, requiredSlots []int) Lock {
    return Lock{
        requiredSlots: requiredSlots,
        acquiredCount: 0,
        isWaiting:     false,
        startTS:       startTS,
    }
}

// Latches are used for concurrency control.
// Each latch is indexed by a slot's ID, hence the terms latch and slot are used interchangeably,
// but conceptually a latch is a queue, and a slot is an index into that queue.
type Latches struct {
    slots []Latch
    // The waiting queue for each slot (slotID => slice of startTS).
    waitingQueues map[int][]uint64
    sync.RWMutex
}

// NewLatches creates a Latches with a fixed length;
// the size will be rounded up to the next power of 2.
func NewLatches(size int) *Latches {
    powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1)))
    slots := make([]Latch, powerOfTwoSize)
    return &Latches{
        slots:         slots,
        waitingQueues: make(map[int][]uint64),
    }
}

// GenLock generates the Lock for the transaction with startTS and keys.
func (latches *Latches) GenLock(startTS uint64, keys [][]byte) Lock {
    slots := make([]int, 0, len(keys))
    for _, key := range keys {
        slots = append(slots, latches.slotID(key))
    }
    sort.Ints(slots)
    if len(slots) <= 1 {
        return NewLock(startTS, slots)
    }
    dedup := slots[:1]
    for i := 1; i < len(slots); i++ {
        if slots[i] != slots[i-1] {
            dedup = append(dedup, slots[i])
        }
    }
    return NewLock(startTS, dedup)
}

// slotID returns the slot ID for the given key.
func (latches *Latches) slotID(key []byte) int {
    return int(murmur3.Sum32(key)) & (len(latches.slots) - 1)
}

// Acquire tries to acquire the lock for a transaction.
// It returns with stale = true when the transaction is stale
// (i.e. when lock.startTS is smaller than any required key's last commitTS).
func (latches *Latches) Acquire(lock *Lock) (success, stale bool) {
    for lock.acquiredCount < len(lock.requiredSlots) {
        slotID := lock.requiredSlots[lock.acquiredCount]
        success, stale = latches.acquireSlot(slotID, lock.startTS)
        if success {
            lock.acquiredCount++
            lock.isWaiting = false
            continue
Review comment: Should not …
Review comment: According to the definition of …
        }
        if !stale {
            lock.isWaiting = true
        }
        return
    }
    return
}

// Release releases all latches owned by the `lock` and returns the wakeup list.
// Preconditions: the caller must ensure the transaction is at the front of the latches.
func (latches *Latches) Release(lock *Lock, commitTS uint64) (wakeupList []uint64) {
Review comment: What will the commitTS be when releasing an uncommitted txn?
Reply: It will be zero.
    releaseCount := lock.acquiredCount
    if lock.isWaiting {
        releaseCount++
    }
    wakeupList = make([]uint64, 0, releaseCount)
    for i := 0; i < releaseCount; i++ {
        slotID := lock.requiredSlots[i]

        if hasNext, nextStartTS := latches.releaseSlot(slotID, lock.startTS, commitTS); hasNext {
            wakeupList = append(wakeupList, nextStartTS)
        }
    }
    return
}

func (latches *Latches) releaseSlot(slotID int, startTS, commitTS uint64) (hasNext bool, nextStartTS uint64) {
    latch := &latches.slots[slotID]
    latch.Lock()
    defer latch.Unlock()
    if startTS != latch.waitingQueueHead {
Review comment: When releasing a …
Reply: We panic here since it should never happen.
        panic(fmt.Sprintf("invalid front ts %d, latch:%#v", startTS, latch))
    }
    latch.maxCommitTS = mathutil.MaxUint64(latch.maxCommitTS, commitTS)
    if !latch.hasMoreWaiting {
        latch.free()
        return
    }
    latch.waitingQueueHead, latch.hasMoreWaiting = latches.popFromWaitingQueue(slotID)
    return true, latch.waitingQueueHead
}

func (latches *Latches) popFromWaitingQueue(slotID int) (front uint64, hasMoreWaiting bool) {
    latches.Lock()
Review comment: We can avoid the lock here.
Reply: No, we do not know if the slot has a waiting queue in …
    defer latches.Unlock()
    waiting := latches.waitingQueues[slotID]
    front = waiting[0]
Review comment: How about if the waiting queue is empty?
Review comment: How about if waiting is empty?
Reply: It will return on L158.
    if len(waiting) == 1 {
        delete(latches.waitingQueues, slotID)
    } else {
        latches.waitingQueues[slotID] = waiting[1:]
        hasMoreWaiting = true
    }
    return
}

func (latches *Latches) acquireSlot(slotID int, startTS uint64) (success, stale bool) {
    latch := &latches.slots[slotID]
    latch.Lock()
    defer latch.Unlock()
    if stale = latch.maxCommitTS > startTS; stale {
        return
    }
    // Empty latch
    if !latch.occupied() {
Review comment: acquired := latch.aquire(start_ts)
Reply: So we check stale outside the latch while doing acquire inside the latch? I do not think that is a good style.
        latch.waitingQueueHead = startTS
    }
    if success = latch.waitingQueueHead == startTS; success {
        return
    }
    // push current transaction into waitingQueue
    latch.hasMoreWaiting = true
    latches.Lock()
    defer latches.Unlock()
    latches.waitingQueues[slotID] = append(latches.waitingQueues[slotID], startTS)
    return
}
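For readers following the diff, the flow a scheduler is expected to drive with this API is: GenLock once per transaction, Acquire until it either succeeds or reports stale, and Release afterwards so that queued transactions get woken up. The sketch below is not part of this PR; the import path and the timestamp values are assumptions made purely for illustration.

package main

import (
    "fmt"

    "github.com/pingcap/tidb/store/tikv/latch" // assumed import path for the package above
)

func main() {
    // 256 is rounded up to a power of two by NewLatches, so slotID can use a
    // bit mask instead of a modulo.
    latches := latch.NewLatches(256)

    // A transaction computes its required slots once: hashed, sorted, deduplicated.
    startTS := uint64(100) // in TiDB this would come from PD
    lock := latches.GenLock(startTS, [][]byte{[]byte("a"), []byte("b")})

    success, stale := latches.Acquire(&lock)
    if stale {
        // Some required key already committed at a larger timestamp:
        // the transaction must restart with a new startTS.
        return
    }
    if !success {
        // The startTS was queued on some slot; the caller should wait until a
        // Release wakes this startTS up, then call Acquire again.
        return
    }

    // ... run the two-phase commit here ...
    commitTS := uint64(101)

    // Release frees the owned latches (commitTS would be 0 for a rollback)
    // and returns the startTSes that should retry Acquire now.
    for _, ts := range latches.Release(&lock, commitTS) {
        fmt.Println("wake up transaction", ts)
    }
}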
@@ -0,0 +1,173 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package latch

import (
    "sync"
    "sync/atomic"
    "testing"

    . "github.com/pingcap/check"
)

func TestT(t *testing.T) {
    TestingT(t)
}

var _ = Suite(&testLatchSuite{})

var baseTso uint64

type testLatchSuite struct {
    latches *Latches
}

func (s *testLatchSuite) SetUpTest(c *C) {
    s.latches = NewLatches(256)
}

func (s *testLatchSuite) newLock(keys [][]byte) (startTS uint64, lock Lock) {
    startTS = getTso()
    lock = s.latches.GenLock(startTS, keys)
    return
}

func getTso() uint64 {
    return atomic.AddUint64(&baseTso, uint64(1))
}

func (s *testLatchSuite) TestWakeUp(c *C) {
    keysA := [][]byte{
        []byte("a"), []byte("b"), []byte("c"), []byte("c")}
    _, lockA := s.newLock(keysA)

    keysB := [][]byte{[]byte("d"), []byte("e"), []byte("a"), []byte("c")}
    startTSB, lockB := s.newLock(keysB)

    // A acquires the lock successfully.
    acquired, stale := s.latches.Acquire(&lockA)
    c.Assert(stale, IsFalse)
    c.Assert(acquired, IsTrue)

    // B fails to acquire the lock.
    acquired, stale = s.latches.Acquire(&lockB)
    c.Assert(stale, IsFalse)
    c.Assert(acquired, IsFalse)

    // A releases the lock and gets the wakeup list.
    commitTSA := getTso()
    wakeupList := s.latches.Release(&lockA, commitTSA)
    c.Assert(wakeupList[0], Equals, startTSB)

    // B's acquire fails since startTSB is stale for some keys.
    acquired, stale = s.latches.Acquire(&lockB)
    c.Assert(stale, IsTrue)
    c.Assert(acquired, IsFalse)

    // B releases the lock since it is stale.
    wakeupList = s.latches.Release(&lockB, 0)
    c.Assert(len(wakeupList), Equals, 0)

    // B restarts: it gets a new startTS.
    startTSB = getTso()
    lockB = s.latches.GenLock(startTSB, keysB)
    acquired, stale = s.latches.Acquire(&lockB)
    c.Assert(acquired, IsTrue)
    c.Assert(stale, IsFalse)
}

type txn struct {
    keys    [][]byte
    startTS uint64
    lock    Lock
}

func newTxn(keys [][]byte, startTS uint64, lock Lock) txn {
    return txn{
        keys:    keys,
        startTS: startTS,
        lock:    lock,
    }
}

type txnScheduler struct {
    txns    map[uint64]*txn
    latches *Latches
    lock    sync.Mutex
    wait    *sync.WaitGroup
}

func newTxnScheduler(wait *sync.WaitGroup, latches *Latches) *txnScheduler {
    return &txnScheduler{
        txns:    make(map[uint64]*txn),
        latches: latches,
        wait:    wait,
    }
}

func (store *txnScheduler) runTxn(startTS uint64) {
    store.lock.Lock()
    txn, ok := store.txns[startTS]
    store.lock.Unlock()
    if !ok {
        panic(startTS)
    }
    acquired, stale := store.latches.Acquire(&txn.lock)

    if !stale && !acquired {
        return
    }
    commitTs := uint64(0)
    if stale {
        // restart Txn
        go store.newTxn(txn.keys)
    } else {
        // DO commit
        commitTs = getTso()
        store.wait.Done()
    }
    wakeupList := store.latches.Release(&txn.lock, commitTs)
    for _, s := range wakeupList {
        go store.runTxn(s)
    }
    store.lock.Lock()
    delete(store.txns, startTS)
    store.lock.Unlock()
}

func (store *txnScheduler) newTxn(keys [][]byte) {
    startTS := getTso()
    lock := store.latches.GenLock(startTS, keys)
    t := newTxn(keys, startTS, lock)
    store.lock.Lock()
    store.txns[t.startTS] = &t
    store.lock.Unlock()
    go store.runTxn(t.startTS)
}

func (s *testLatchSuite) TestWithConcurrency(c *C) {
    waitGroup := sync.WaitGroup{}
    txns := [][][]byte{
        {[]byte("a"), []byte("a"), []byte("b"), []byte("c")},
        {[]byte("a"), []byte("d"), []byte("e"), []byte("f")},
        {[]byte("e"), []byte("f"), []byte("g"), []byte("h")},
    }

    store := newTxnScheduler(&waitGroup, s.latches)
    waitGroup.Add(len(txns))
    for _, txn := range txns {
        go store.newTxn(txn)
    }
    waitGroup.Wait()
}

Review comment: This may be easier to read.
Review comment: @tiancaiamao What do you think?
Review comment: Well, @coocood is right! It is easier to read; you can change it this way.
As for me, I'd always prefer the C-style, for example:
i := 0; i < len(); i++
style instead of range. It seems clumsy but is less likely to go wrong.
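To make that style discussion concrete, here is a small, self-contained illustration (not from the PR) of the two loop styles applied to the same deduplication that GenLock performs on its sorted slots; the function names and sample data are made up.

package main

import (
    "fmt"
    "sort"
)

// dedupIndexStyle removes adjacent duplicates from a sorted slice with the
// C-style loop used in GenLock: the slots[i] vs slots[i-1] comparison is explicit.
func dedupIndexStyle(slots []int) []int {
    dedup := slots[:1]
    for i := 1; i < len(slots); i++ {
        if slots[i] != slots[i-1] {
            dedup = append(dedup, slots[i])
        }
    }
    return dedup
}

// dedupRangeStyle does the same with a range loop; i indexes slots itself,
// so slots[i] is the element just before s.
func dedupRangeStyle(slots []int) []int {
    dedup := slots[:1]
    for i, s := range slots[1:] {
        if s != slots[i] {
            dedup = append(dedup, s)
        }
    }
    return dedup
}

func main() {
    a := []int{7, 3, 1, 3, 7}
    b := append([]int(nil), a...) // separate copy, since both helpers write into their input's backing array
    sort.Ints(a)
    sort.Ints(b)
    fmt.Println(dedupIndexStyle(a)) // [1 3 7]
    fmt.Println(dedupRangeStyle(b)) // [1 3 7]
}

Both forms produce the same result here; the disagreement in the thread is about readability, not correctness.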