Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: implement local latch for transaction #6268

Merged
merged 16 commits into from
Apr 16, 2018
199 changes: 199 additions & 0 deletions store/tikv/latch/latch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package latch

import (
"fmt"
"math/bits"
"sort"
"sync"

"github.com/cznic/mathutil"
"github.com/spaolacci/murmur3"
)

// Latch stores a key's waiting transactions information.
type Latch struct {
// Whether there is any transaction in waitingQueue except head.
hasMoreWaiting bool
// The startTS of the transaction which is the head of waiting transactions.
waitingQueueHead uint64
maxCommitTS uint64
sync.Mutex
}

func (l *Latch) occupied() bool {
return l.waitingQueueHead != 0
}

func (l *Latch) free() {
l.waitingQueueHead = 0
}

// Lock is the locks' information required for a transaction.
type Lock struct {
// The slot IDs of the latches(keys) that a startTS must acquire before being able to processed.
requiredSlots []int
// The number of latches that the transaction has acquired.
acquiredCount int
// Whether current transaction is waiting
isWaiting bool
// Current transaction's startTS.
startTS uint64
}

// NewLock creates a new lock.
func NewLock(startTS uint64, requiredSlots []int) Lock {
return Lock{
requiredSlots: requiredSlots,
acquiredCount: 0,
isWaiting: false,
startTS: startTS,
}
}

// Latches which are used for concurrency control.
// Each latch is indexed by a slot's ID, hence the term latch and slot are used in interchangeable,
// but conceptually a latch is a queue, and a slot is an index to the queue
type Latches struct {
slots []Latch
// The waiting queue for each slot(slotID => slice of startTS).
waitingQueues map[int][]uint64
sync.RWMutex
}

// NewLatches create a Latches with fixed length,
// the size will be rounded up to the power of 2.
func NewLatches(size int) *Latches {
powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1)))
slots := make([]Latch, powerOfTwoSize)
return &Latches{
slots: slots,
waitingQueues: make(map[int][]uint64),
}
}

// GenLock generates Lock for the transaction with startTS and keys.
func (latches *Latches) GenLock(startTS uint64, keys [][]byte) Lock {
slots := make([]int, 0, len(keys))
for _, key := range keys {
slots = append(slots, latches.slotID(key))
}
sort.Ints(slots)
if len(slots) <= 1 {
return NewLock(startTS, slots)
}
dedup := slots[:1]
for i := 1; i < len(slots); i++ {
if slots[i] != slots[i-1] {
dedup = append(dedup, slots[i])
}
}
return NewLock(startTS, dedup)
}

// slotID return slotID for current key.
func (latches *Latches) slotID(key []byte) int {
return int(murmur3.Sum32(key)) & (len(latches.slots) - 1)
}

// Acquire tries to acquire the lock for a transaction.
// It returns with stale = true when the transaction is stale(
// when the lock.startTS is smaller than any key's last commitTS).
func (latches *Latches) Acquire(lock *Lock) (success, stale bool) {
for lock.acquiredCount < len(lock.requiredSlots) {
slotID := lock.requiredSlots[lock.acquiredCount]
success, stale = latches.acquireSlot(slotID, lock.startTS)
if success {
lock.acquiredCount++
lock.isWaiting = false
continue
}
if !stale {
lock.isWaiting = true
}
return
}
return
}

// Release releases all latches owned by the `lock` and returns the wakeup list.
// Preconditions: the caller must ensure the transaction is at the front of the latches.
func (latches *Latches) Release(lock *Lock, commitTS uint64) (wakeupList []uint64) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What the commitTs will be when release a uncommitted txn?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be zero.

releaseCount := lock.acquiredCount
if lock.isWaiting {
releaseCount++
}
wakeupList = make([]uint64, 0, releaseCount)
for i := 0; i < releaseCount; i++ {
slotID := lock.requiredSlots[i]

if hasNext, nextStartTS := latches.releaseSlot(slotID, lock.startTS, commitTS); hasNext {
wakeupList = append(wakeupList, nextStartTS)
}
}
return
}

func (latches *Latches) releaseSlot(slotID int, startTS, commitTS uint64) (hasNext bool, nextStartTS uint64) {
latch := &latches.slots[slotID]
latch.Lock()
defer latch.Unlock()
if startTS != latch.waitingQueueHead {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When release a Lock which is in waiting status will cause panic here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We panic here since it should never happen.

panic(fmt.Sprintf("invalid front ts %d, latch:%#v", startTS, latch))
}
latch.maxCommitTS = mathutil.MaxUint64(latch.maxCommitTS, commitTS)
if !latch.hasMoreWaiting {
latch.free()
return
}
latch.waitingQueueHead, latch.hasMoreWaiting = latches.popFromWaitingQueue(slotID)
return true, latch.waitingQueueHead
}

func (latches *Latches) popFromWaitingQueue(slotID int) (front uint64, hasMoreWaiting bool) {
latches.Lock()
defer latches.Unlock()
waiting := latches.waitingQueues[slotID]
front = waiting[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about waiting is empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will return on L158

if len(waiting) == 1 {
delete(latches.waitingQueues, slotID)
} else {
latches.waitingQueues[slotID] = waiting[1:]
hasMoreWaiting = true
}
return
}

func (latches *Latches) acquireSlot(slotID int, startTS uint64) (success, stale bool) {
latch := &latches.slots[slotID]
latch.Lock()
defer latch.Unlock()
if stale = latch.maxCommitTS > startTS; stale {
return
}
// Empty latch
if !latch.occupied() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

acquired := latch.aquire(start_ts)
if acquired {
return
}
...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we check stale outside latch while do acquire in latch? I do not think is a good style.
if we do this, I think we should also implement latch.isStale(startTS), latch.setHasWaiting(), and so on.

latch.waitingQueueHead = startTS
}
if success = latch.waitingQueueHead == startTS; success {
return
}
// push current transaction into waitingQueue
latch.hasMoreWaiting = true
latches.Lock()
defer latches.Unlock()
latches.waitingQueues[slotID] = append(latches.waitingQueues[slotID], startTS)
return
}
173 changes: 173 additions & 0 deletions store/tikv/latch/latch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package latch

import (
"sync"
"sync/atomic"
"testing"

. "github.com/pingcap/check"
)

func TestT(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testLatchSuite{})

var baseTso uint64

type testLatchSuite struct {
latches *Latches
}

func (s *testLatchSuite) SetUpTest(c *C) {
s.latches = NewLatches(256)
}

func (s *testLatchSuite) newLock(keys [][]byte) (startTS uint64, lock Lock) {
startTS = getTso()
lock = s.latches.GenLock(startTS, keys)
return
}

func getTso() uint64 {
return atomic.AddUint64(&baseTso, uint64(1))
}

func (s *testLatchSuite) TestWakeUp(c *C) {
keysA := [][]byte{
[]byte("a"), []byte("b"), []byte("c"), []byte("c")}
_, lockA := s.newLock(keysA)

keysB := [][]byte{[]byte("d"), []byte("e"), []byte("a"), []byte("c")}
startTSB, lockB := s.newLock(keysB)

// A acquire lock success.
acquired, stale := s.latches.Acquire(&lockA)
c.Assert(stale, IsFalse)
c.Assert(acquired, IsTrue)

// B acquire lock failed.
acquired, stale = s.latches.Acquire(&lockB)
c.Assert(stale, IsFalse)
c.Assert(acquired, IsFalse)

// A release lock, and get wakeup list.
commitTSA := getTso()
wakeupList := s.latches.Release(&lockA, commitTSA)
c.Assert(wakeupList[0], Equals, startTSB)

// B acquire failed since startTSB has stale for some keys.
acquired, stale = s.latches.Acquire(&lockB)
c.Assert(stale, IsTrue)
c.Assert(acquired, IsFalse)

// B release lock since it received a stale.
wakeupList = s.latches.Release(&lockB, 0)
c.Assert(len(wakeupList), Equals, 0)

// B restart:get a new startTS.
startTSB = getTso()
lockB = s.latches.GenLock(startTSB, keysB)
acquired, stale = s.latches.Acquire(&lockB)
c.Assert(acquired, IsTrue)
c.Assert(stale, IsFalse)
}

type txn struct {
keys [][]byte
startTS uint64
lock Lock
}

func newTxn(keys [][]byte, startTS uint64, lock Lock) txn {
return txn{
keys: keys,
startTS: startTS,
lock: lock,
}
}

type txnScheduler struct {
txns map[uint64]*txn
latches *Latches
lock sync.Mutex
wait *sync.WaitGroup
}

func newTxnScheduler(wait *sync.WaitGroup, latches *Latches) *txnScheduler {
return &txnScheduler{
txns: make(map[uint64]*txn),
latches: latches,
wait: wait,
}
}

func (store *txnScheduler) runTxn(startTS uint64) {
store.lock.Lock()
txn, ok := store.txns[startTS]
store.lock.Unlock()
if !ok {
panic(startTS)
}
acquired, stale := store.latches.Acquire(&txn.lock)

if !stale && !acquired {
return
}
commitTs := uint64(0)
if stale {
// restart Txn
go store.newTxn(txn.keys)
} else {
// DO commit
commitTs = getTso()
store.wait.Done()
}
wakeupList := store.latches.Release(&txn.lock, commitTs)
for _, s := range wakeupList {
go store.runTxn(s)
}
store.lock.Lock()
delete(store.txns, startTS)
store.lock.Unlock()
}

func (store *txnScheduler) newTxn(keys [][]byte) {
startTS := getTso()
lock := store.latches.GenLock(startTS, keys)
t := newTxn(keys, startTS, lock)
store.lock.Lock()
store.txns[t.startTS] = &t
store.lock.Unlock()
go store.runTxn(t.startTS)
}

func (s *testLatchSuite) TestWithConcurrency(c *C) {
waitGroup := sync.WaitGroup{}
txns := [][][]byte{
{[]byte("a"), []byte("a"), []byte("b"), []byte("c")},
{[]byte("a"), []byte("d"), []byte("e"), []byte("f")},
{[]byte("e"), []byte("f"), []byte("g"), []byte("h")},
}

store := newTxnScheduler(&waitGroup, s.latches)
waitGroup.Add(len(txns))
for _, txn := range txns {
go store.newTxn(txn)
}
waitGroup.Wait()
}