From 958eed54b93f6f676d03f7289bff45be175bc7f0 Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Wed, 11 Apr 2018 17:26:34 +0800 Subject: [PATCH 01/11] store/tikv: implement local latch for transaction --- store/tikv/latch/latch.go | 162 ++++++++++++++++++++++++++++++ store/tikv/latch/latch_test.go | 173 +++++++++++++++++++++++++++++++++ 2 files changed, 335 insertions(+) create mode 100644 store/tikv/latch/latch.go create mode 100644 store/tikv/latch/latch_test.go diff --git a/store/tikv/latch/latch.go b/store/tikv/latch/latch.go new file mode 100644 index 0000000000000..9400af72be137 --- /dev/null +++ b/store/tikv/latch/latch.go @@ -0,0 +1,162 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package latch + +import ( + "fmt" + "math" + "sort" + "sync" + + "github.com/spaolacci/murmur3" +) + +// Latch stores a key's waiting transactions. +type Latch struct { + // A queue by startTs of those waiting transactions. + waiting []uint64 + lastCommitTs uint64 + sync.Mutex +} + +// acquire tries to get current key's lock for the transaction with startTS. +// acquire is true when success +// timeout is true when the startTS is already timeout +// newWait is true when current transaction is new for the current latch +func (l *Latch) acquire(startTS uint64) (acquire, timeout, newWait bool) { + l.Lock() + defer l.Unlock() + timeout = startTS <= l.lastCommitTs + if timeout { + return + } + if len(l.waiting) == 0 || l.waiting[0] != startTS { + l.waiting = append(l.waiting, startTS) + newWait = true + } + + acquire = l.waiting[0] == startTS + return +} + +// release releases the transaction with startTS and commitTS from current latch. +// isEmpty is true when the waiting queue is empty after release current transaction, +// otherwise return the front transaction in queue. +func (l *Latch) release(startTS uint64, commitTS uint64) (isEmpty bool, front uint64) { + l.Lock() + defer l.Unlock() + if startTS != l.waiting[0] { + panic(fmt.Sprintf("invalid front ts %d, latch:%+v", startTS, l)) + } + if commitTS > l.lastCommitTs { + l.lastCommitTs = commitTS + } + l.waiting = l.waiting[1:] + if len(l.waiting) == 0 { + isEmpty = true + } else { + front = l.waiting[0] + isEmpty = false + } + return +} + +// Lock is the locks' information required for a transaction. +type Lock struct { + // The slot IDs of the latches(keys) that a startTS must acquire before being able to processed. + requiredSlots []int + // The number of latches that the transaction has acquired. + acquiredCount int + // The number of latches whose waiting queue contains current transaction. + waitedCount int + // Current transaction's startTS. + startTS uint64 +} + +// NewLock creates a new lock. +func NewLock(startTS uint64, requiredSlots []int) Lock { + return Lock{ + requiredSlots: requiredSlots, + acquiredCount: 0, + waitedCount: 0, + startTS: startTS, + } +} + +// Latches which are used for concurrency control. +// Each latch is indexed by a slot's ID, hence the term latch and slot are used in interchangeable, +// but conceptually a latch is a queue, and a slot is an index to the queue +type Latches []Latch + +// NewLatches create a Latches with fixed length, +// the size will be rounded up to the power of 2. +func NewLatches(size int) Latches { + powerOfTwoSize := 1 << uint(math.Ceil(math.Log2(float64(size)))) + latches := make([]Latch, powerOfTwoSize, powerOfTwoSize) + return latches +} + +// GenLock generates Lock for the transaction with startTS and keys. +func (latches Latches) GenLock(startTS uint64, keys [][]byte) Lock { + hashes := make(map[int]bool) + for _, key := range keys { + hashes[latches.hash(key)] = true + } + slots := make([]int, 0, len(hashes)) + for key := range hashes { + slots = append(slots, key) + } + sort.Ints(slots) + return NewLock(startTS, slots) +} + +// hash return hash int for current key. +func (latches Latches) hash(key []byte) int { + return int(murmur3.Sum32(key)) & (len(latches) - 1) +} + +// Acquire tries to acquire the lock for a transaction. +// It returns with timeout = true when the transaction is timeout( +// when the lock.startTS is smaller than any key's last commitTS). +// It returns with acquired = true when acquire success and the transaction +// is ready to commit. +func (latches Latches) Acquire(lock *Lock) (acquired, timeout bool) { + var newWait bool + for lock.acquiredCount < len(lock.requiredSlots) { + slotID := lock.requiredSlots[lock.acquiredCount] + acquired, timeout, newWait = latches[slotID].acquire(lock.startTS) + if newWait { + lock.waitedCount++ + } + if timeout || !acquired { + return + } + lock.acquiredCount++ + } + return +} + +// Release releases all latches owned by the `lock` and returns the wakeup list. +// Preconditions: the caller must ensure the transaction is at the front of the latches. +func (latches Latches) Release(lock *Lock, commitTS uint64) (wakeupList []uint64) { + wakeupList = make([]uint64, 0, lock.waitedCount) + for id := 0; id < lock.waitedCount; id++ { + slotID := lock.requiredSlots[id] + isEmpty, front := latches[slotID].release(lock.startTS, commitTS) + if !isEmpty { + wakeupList = append(wakeupList, front) + } + } + return +} diff --git a/store/tikv/latch/latch_test.go b/store/tikv/latch/latch_test.go new file mode 100644 index 0000000000000..0005e18c1daa6 --- /dev/null +++ b/store/tikv/latch/latch_test.go @@ -0,0 +1,173 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package latch + +import ( + "sync" + "sync/atomic" + "testing" + + . "github.com/pingcap/check" +) + +func TestT(t *testing.T) { + TestingT(t) +} + +var _ = Suite(&testLatchSuite{}) + +var baseTso uint64 + +type testLatchSuite struct { + latches Latches +} + +func (s *testLatchSuite) SetUpTest(c *C) { + s.latches = NewLatches(256) +} + +func (s *testLatchSuite) newLock(keys [][]byte) (startTS uint64, lock Lock) { + startTS = getTso() + lock = s.latches.GenLock(startTS, keys) + return +} + +func getTso() uint64 { + return atomic.AddUint64(&baseTso, uint64(1)) +} + +func (s *testLatchSuite) TestWakeUp(c *C) { + keysA := [][]byte{ + []byte("a"), []byte("a"), []byte("b"), []byte("c")} + _, lockA := s.newLock(keysA) + + keysB := [][]byte{[]byte("d"), []byte("e"), []byte("a"), []byte("c")} + startTSB, lockB := s.newLock(keysB) + + //A acquire lock success + acquired, timeout := s.latches.Acquire(&lockA) + c.Assert(timeout, IsFalse) + c.Assert(acquired, IsTrue) + + // B acquire lock failed + acquired, timeout = s.latches.Acquire(&lockB) + c.Assert(timeout, IsFalse) + c.Assert(acquired, IsFalse) + + // A release lock, and get wakeup list + commitTSA := getTso() + wakeupList := s.latches.Release(&lockA, commitTSA) + c.Assert(wakeupList[0], Equals, startTSB) + + // B acquire failed since startTSB has timeout for some keys + acquired, timeout = s.latches.Acquire(&lockB) + c.Assert(timeout, IsTrue) + c.Assert(acquired, IsFalse) + + // B release lock since it received a timeout + wakeupList = s.latches.Release(&lockB, 0) + c.Assert(len(wakeupList), Equals, 0) + + // B restart:get a new startTSo + startTSB = getTso() + lockB = s.latches.GenLock(startTSB, keysB) + acquired, timeout = s.latches.Acquire(&lockB) + c.Assert(acquired, IsTrue) + c.Assert(timeout, IsFalse) +} + +type txn struct { + keys [][]byte + startTS uint64 + lock Lock +} + +func newTxn(keys [][]byte, startTS uint64, lock Lock) txn { + return txn{ + keys: keys, + startTS: startTS, + lock: lock, + } +} + +type txnScheduler struct { + txns map[uint64]*txn + latches Latches + lock sync.Mutex + wait *sync.WaitGroup +} + +func newTxnScheduler(wait *sync.WaitGroup, latches Latches) *txnScheduler { + return &txnScheduler{ + txns: make(map[uint64]*txn), + latches: latches, + wait: wait, + } +} + +func (store *txnScheduler) runTxn(startTS uint64) { + store.lock.Lock() + txn, ok := store.txns[startTS] + store.lock.Unlock() + if !ok { + panic(startTS) + } + acquired, timeout := store.latches.Acquire(&txn.lock) + + if !timeout && !acquired { + return + } + commitTs := uint64(0) + if timeout { + // restart Txn + go store.newTxn(txn.keys) + } else { + // DO commit + commitTs = getTso() + store.wait.Done() + } + wakeupList := store.latches.Release(&txn.lock, commitTs) + for _, s := range wakeupList { + go store.runTxn(s) + } + store.lock.Lock() + delete(store.txns, startTS) + store.lock.Unlock() +} + +func (store *txnScheduler) newTxn(keys [][]byte) { + startTS := getTso() + lock := store.latches.GenLock(startTS, keys) + t := newTxn(keys, startTS, lock) + store.lock.Lock() + defer store.lock.Unlock() + store.txns[t.startTS] = &t + go store.runTxn(t.startTS) +} + +func (s *testLatchSuite) TestWithConcurrency(c *C) { + waitGroup := sync.WaitGroup{} + txns := [][][]byte{ + {[]byte("a"), []byte("a"), []byte("b"), []byte("c")}, + {[]byte("a"), []byte("d"), []byte("e"), []byte("f")}, + {[]byte("e"), []byte("f"), []byte("g"), []byte("h")}, + } + + store := newTxnScheduler(&waitGroup, s.latches) + waitGroup.Add(len(txns)) + for _, txn := range txns { + go store.newTxn(txn) + } + waitGroup.Wait() +} From 13fd547ce44c4d945581d041940fe480e6556504 Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Wed, 11 Apr 2018 17:29:21 +0800 Subject: [PATCH 02/11] tikv/latch_test: update comments --- store/tikv/latch/latch_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/store/tikv/latch/latch_test.go b/store/tikv/latch/latch_test.go index 0005e18c1daa6..19c03e886d98f 100644 --- a/store/tikv/latch/latch_test.go +++ b/store/tikv/latch/latch_test.go @@ -55,31 +55,31 @@ func (s *testLatchSuite) TestWakeUp(c *C) { keysB := [][]byte{[]byte("d"), []byte("e"), []byte("a"), []byte("c")} startTSB, lockB := s.newLock(keysB) - //A acquire lock success + // A acquire lock success. acquired, timeout := s.latches.Acquire(&lockA) c.Assert(timeout, IsFalse) c.Assert(acquired, IsTrue) - // B acquire lock failed + // B acquire lock failed. acquired, timeout = s.latches.Acquire(&lockB) c.Assert(timeout, IsFalse) c.Assert(acquired, IsFalse) - // A release lock, and get wakeup list + // A release lock, and get wakeup list. commitTSA := getTso() wakeupList := s.latches.Release(&lockA, commitTSA) c.Assert(wakeupList[0], Equals, startTSB) - // B acquire failed since startTSB has timeout for some keys + // B acquire failed since startTSB has timeout for some keys. acquired, timeout = s.latches.Acquire(&lockB) c.Assert(timeout, IsTrue) c.Assert(acquired, IsFalse) - // B release lock since it received a timeout + // B release lock since it received a timeout. wakeupList = s.latches.Release(&lockB, 0) c.Assert(len(wakeupList), Equals, 0) - // B restart:get a new startTSo + // B restart:get a new startTS. startTSB = getTso() lockB = s.latches.GenLock(startTSB, keysB) acquired, timeout = s.latches.Acquire(&lockB) From 9c0f9d285e600ecf07b5e74fb984c3b7770bfd1b Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Thu, 12 Apr 2018 17:28:06 +0800 Subject: [PATCH 03/11] address comments --- store/tikv/latch/latch.go | 146 ++++++++++++++++++++------------- store/tikv/latch/latch_test.go | 8 +- 2 files changed, 95 insertions(+), 59 deletions(-) diff --git a/store/tikv/latch/latch.go b/store/tikv/latch/latch.go index 9400af72be137..d92776616a424 100644 --- a/store/tikv/latch/latch.go +++ b/store/tikv/latch/latch.go @@ -15,61 +15,56 @@ package latch import ( "fmt" - "math" + "math/bits" "sort" "sync" "github.com/spaolacci/murmur3" ) -// Latch stores a key's waiting transactions. +// Latch stores a key's waiting transactions information. type Latch struct { - // A queue by startTs of those waiting transactions. - waiting []uint64 - lastCommitTs uint64 + hasWaiting bool + // The startTS of the transaction which is the head of waiting transactions. + head uint64 + maxCommitTS uint64 sync.Mutex } // acquire tries to get current key's lock for the transaction with startTS. -// acquire is true when success +// success is true when success // timeout is true when the startTS is already timeout // newWait is true when current transaction is new for the current latch -func (l *Latch) acquire(startTS uint64) (acquire, timeout, newWait bool) { +func (l *Latch) acquire(startTS uint64) (success, timeout, newWait bool) { l.Lock() defer l.Unlock() - timeout = startTS <= l.lastCommitTs - if timeout { + + if timeout = startTS <= l.maxCommitTS; timeout { return } - if len(l.waiting) == 0 || l.waiting[0] != startTS { - l.waiting = append(l.waiting, startTS) + + if l.hasWaiting == false { + l.head = startTS + l.hasWaiting = true newWait = true } - - acquire = l.waiting[0] == startTS + success = l.head == startTS return } -// release releases the transaction with startTS and commitTS from current latch. -// isEmpty is true when the waiting queue is empty after release current transaction, -// otherwise return the front transaction in queue. -func (l *Latch) release(startTS uint64, commitTS uint64) (isEmpty bool, front uint64) { +// release releases the transaction with startTS and commitTS from current latch, +// and set the next transaction to head if hasNext is true. +func (l *Latch) release(startTS, commitTS uint64, hasNext bool, nextStartTS uint64) { l.Lock() defer l.Unlock() - if startTS != l.waiting[0] { - panic(fmt.Sprintf("invalid front ts %d, latch:%+v", startTS, l)) + if startTS != l.head { + panic(fmt.Sprintf("invalid front ts %d, latch:%#v", startTS, l)) } - if commitTS > l.lastCommitTs { - l.lastCommitTs = commitTS + if commitTS > l.maxCommitTS { + l.maxCommitTS = commitTS } - l.waiting = l.waiting[1:] - if len(l.waiting) == 0 { - isEmpty = true - } else { - front = l.waiting[0] - isEmpty = false - } - return + l.hasWaiting = hasNext + l.head = nextStartTS } // Lock is the locks' information required for a transaction. @@ -97,49 +92,58 @@ func NewLock(startTS uint64, requiredSlots []int) Lock { // Latches which are used for concurrency control. // Each latch is indexed by a slot's ID, hence the term latch and slot are used in interchangeable, // but conceptually a latch is a queue, and a slot is an index to the queue -type Latches []Latch +type Latches struct { + slots []Latch + // The waiting queue for each slot(slotID => slice of startTS). + waitingQueue map[int][]uint64 + sync.RWMutex +} // NewLatches create a Latches with fixed length, // the size will be rounded up to the power of 2. -func NewLatches(size int) Latches { - powerOfTwoSize := 1 << uint(math.Ceil(math.Log2(float64(size)))) - latches := make([]Latch, powerOfTwoSize, powerOfTwoSize) - return latches +func NewLatches(size int) *Latches { + powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1))) + slots := make([]Latch, powerOfTwoSize) + return &Latches{ + slots: slots, + waitingQueue: make(map[int][]uint64), + } } // GenLock generates Lock for the transaction with startTS and keys. -func (latches Latches) GenLock(startTS uint64, keys [][]byte) Lock { - hashes := make(map[int]bool) +func (latches *Latches) GenLock(startTS uint64, keys [][]byte) Lock { + slots := make([]int, 0, len(keys)) for _, key := range keys { - hashes[latches.hash(key)] = true - } - slots := make([]int, 0, len(hashes)) - for key := range hashes { - slots = append(slots, key) + slots = append(slots, latches.hash(key)) } sort.Ints(slots) - return NewLock(startTS, slots) + size := 0 + for _, v := range slots { + if size == 0 || slots[size-1] != v { + slots[size] = v + size++ + } + } + return NewLock(startTS, slots[0:size]) } // hash return hash int for current key. -func (latches Latches) hash(key []byte) int { - return int(murmur3.Sum32(key)) & (len(latches) - 1) +func (latches *Latches) hash(key []byte) int { + return int(murmur3.Sum32(key)) & (len(latches.slots) - 1) } // Acquire tries to acquire the lock for a transaction. // It returns with timeout = true when the transaction is timeout( // when the lock.startTS is smaller than any key's last commitTS). -// It returns with acquired = true when acquire success and the transaction -// is ready to commit. -func (latches Latches) Acquire(lock *Lock) (acquired, timeout bool) { - var newWait bool +func (latches *Latches) Acquire(lock *Lock) (success, timeout bool) { + var new bool for lock.acquiredCount < len(lock.requiredSlots) { slotID := lock.requiredSlots[lock.acquiredCount] - acquired, timeout, newWait = latches[slotID].acquire(lock.startTS) - if newWait { + success, timeout, new = latches.acquireSlot(slotID, lock.startTS) + if new { lock.waitedCount++ } - if timeout || !acquired { + if timeout || !success { return } lock.acquiredCount++ @@ -149,14 +153,46 @@ func (latches Latches) Acquire(lock *Lock) (acquired, timeout bool) { // Release releases all latches owned by the `lock` and returns the wakeup list. // Preconditions: the caller must ensure the transaction is at the front of the latches. -func (latches Latches) Release(lock *Lock, commitTS uint64) (wakeupList []uint64) { +func (latches *Latches) Release(lock *Lock, commitTS uint64) (wakeupList []uint64) { wakeupList = make([]uint64, 0, lock.waitedCount) for id := 0; id < lock.waitedCount; id++ { slotID := lock.requiredSlots[id] - isEmpty, front := latches[slotID].release(lock.startTS, commitTS) - if !isEmpty { - wakeupList = append(wakeupList, front) + + if hasNext, nextStartTS := latches.releaseSlot(slotID, lock.startTS, commitTS); hasNext { + wakeupList = append(wakeupList, nextStartTS) } } return } + +func (latches *Latches) releaseSlot(slotID int, startTS, commitTS uint64) (hasNext bool, nextStartTS uint64) { + latches.Lock() + if waiting, ok := latches.waitingQueue[slotID]; ok { + hasNext = true + nextStartTS = waiting[0] + if len(waiting) == 1 { + delete(latches.waitingQueue, slotID) + } else { + latches.waitingQueue[slotID] = waiting[1:] + } + } + latches.Unlock() + latches.slots[slotID].release(startTS, commitTS, hasNext, nextStartTS) + return +} + +func (latches *Latches) acquireSlot(slotID int, startTS uint64) (success, timeout, new bool) { + success, timeout, new = latches.slots[slotID].acquire(startTS) + if success || timeout { + return + } + new = true + latches.Lock() + defer latches.Unlock() + if waiting, ok := latches.waitingQueue[slotID]; ok { + latches.waitingQueue[slotID] = append(waiting, startTS) + } else { + latches.waitingQueue[slotID] = []uint64{startTS} + } + return +} diff --git a/store/tikv/latch/latch_test.go b/store/tikv/latch/latch_test.go index 19c03e886d98f..e7b8c442a8bbb 100644 --- a/store/tikv/latch/latch_test.go +++ b/store/tikv/latch/latch_test.go @@ -30,7 +30,7 @@ var _ = Suite(&testLatchSuite{}) var baseTso uint64 type testLatchSuite struct { - latches Latches + latches *Latches } func (s *testLatchSuite) SetUpTest(c *C) { @@ -49,7 +49,7 @@ func getTso() uint64 { func (s *testLatchSuite) TestWakeUp(c *C) { keysA := [][]byte{ - []byte("a"), []byte("a"), []byte("b"), []byte("c")} + []byte("a"), []byte("b"), []byte("c"), []byte("c")} _, lockA := s.newLock(keysA) keysB := [][]byte{[]byte("d"), []byte("e"), []byte("a"), []byte("c")} @@ -103,12 +103,12 @@ func newTxn(keys [][]byte, startTS uint64, lock Lock) txn { type txnScheduler struct { txns map[uint64]*txn - latches Latches + latches *Latches lock sync.Mutex wait *sync.WaitGroup } -func newTxnScheduler(wait *sync.WaitGroup, latches Latches) *txnScheduler { +func newTxnScheduler(wait *sync.WaitGroup, latches *Latches) *txnScheduler { return &txnScheduler{ txns: make(map[uint64]*txn), latches: latches, From b87033b689e0cf286f27d917bc52e7e7ff609679 Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Thu, 12 Apr 2018 21:21:40 +0800 Subject: [PATCH 04/11] address comments --- store/tikv/latch/latch.go | 53 ++++++++++++++++++---------------- store/tikv/latch/latch_test.go | 28 +++++++++--------- 2 files changed, 42 insertions(+), 39 deletions(-) diff --git a/store/tikv/latch/latch.go b/store/tikv/latch/latch.go index d92776616a424..3b590b524c80a 100644 --- a/store/tikv/latch/latch.go +++ b/store/tikv/latch/latch.go @@ -33,20 +33,19 @@ type Latch struct { // acquire tries to get current key's lock for the transaction with startTS. // success is true when success -// timeout is true when the startTS is already timeout +// stale is true when the startTS is already stale // newWait is true when current transaction is new for the current latch -func (l *Latch) acquire(startTS uint64) (success, timeout, newWait bool) { +func (l *Latch) acquire(startTS uint64) (success, stale bool) { l.Lock() defer l.Unlock() - if timeout = startTS <= l.maxCommitTS; timeout { + if stale = startTS <= l.maxCommitTS; stale { return } - if l.hasWaiting == false { + if !l.hasWaiting { l.head = startTS l.hasWaiting = true - newWait = true } success = l.head == startTS return @@ -73,8 +72,8 @@ type Lock struct { requiredSlots []int // The number of latches that the transaction has acquired. acquiredCount int - // The number of latches whose waiting queue contains current transaction. - waitedCount int + // Whether current transaction is waiting + waiting bool // Current transaction's startTS. startTS uint64 } @@ -84,7 +83,7 @@ func NewLock(startTS uint64, requiredSlots []int) Lock { return Lock{ requiredSlots: requiredSlots, acquiredCount: 0, - waitedCount: 0, + waiting: false, startTS: startTS, } } @@ -133,20 +132,21 @@ func (latches *Latches) hash(key []byte) int { } // Acquire tries to acquire the lock for a transaction. -// It returns with timeout = true when the transaction is timeout( +// It returns with stale = true when the transaction is stale( // when the lock.startTS is smaller than any key's last commitTS). -func (latches *Latches) Acquire(lock *Lock) (success, timeout bool) { - var new bool +func (latches *Latches) Acquire(lock *Lock) (success, stale bool) { for lock.acquiredCount < len(lock.requiredSlots) { slotID := lock.requiredSlots[lock.acquiredCount] - success, timeout, new = latches.acquireSlot(slotID, lock.startTS) - if new { - lock.waitedCount++ + success, stale = latches.acquireSlot(slotID, lock.startTS) + if success { + lock.acquiredCount++ + lock.waiting = false + continue } - if timeout || !success { - return + if !stale { + lock.waiting = true } - lock.acquiredCount++ + return } return } @@ -154,8 +154,12 @@ func (latches *Latches) Acquire(lock *Lock) (success, timeout bool) { // Release releases all latches owned by the `lock` and returns the wakeup list. // Preconditions: the caller must ensure the transaction is at the front of the latches. func (latches *Latches) Release(lock *Lock, commitTS uint64) (wakeupList []uint64) { - wakeupList = make([]uint64, 0, lock.waitedCount) - for id := 0; id < lock.waitedCount; id++ { + wakeupCount := lock.acquiredCount + if lock.waiting { + wakeupCount++ + } + wakeupList = make([]uint64, 0, wakeupCount) + for id := 0; id < wakeupCount; id++ { slotID := lock.requiredSlots[id] if hasNext, nextStartTS := latches.releaseSlot(slotID, lock.startTS, commitTS); hasNext { @@ -181,16 +185,15 @@ func (latches *Latches) releaseSlot(slotID int, startTS, commitTS uint64) (hasNe return } -func (latches *Latches) acquireSlot(slotID int, startTS uint64) (success, timeout, new bool) { - success, timeout, new = latches.slots[slotID].acquire(startTS) - if success || timeout { +func (latches *Latches) acquireSlot(slotID int, startTS uint64) (success, stale bool) { + success, stale = latches.slots[slotID].acquire(startTS) + if success || stale { return } - new = true latches.Lock() defer latches.Unlock() - if waiting, ok := latches.waitingQueue[slotID]; ok { - latches.waitingQueue[slotID] = append(waiting, startTS) + if waitingQueue, ok := latches.waitingQueue[slotID]; ok { + latches.waitingQueue[slotID] = append(waitingQueue, startTS) } else { latches.waitingQueue[slotID] = []uint64{startTS} } diff --git a/store/tikv/latch/latch_test.go b/store/tikv/latch/latch_test.go index e7b8c442a8bbb..ed2066a8a04fc 100644 --- a/store/tikv/latch/latch_test.go +++ b/store/tikv/latch/latch_test.go @@ -56,13 +56,13 @@ func (s *testLatchSuite) TestWakeUp(c *C) { startTSB, lockB := s.newLock(keysB) // A acquire lock success. - acquired, timeout := s.latches.Acquire(&lockA) - c.Assert(timeout, IsFalse) + acquired, stale := s.latches.Acquire(&lockA) + c.Assert(stale, IsFalse) c.Assert(acquired, IsTrue) // B acquire lock failed. - acquired, timeout = s.latches.Acquire(&lockB) - c.Assert(timeout, IsFalse) + acquired, stale = s.latches.Acquire(&lockB) + c.Assert(stale, IsFalse) c.Assert(acquired, IsFalse) // A release lock, and get wakeup list. @@ -70,21 +70,21 @@ func (s *testLatchSuite) TestWakeUp(c *C) { wakeupList := s.latches.Release(&lockA, commitTSA) c.Assert(wakeupList[0], Equals, startTSB) - // B acquire failed since startTSB has timeout for some keys. - acquired, timeout = s.latches.Acquire(&lockB) - c.Assert(timeout, IsTrue) + // B acquire failed since startTSB has stale for some keys. + acquired, stale = s.latches.Acquire(&lockB) + c.Assert(stale, IsTrue) c.Assert(acquired, IsFalse) - // B release lock since it received a timeout. + // B release lock since it received a stale. wakeupList = s.latches.Release(&lockB, 0) c.Assert(len(wakeupList), Equals, 0) // B restart:get a new startTS. startTSB = getTso() lockB = s.latches.GenLock(startTSB, keysB) - acquired, timeout = s.latches.Acquire(&lockB) + acquired, stale = s.latches.Acquire(&lockB) c.Assert(acquired, IsTrue) - c.Assert(timeout, IsFalse) + c.Assert(stale, IsFalse) } type txn struct { @@ -123,13 +123,13 @@ func (store *txnScheduler) runTxn(startTS uint64) { if !ok { panic(startTS) } - acquired, timeout := store.latches.Acquire(&txn.lock) + acquired, stale := store.latches.Acquire(&txn.lock) - if !timeout && !acquired { + if !stale && !acquired { return } commitTs := uint64(0) - if timeout { + if stale { // restart Txn go store.newTxn(txn.keys) } else { @@ -151,8 +151,8 @@ func (store *txnScheduler) newTxn(keys [][]byte) { lock := store.latches.GenLock(startTS, keys) t := newTxn(keys, startTS, lock) store.lock.Lock() - defer store.lock.Unlock() store.txns[t.startTS] = &t + store.lock.Unlock() go store.runTxn(t.startTS) } From fa10a54163e86a60f7209579b2f548fd4363198f Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Thu, 12 Apr 2018 22:12:31 +0800 Subject: [PATCH 05/11] address comments --- store/tikv/latch/latch.go | 40 +++++++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/store/tikv/latch/latch.go b/store/tikv/latch/latch.go index 3b590b524c80a..10649c23e22b2 100644 --- a/store/tikv/latch/latch.go +++ b/store/tikv/latch/latch.go @@ -24,6 +24,7 @@ import ( // Latch stores a key's waiting transactions information. type Latch struct { + // Whether there is any transaction in waitingQueue except head. hasWaiting bool // The startTS of the transaction which is the head of waiting transactions. head uint64 @@ -32,9 +33,8 @@ type Latch struct { } // acquire tries to get current key's lock for the transaction with startTS. -// success is true when success -// stale is true when the startTS is already stale -// newWait is true when current transaction is new for the current latch +// success is true when success. +// stale is true when the startTS is already stale. func (l *Latch) acquire(startTS uint64) (success, stale bool) { l.Lock() defer l.Unlock() @@ -43,11 +43,13 @@ func (l *Latch) acquire(startTS uint64) (success, stale bool) { return } - if !l.hasWaiting { + if l.head == 0 { l.head = startTS - l.hasWaiting = true } success = l.head == startTS + if !success { + l.hasWaiting = true + } return } @@ -116,14 +118,16 @@ func (latches *Latches) GenLock(startTS uint64, keys [][]byte) Lock { slots = append(slots, latches.hash(key)) } sort.Ints(slots) - size := 0 - for _, v := range slots { - if size == 0 || slots[size-1] != v { - slots[size] = v - size++ + if len(slots) == 0 { + return NewLock(startTS, slots) + } + dedup := slots[:1] + for i := 1; i < len(slots); i++ { + if slots[i] != slots[i-1] { + dedup = append(dedup, slots[i]) } } - return NewLock(startTS, slots[0:size]) + return NewLock(startTS, dedup) } // hash return hash int for current key. @@ -170,18 +174,30 @@ func (latches *Latches) Release(lock *Lock, commitTS uint64) (wakeupList []uint6 } func (latches *Latches) releaseSlot(slotID int, startTS, commitTS uint64) (hasNext bool, nextStartTS uint64) { + latch := &latches.slots[slotID] + latch.Lock() + defer latch.Unlock() + if latch.maxCommitTS < commitTS { + latch.maxCommitTS = commitTS + } + if !latch.hasWaiting { + latch.head = 0 + return + } + latches.Lock() if waiting, ok := latches.waitingQueue[slotID]; ok { hasNext = true nextStartTS = waiting[0] if len(waiting) == 1 { delete(latches.waitingQueue, slotID) + latch.hasWaiting = false } else { latches.waitingQueue[slotID] = waiting[1:] } } latches.Unlock() - latches.slots[slotID].release(startTS, commitTS, hasNext, nextStartTS) + latch.head = nextStartTS return } From ed700f49808f0809878e7a3c5332d0705f75b4e0 Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Fri, 13 Apr 2018 10:27:00 +0800 Subject: [PATCH 06/11] address comments --- store/tikv/latch/latch.go | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/store/tikv/latch/latch.go b/store/tikv/latch/latch.go index 10649c23e22b2..f5f65d8511e0f 100644 --- a/store/tikv/latch/latch.go +++ b/store/tikv/latch/latch.go @@ -53,21 +53,6 @@ func (l *Latch) acquire(startTS uint64) (success, stale bool) { return } -// release releases the transaction with startTS and commitTS from current latch, -// and set the next transaction to head if hasNext is true. -func (l *Latch) release(startTS, commitTS uint64, hasNext bool, nextStartTS uint64) { - l.Lock() - defer l.Unlock() - if startTS != l.head { - panic(fmt.Sprintf("invalid front ts %d, latch:%#v", startTS, l)) - } - if commitTS > l.maxCommitTS { - l.maxCommitTS = commitTS - } - l.hasWaiting = hasNext - l.head = nextStartTS -} - // Lock is the locks' information required for a transaction. type Lock struct { // The slot IDs of the latches(keys) that a startTS must acquire before being able to processed. @@ -177,6 +162,9 @@ func (latches *Latches) releaseSlot(slotID int, startTS, commitTS uint64) (hasNe latch := &latches.slots[slotID] latch.Lock() defer latch.Unlock() + if startTS != latch.head { + panic(fmt.Sprintf("invalid front ts %d, latch:%#v", startTS, latch)) + } if latch.maxCommitTS < commitTS { latch.maxCommitTS = commitTS } From 5488231ca45a7f549df339fe5ccbef140608c3f3 Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Fri, 13 Apr 2018 10:40:05 +0800 Subject: [PATCH 07/11] address comments --- store/tikv/latch/latch.go | 36 +++++++++++++----------------------- 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/store/tikv/latch/latch.go b/store/tikv/latch/latch.go index f5f65d8511e0f..d70685ff812d3 100644 --- a/store/tikv/latch/latch.go +++ b/store/tikv/latch/latch.go @@ -32,27 +32,6 @@ type Latch struct { sync.Mutex } -// acquire tries to get current key's lock for the transaction with startTS. -// success is true when success. -// stale is true when the startTS is already stale. -func (l *Latch) acquire(startTS uint64) (success, stale bool) { - l.Lock() - defer l.Unlock() - - if stale = startTS <= l.maxCommitTS; stale { - return - } - - if l.head == 0 { - l.head = startTS - } - success = l.head == startTS - if !success { - l.hasWaiting = true - } - return -} - // Lock is the locks' information required for a transaction. type Lock struct { // The slot IDs of the latches(keys) that a startTS must acquire before being able to processed. @@ -190,10 +169,21 @@ func (latches *Latches) releaseSlot(slotID int, startTS, commitTS uint64) (hasNe } func (latches *Latches) acquireSlot(slotID int, startTS uint64) (success, stale bool) { - success, stale = latches.slots[slotID].acquire(startTS) - if success || stale { + latch := &latches.slots[slotID] + latch.Lock() + defer latch.Unlock() + if stale = latch.maxCommitTS > startTS; stale { + return + } + // Empty latch + if latch.head == 0 { + latch.head = startTS + } + if success = latch.head == startTS; success { return } + // push current transaction into waitingQueue + latch.hasWaiting = true latches.Lock() defer latches.Unlock() if waitingQueue, ok := latches.waitingQueue[slotID]; ok { From e71ffe2a8df39a10a7f752ce017b3cacccb4a5d2 Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Fri, 13 Apr 2018 13:59:13 +0800 Subject: [PATCH 08/11] address comments --- store/tikv/latch/latch.go | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/store/tikv/latch/latch.go b/store/tikv/latch/latch.go index d70685ff812d3..16ee487728a1f 100644 --- a/store/tikv/latch/latch.go +++ b/store/tikv/latch/latch.go @@ -27,8 +27,8 @@ type Latch struct { // Whether there is any transaction in waitingQueue except head. hasWaiting bool // The startTS of the transaction which is the head of waiting transactions. - head uint64 - maxCommitTS uint64 + waitingQueueHead uint64 + maxCommitTS uint64 sync.Mutex } @@ -82,7 +82,7 @@ func (latches *Latches) GenLock(startTS uint64, keys [][]byte) Lock { slots = append(slots, latches.hash(key)) } sort.Ints(slots) - if len(slots) == 0 { + if len(slots) <= 1 { return NewLock(startTS, slots) } dedup := slots[:1] @@ -141,30 +141,29 @@ func (latches *Latches) releaseSlot(slotID int, startTS, commitTS uint64) (hasNe latch := &latches.slots[slotID] latch.Lock() defer latch.Unlock() - if startTS != latch.head { + if startTS != latch.waitingQueueHead { panic(fmt.Sprintf("invalid front ts %d, latch:%#v", startTS, latch)) } if latch.maxCommitTS < commitTS { latch.maxCommitTS = commitTS } if !latch.hasWaiting { - latch.head = 0 + latch.waitingQueueHead = 0 return } latches.Lock() - if waiting, ok := latches.waitingQueue[slotID]; ok { - hasNext = true - nextStartTS = waiting[0] - if len(waiting) == 1 { - delete(latches.waitingQueue, slotID) - latch.hasWaiting = false - } else { - latches.waitingQueue[slotID] = waiting[1:] - } + waiting := latches.waitingQueue[slotID] + hasNext = true + nextStartTS = waiting[0] + if len(waiting) == 1 { + delete(latches.waitingQueue, slotID) + latch.hasWaiting = false + } else { + latches.waitingQueue[slotID] = waiting[1:] } latches.Unlock() - latch.head = nextStartTS + latch.waitingQueueHead = nextStartTS return } @@ -176,10 +175,10 @@ func (latches *Latches) acquireSlot(slotID int, startTS uint64) (success, stale return } // Empty latch - if latch.head == 0 { - latch.head = startTS + if latch.waitingQueueHead == 0 { + latch.waitingQueueHead = startTS } - if success = latch.head == startTS; success { + if success = latch.waitingQueueHead == startTS; success { return } // push current transaction into waitingQueue From c84c8de24455469988682fab29cc6c143c527207 Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Fri, 13 Apr 2018 14:12:03 +0800 Subject: [PATCH 09/11] address comments --- store/tikv/latch/latch.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/store/tikv/latch/latch.go b/store/tikv/latch/latch.go index 16ee487728a1f..5f9180deeb494 100644 --- a/store/tikv/latch/latch.go +++ b/store/tikv/latch/latch.go @@ -32,6 +32,14 @@ type Latch struct { sync.Mutex } +func (l *Latch) occupied() bool { + return l.waitingQueueHead != 0 +} + +func (l *Latch) free() { + l.waitingQueueHead = 0 +} + // Lock is the locks' information required for a transaction. type Lock struct { // The slot IDs of the latches(keys) that a startTS must acquire before being able to processed. @@ -148,7 +156,7 @@ func (latches *Latches) releaseSlot(slotID int, startTS, commitTS uint64) (hasNe latch.maxCommitTS = commitTS } if !latch.hasWaiting { - latch.waitingQueueHead = 0 + latch.free() return } @@ -175,7 +183,7 @@ func (latches *Latches) acquireSlot(slotID int, startTS uint64) (success, stale return } // Empty latch - if latch.waitingQueueHead == 0 { + if !latch.occupied() { latch.waitingQueueHead = startTS } if success = latch.waitingQueueHead == startTS; success { From 58cabb8db24ab8eadde86a6a885a0da92ffb074e Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Fri, 13 Apr 2018 19:39:31 +0800 Subject: [PATCH 10/11] address comments --- store/tikv/latch/latch.go | 56 +++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/store/tikv/latch/latch.go b/store/tikv/latch/latch.go index 5f9180deeb494..f45c4c7f26b51 100644 --- a/store/tikv/latch/latch.go +++ b/store/tikv/latch/latch.go @@ -19,13 +19,14 @@ import ( "sort" "sync" + "github.com/cznic/mathutil" "github.com/spaolacci/murmur3" ) // Latch stores a key's waiting transactions information. type Latch struct { // Whether there is any transaction in waitingQueue except head. - hasWaiting bool + hasMoreWaiting bool // The startTS of the transaction which is the head of waiting transactions. waitingQueueHead uint64 maxCommitTS uint64 @@ -47,7 +48,7 @@ type Lock struct { // The number of latches that the transaction has acquired. acquiredCount int // Whether current transaction is waiting - waiting bool + isWaiting bool // Current transaction's startTS. startTS uint64 } @@ -57,7 +58,7 @@ func NewLock(startTS uint64, requiredSlots []int) Lock { return Lock{ requiredSlots: requiredSlots, acquiredCount: 0, - waiting: false, + isWaiting: false, startTS: startTS, } } @@ -68,7 +69,7 @@ func NewLock(startTS uint64, requiredSlots []int) Lock { type Latches struct { slots []Latch // The waiting queue for each slot(slotID => slice of startTS). - waitingQueue map[int][]uint64 + waitingQueues map[int][]uint64 sync.RWMutex } @@ -78,8 +79,8 @@ func NewLatches(size int) *Latches { powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1))) slots := make([]Latch, powerOfTwoSize) return &Latches{ - slots: slots, - waitingQueue: make(map[int][]uint64), + slots: slots, + waitingQueues: make(map[int][]uint64), } } @@ -87,7 +88,7 @@ func NewLatches(size int) *Latches { func (latches *Latches) GenLock(startTS uint64, keys [][]byte) Lock { slots := make([]int, 0, len(keys)) for _, key := range keys { - slots = append(slots, latches.hash(key)) + slots = append(slots, latches.slotID(key)) } sort.Ints(slots) if len(slots) <= 1 { @@ -102,8 +103,8 @@ func (latches *Latches) GenLock(startTS uint64, keys [][]byte) Lock { return NewLock(startTS, dedup) } -// hash return hash int for current key. -func (latches *Latches) hash(key []byte) int { +// hash return slotID for current key. +func (latches *Latches) slotID(key []byte) int { return int(murmur3.Sum32(key)) & (len(latches.slots) - 1) } @@ -116,11 +117,11 @@ func (latches *Latches) Acquire(lock *Lock) (success, stale bool) { success, stale = latches.acquireSlot(slotID, lock.startTS) if success { lock.acquiredCount++ - lock.waiting = false + lock.isWaiting = false continue } if !stale { - lock.waiting = true + lock.isWaiting = true } return } @@ -130,12 +131,12 @@ func (latches *Latches) Acquire(lock *Lock) (success, stale bool) { // Release releases all latches owned by the `lock` and returns the wakeup list. // Preconditions: the caller must ensure the transaction is at the front of the latches. func (latches *Latches) Release(lock *Lock, commitTS uint64) (wakeupList []uint64) { - wakeupCount := lock.acquiredCount - if lock.waiting { - wakeupCount++ + releaseCount := lock.acquiredCount + if lock.isWaiting { + releaseCount++ } - wakeupList = make([]uint64, 0, wakeupCount) - for id := 0; id < wakeupCount; id++ { + wakeupList = make([]uint64, 0, releaseCount) + for id := 0; id < releaseCount; id++ { slotID := lock.requiredSlots[id] if hasNext, nextStartTS := latches.releaseSlot(slotID, lock.startTS, commitTS); hasNext { @@ -152,23 +153,22 @@ func (latches *Latches) releaseSlot(slotID int, startTS, commitTS uint64) (hasNe if startTS != latch.waitingQueueHead { panic(fmt.Sprintf("invalid front ts %d, latch:%#v", startTS, latch)) } - if latch.maxCommitTS < commitTS { - latch.maxCommitTS = commitTS - } - if !latch.hasWaiting { + latch.maxCommitTS = mathutil.MaxUint64(latch.maxCommitTS, commitTS) + if !latch.hasMoreWaiting { latch.free() return } + // pop next transaction from the waitingQueue. latches.Lock() - waiting := latches.waitingQueue[slotID] + waiting := latches.waitingQueues[slotID] hasNext = true nextStartTS = waiting[0] if len(waiting) == 1 { - delete(latches.waitingQueue, slotID) - latch.hasWaiting = false + delete(latches.waitingQueues, slotID) + latch.hasMoreWaiting = false } else { - latches.waitingQueue[slotID] = waiting[1:] + latches.waitingQueues[slotID] = waiting[1:] } latches.Unlock() latch.waitingQueueHead = nextStartTS @@ -190,13 +190,13 @@ func (latches *Latches) acquireSlot(slotID int, startTS uint64) (success, stale return } // push current transaction into waitingQueue - latch.hasWaiting = true + latch.hasMoreWaiting = true latches.Lock() defer latches.Unlock() - if waitingQueue, ok := latches.waitingQueue[slotID]; ok { - latches.waitingQueue[slotID] = append(waitingQueue, startTS) + if waitingQueue, ok := latches.waitingQueues[slotID]; ok { + latches.waitingQueues[slotID] = append(waitingQueue, startTS) } else { - latches.waitingQueue[slotID] = []uint64{startTS} + latches.waitingQueues[slotID] = []uint64{startTS} } return } From 65bfe3fa74a6017febced28cd31c5582711cc699 Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Mon, 16 Apr 2018 14:40:16 +0800 Subject: [PATCH 11/11] address comments --- store/tikv/latch/latch.go | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/store/tikv/latch/latch.go b/store/tikv/latch/latch.go index f45c4c7f26b51..5bcac1817c1c3 100644 --- a/store/tikv/latch/latch.go +++ b/store/tikv/latch/latch.go @@ -103,7 +103,7 @@ func (latches *Latches) GenLock(startTS uint64, keys [][]byte) Lock { return NewLock(startTS, dedup) } -// hash return slotID for current key. +// slotID return slotID for current key. func (latches *Latches) slotID(key []byte) int { return int(murmur3.Sum32(key)) & (len(latches.slots) - 1) } @@ -136,8 +136,8 @@ func (latches *Latches) Release(lock *Lock, commitTS uint64) (wakeupList []uint6 releaseCount++ } wakeupList = make([]uint64, 0, releaseCount) - for id := 0; id < releaseCount; id++ { - slotID := lock.requiredSlots[id] + for i := 0; i < releaseCount; i++ { + slotID := lock.requiredSlots[i] if hasNext, nextStartTS := latches.releaseSlot(slotID, lock.startTS, commitTS); hasNext { wakeupList = append(wakeupList, nextStartTS) @@ -158,20 +158,21 @@ func (latches *Latches) releaseSlot(slotID int, startTS, commitTS uint64) (hasNe latch.free() return } + latch.waitingQueueHead, latch.hasMoreWaiting = latches.popFromWaitingQueue(slotID) + return true, latch.waitingQueueHead +} - // pop next transaction from the waitingQueue. +func (latches *Latches) popFromWaitingQueue(slotID int) (front uint64, hasMoreWaiting bool) { latches.Lock() + defer latches.Unlock() waiting := latches.waitingQueues[slotID] - hasNext = true - nextStartTS = waiting[0] + front = waiting[0] if len(waiting) == 1 { delete(latches.waitingQueues, slotID) - latch.hasMoreWaiting = false } else { latches.waitingQueues[slotID] = waiting[1:] + hasMoreWaiting = true } - latches.Unlock() - latch.waitingQueueHead = nextStartTS return } @@ -193,10 +194,6 @@ func (latches *Latches) acquireSlot(slotID int, startTS uint64) (success, stale latch.hasMoreWaiting = true latches.Lock() defer latches.Unlock() - if waitingQueue, ok := latches.waitingQueues[slotID]; ok { - latches.waitingQueues[slotID] = append(waitingQueue, startTS) - } else { - latches.waitingQueues[slotID] = []uint64{startTS} - } + latches.waitingQueues[slotID] = append(latches.waitingQueues[slotID], startTS) return }