Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: implement local latch for transaction #6268

Merged
merged 16 commits into from
Apr 16, 2018
202 changes: 202 additions & 0 deletions store/tikv/latch/latch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package latch

import (
"fmt"
"math/bits"
"sort"
"sync"

"github.com/cznic/mathutil"
"github.com/spaolacci/murmur3"
)

// Latch stores a key's waiting transactions information.
type Latch struct {
// Whether there is any transaction in waitingQueue except head.
hasMoreWaiting bool
// The startTS of the transaction which is the head of waiting transactions.
waitingQueueHead uint64
maxCommitTS uint64
sync.Mutex
}

func (l *Latch) occupied() bool {
return l.waitingQueueHead != 0
}

func (l *Latch) free() {
l.waitingQueueHead = 0
}

// Lock is the locks' information required for a transaction.
type Lock struct {
// The slot IDs of the latches(keys) that a startTS must acquire before being able to processed.
requiredSlots []int
// The number of latches that the transaction has acquired.
acquiredCount int
// Whether current transaction is waiting
isWaiting bool
// Current transaction's startTS.
startTS uint64
}

// NewLock creates a new lock.
func NewLock(startTS uint64, requiredSlots []int) Lock {
return Lock{
requiredSlots: requiredSlots,
acquiredCount: 0,
isWaiting: false,
startTS: startTS,
}
}

// Latches which are used for concurrency control.
// Each latch is indexed by a slot's ID, hence the term latch and slot are used in interchangeable,
// but conceptually a latch is a queue, and a slot is an index to the queue
type Latches struct {
slots []Latch
// The waiting queue for each slot(slotID => slice of startTS).
waitingQueues map[int][]uint64
sync.RWMutex
}

// NewLatches create a Latches with fixed length,
// the size will be rounded up to the power of 2.
func NewLatches(size int) *Latches {
powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1)))
slots := make([]Latch, powerOfTwoSize)
return &Latches{
slots: slots,
waitingQueues: make(map[int][]uint64),
}
}

// GenLock generates Lock for the transaction with startTS and keys.
func (latches *Latches) GenLock(startTS uint64, keys [][]byte) Lock {
slots := make([]int, 0, len(keys))
for _, key := range keys {
slots = append(slots, latches.slotID(key))
}
sort.Ints(slots)
if len(slots) <= 1 {
return NewLock(startTS, slots)
}
dedup := slots[:1]
for i := 1; i < len(slots); i++ {
if slots[i] != slots[i-1] {
dedup = append(dedup, slots[i])
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This maybe easier to read.

	dedup := slots[:1]
	for i := 1; i < len(slots); i++ {
		if slots[i] != slots[i-1] {
			dedup = append(dedup, slots[i])
		}
	}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tiancaiamao What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, @coocood is right! it's easier to read, you can change this way.

if len(slots) == 0 {
   return NewLock(startTS, nil)
}

As for me, I'd always like the C-style, for example:

  • don't let append do the implicit work
  • manual malloc when resize a slice
  • prefer using i := 0; i < len(); i++ style instead of range

It seems clumsy but less likely to went wrong.

return NewLock(startTS, dedup)
}

// hash return slotID for current key.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slotID returns ...

func (latches *Latches) slotID(key []byte) int {
return int(murmur3.Sum32(key)) & (len(latches.slots) - 1)
}

// Acquire tries to acquire the lock for a transaction.
// It returns with stale = true when the transaction is stale(
// when the lock.startTS is smaller than any key's last commitTS).
func (latches *Latches) Acquire(lock *Lock) (success, stale bool) {
for lock.acquiredCount < len(lock.requiredSlots) {
slotID := lock.requiredSlots[lock.acquiredCount]
success, stale = latches.acquireSlot(slotID, lock.startTS)
if success {
lock.acquiredCount++
lock.isWaiting = false
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not continue here.
success and stale may both be true.

Copy link
Member

@coocood coocood Apr 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if !stale {
    continue
} else {
    return
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the definition of acquiredCount and success, it's ok to continue here.

}
if !stale {
lock.isWaiting = true
}
return
}
return
}

// Release releases all latches owned by the `lock` and returns the wakeup list.
// Preconditions: the caller must ensure the transaction is at the front of the latches.
func (latches *Latches) Release(lock *Lock, commitTS uint64) (wakeupList []uint64) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What the commitTs will be when release a uncommitted txn?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be zero.

releaseCount := lock.acquiredCount
if lock.isWaiting {
releaseCount++
}
wakeupList = make([]uint64, 0, releaseCount)
for id := 0; id < releaseCount; id++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

id -> i

slotID := lock.requiredSlots[id]

if hasNext, nextStartTS := latches.releaseSlot(slotID, lock.startTS, commitTS); hasNext {
wakeupList = append(wakeupList, nextStartTS)
}
}
return
}

func (latches *Latches) releaseSlot(slotID int, startTS, commitTS uint64) (hasNext bool, nextStartTS uint64) {
latch := &latches.slots[slotID]
latch.Lock()
defer latch.Unlock()
if startTS != latch.waitingQueueHead {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When release a Lock which is in waiting status will cause panic here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We panic here since it should never happen.

panic(fmt.Sprintf("invalid front ts %d, latch:%#v", startTS, latch))
}
latch.maxCommitTS = mathutil.MaxUint64(latch.maxCommitTS, commitTS)
if !latch.hasMoreWaiting {
latch.free()
return
}

// pop next transaction from the waitingQueue.
latches.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can avoid lock here.
Check the slot first, if the slot doesn't have waiting queue, we don't need to lock.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, we do not know if the slot has a wating queue in Latch

waiting := latches.waitingQueues[slotID]
hasNext = true
nextStartTS = waiting[0]
if len(waiting) == 1 {
delete(latches.waitingQueues, slotID)
latch.hasMoreWaiting = false
} else {
latches.waitingQueues[slotID] = waiting[1:]
}
latches.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about extract L162-173 to a method?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about extract L162-173 as a method?

latch.waitingQueueHead = nextStartTS
return
}

func (latches *Latches) acquireSlot(slotID int, startTS uint64) (success, stale bool) {
latch := &latches.slots[slotID]
latch.Lock()
defer latch.Unlock()
if stale = latch.maxCommitTS > startTS; stale {
return
}
// Empty latch
if !latch.occupied() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

acquired := latch.aquire(start_ts)
if acquired {
return
}
...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we check stale outside latch while do acquire in latch? I do not think is a good style.
if we do this, I think we should also implement latch.isStale(startTS), latch.setHasWaiting(), and so on.

latch.waitingQueueHead = startTS
}
if success = latch.waitingQueueHead == startTS; success {
return
}
// push current transaction into waitingQueue
latch.hasMoreWaiting = true
latches.Lock()
defer latches.Unlock()
if waitingQueue, ok := latches.waitingQueues[slotID]; ok {
latches.waitingQueues[slotID] = append(waitingQueue, startTS)
} else {
latches.waitingQueues[slotID] = []uint64{startTS}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can simplify it to latches.waitingQueues[slotID] = append(latches.waitingQueues[slotID], startTS)

return
}
173 changes: 173 additions & 0 deletions store/tikv/latch/latch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package latch

import (
"sync"
"sync/atomic"
"testing"

. "github.com/pingcap/check"
)

func TestT(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testLatchSuite{})

var baseTso uint64

type testLatchSuite struct {
latches *Latches
}

func (s *testLatchSuite) SetUpTest(c *C) {
s.latches = NewLatches(256)
}

func (s *testLatchSuite) newLock(keys [][]byte) (startTS uint64, lock Lock) {
startTS = getTso()
lock = s.latches.GenLock(startTS, keys)
return
}

func getTso() uint64 {
return atomic.AddUint64(&baseTso, uint64(1))
}

func (s *testLatchSuite) TestWakeUp(c *C) {
keysA := [][]byte{
[]byte("a"), []byte("b"), []byte("c"), []byte("c")}
_, lockA := s.newLock(keysA)

keysB := [][]byte{[]byte("d"), []byte("e"), []byte("a"), []byte("c")}
startTSB, lockB := s.newLock(keysB)

// A acquire lock success.
acquired, stale := s.latches.Acquire(&lockA)
c.Assert(stale, IsFalse)
c.Assert(acquired, IsTrue)

// B acquire lock failed.
acquired, stale = s.latches.Acquire(&lockB)
c.Assert(stale, IsFalse)
c.Assert(acquired, IsFalse)

// A release lock, and get wakeup list.
commitTSA := getTso()
wakeupList := s.latches.Release(&lockA, commitTSA)
c.Assert(wakeupList[0], Equals, startTSB)

// B acquire failed since startTSB has stale for some keys.
acquired, stale = s.latches.Acquire(&lockB)
c.Assert(stale, IsTrue)
c.Assert(acquired, IsFalse)

// B release lock since it received a stale.
wakeupList = s.latches.Release(&lockB, 0)
c.Assert(len(wakeupList), Equals, 0)

// B restart:get a new startTS.
startTSB = getTso()
lockB = s.latches.GenLock(startTSB, keysB)
acquired, stale = s.latches.Acquire(&lockB)
c.Assert(acquired, IsTrue)
c.Assert(stale, IsFalse)
}

type txn struct {
keys [][]byte
startTS uint64
lock Lock
}

func newTxn(keys [][]byte, startTS uint64, lock Lock) txn {
return txn{
keys: keys,
startTS: startTS,
lock: lock,
}
}

type txnScheduler struct {
txns map[uint64]*txn
latches *Latches
lock sync.Mutex
wait *sync.WaitGroup
}

func newTxnScheduler(wait *sync.WaitGroup, latches *Latches) *txnScheduler {
return &txnScheduler{
txns: make(map[uint64]*txn),
latches: latches,
wait: wait,
}
}

func (store *txnScheduler) runTxn(startTS uint64) {
store.lock.Lock()
txn, ok := store.txns[startTS]
store.lock.Unlock()
if !ok {
panic(startTS)
}
acquired, stale := store.latches.Acquire(&txn.lock)

if !stale && !acquired {
return
}
commitTs := uint64(0)
if stale {
// restart Txn
go store.newTxn(txn.keys)
} else {
// DO commit
commitTs = getTso()
store.wait.Done()
}
wakeupList := store.latches.Release(&txn.lock, commitTs)
for _, s := range wakeupList {
go store.runTxn(s)
}
store.lock.Lock()
delete(store.txns, startTS)
store.lock.Unlock()
}

func (store *txnScheduler) newTxn(keys [][]byte) {
startTS := getTso()
lock := store.latches.GenLock(startTS, keys)
t := newTxn(keys, startTS, lock)
store.lock.Lock()
store.txns[t.startTS] = &t
store.lock.Unlock()
go store.runTxn(t.startTS)
}

func (s *testLatchSuite) TestWithConcurrency(c *C) {
waitGroup := sync.WaitGroup{}
txns := [][][]byte{
{[]byte("a"), []byte("a"), []byte("b"), []byte("c")},
{[]byte("a"), []byte("d"), []byte("e"), []byte("f")},
{[]byte("e"), []byte("f"), []byte("g"), []byte("h")},
}

store := newTxnScheduler(&waitGroup, s.latches)
waitGroup.Add(len(txns))
for _, txn := range txns {
go store.newTxn(txn)
}
waitGroup.Wait()
}