store/tikv: avoid holding write lock for long time #6880

Merged · 4 commits · Jun 25, 2018
12 changes: 6 additions & 6 deletions store/mockstore/mocktikv/cluster.go
@@ -230,14 +230,14 @@ func (c *Cluster) GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Peer)

// Bootstrap creates the first Region. The Stores should be in the Cluster before
// bootstrap.
func (c *Cluster) Bootstrap(regionID uint64, storeIDs, peerIDs []uint64, leaderStoreID uint64) {
func (c *Cluster) Bootstrap(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) {
c.Lock()
defer c.Unlock()

if len(storeIDs) != len(peerIDs) {
panic("len(storeIDs) != len(peerIDs)")
}
c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderStoreID)
c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID)
}

// AddPeer adds a new Peer for the Region on the Store.
@@ -259,11 +259,11 @@ func (c *Cluster) RemovePeer(regionID, storeID uint64) {

// ChangeLeader sets the Region's leader Peer. Caller should guarantee the Peer
// exists.
func (c *Cluster) ChangeLeader(regionID, leaderStoreID uint64) {
func (c *Cluster) ChangeLeader(regionID, leaderPeerID uint64) {
c.Lock()
defer c.Unlock()

c.regions[regionID].changeLeader(leaderStoreID)
c.regions[regionID].changeLeader(leaderPeerID)
}

// GiveUpLeader sets the Region's leader to 0. The Region will have no leader
@@ -464,8 +464,8 @@ func (r *Region) removePeer(peerID uint64) {
r.incConfVer()
}

func (r *Region) changeLeader(leaderStoreID uint64) {
r.leader = leaderStoreID
func (r *Region) changeLeader(leaderID uint64) {
Member:
Is this the ID of a peer or store?

Member Author:
A peer; it was mistakenly named leaderStoreID.

r.leader = leaderID
}

func (r *Region) leaderPeer() *metapb.Peer {
70 changes: 27 additions & 43 deletions store/tikv/region_cache.go
@@ -449,33 +449,38 @@ func (c *RegionCache) loadStoreAddr(bo *Backoffer, id uint64) (string, error) {
}
}

// OnRequestFail is used for clearing cache when a tikv server does not respond.
func (c *RegionCache) OnRequestFail(ctx *RPCContext, err error) {
// Switch region's leader peer to next one.
regionID := ctx.Region
// DropStoreOnSendRequestFail is used for clearing cache when a tikv server does not respond.
func (c *RegionCache) DropStoreOnSendRequestFail(ctx *RPCContext, err error) {
// We need to drop the store only when the request is the first one that failed on this store,
// because too many concurrent requests trying to drop the store will be blocked on the lock.
failedRegionID := ctx.Region
failedStoreID := ctx.Peer.StoreId
c.mu.Lock()
if cachedregion, ok := c.mu.regions[regionID]; ok {
region := cachedregion.region
if !region.OnRequestFail(ctx.Peer.GetStoreId()) {
c.dropRegionFromCache(regionID)
}
_, ok := c.mu.regions[failedRegionID]
if !ok {
// The failed region has already been dropped by another request, so we don't need to iterate
// over the regions to find and drop the ones on the failed store.
c.mu.Unlock()
Member:
Why not use defer to unlock?

Member Author:
So we can do some work outside the lock at the end of this function.

return
}
c.mu.Unlock()
// Store's meta may be out of date.
storeID := ctx.Peer.GetStoreId()
c.storeMu.Lock()
delete(c.storeMu.stores, storeID)
c.storeMu.Unlock()

log.Infof("drop regions of store %d from cache due to request fail, err: %v", storeID, err)

c.mu.Lock()
for id, r := range c.mu.regions {
if r.region.peer.GetStoreId() == storeID {
if r.region.peer.GetStoreId() == failedStoreID {
c.dropRegionFromCache(id)
}
}
c.mu.Unlock()

// Store's meta may be out of date.
var failedStoreAddr string
c.storeMu.Lock()
store, ok := c.storeMu.stores[failedStoreID]
if ok {
failedStoreAddr = store.Addr
delete(c.storeMu.stores, failedStoreID)
}
c.storeMu.Unlock()
log.Infof("drop regions that on the store %d(%s) due to send request fail, err: %v",
failedStoreID, failedStoreAddr, err)
}

// OnRegionStale removes the old region and inserts new regions into the cache.
@@ -531,9 +536,8 @@ func (item *btreeItem) Less(other btree.Item) bool {

// Region stores region's meta and its leader peer.
type Region struct {
meta *metapb.Region
peer *metapb.Peer
unreachableStores []uint64
meta *metapb.Region
peer *metapb.Peer
}

// GetID returns id.
@@ -581,26 +585,6 @@ func (r *Region) GetContext() *kvrpcpb.Context {
}
}

// OnRequestFail records unreachable peer and tries to select another valid peer.
// It returns false if all peers are unreachable.
func (r *Region) OnRequestFail(storeID uint64) bool {
Member:
Why remove this? Is this logic useless, or has it moved somewhere else?

Member Author:
It's useless.

Contributor (@disksing, Jun 25, 2018):
There are some considerations behind the unreachable store list.

Consider the case where a store is down and another peer of the region becomes the leader, but somehow the new leader is not able to send a heartbeat to PD in time.

With the unreachable store list, TiDB can try the other peers automatically. Otherwise, it will keep reconnecting to the down TiKV until it times out.

Member Author:
@disksing I know, but since we drop all the other regions on the store after a send request failure, keeping an unreachable list for only one region doesn't make any difference.

if r.peer.GetStoreId() != storeID {
return true
}
r.unreachableStores = append(r.unreachableStores, storeID)
L:
for _, p := range r.meta.Peers {
for _, id := range r.unreachableStores {
if p.GetStoreId() == id {
continue L
}
}
r.peer = p
return true
}
return false
}

// SwitchPeer switches current peer to the one on specific store. It returns
// false if no peer matches the storeID.
func (r *Region) SwitchPeer(storeID uint64) bool {
101 changes: 88 additions & 13 deletions store/tikv/region_cache_test.go
@@ -16,6 +16,7 @@ package tikv
import (
"errors"
"fmt"
"testing"
"time"

. "github.com/pingcap/check"
@@ -124,7 +125,6 @@ func (s *testRegionCacheSuite) TestUpdateLeader(c *C) {
r := s.getRegion(c, []byte("a"))
c.Assert(r, NotNil)
c.Assert(r.GetID(), Equals, s.region1)
c.Assert(r.unreachableStores, HasLen, 0)
c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store2))
}

@@ -143,7 +143,6 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) {
r := s.getRegion(c, []byte("a"))
c.Assert(r, NotNil)
c.Assert(r.GetID(), Equals, s.region1)
c.Assert(r.unreachableStores, HasLen, 0)
c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store1))

// tikv-server notifies new leader to pd-server.
@@ -153,7 +152,6 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) {
r = s.getRegion(c, []byte("a"))
c.Assert(r, NotNil)
c.Assert(r.GetID(), Equals, s.region1)
c.Assert(r.unreachableStores, HasLen, 0)
c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(store3))
}

@@ -182,7 +180,6 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) {
c.Assert(addr, Equals, "")
r = s.getRegion(c, []byte("a"))
// pd-server should return the new leader.
c.Assert(r.unreachableStores, HasLen, 0)
c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(store3))
}

@@ -245,18 +242,17 @@ func (s *testRegionCacheSuite) TestReconnect(c *C) {

func (s *testRegionCacheSuite) TestRequestFail(c *C) {
region := s.getRegion(c, []byte("a"))
c.Assert(region.unreachableStores, HasLen, 0)

ctx, _ := s.cache.GetRPCContext(s.bo, region.VerID())
s.cache.OnRequestFail(ctx, errors.New("test error"))
s.cache.DropStoreOnSendRequestFail(ctx, errors.New("test error"))
c.Assert(s.cache.mu.regions, HasLen, 0)
region = s.getRegion(c, []byte("a"))
c.Assert(region.unreachableStores, DeepEquals, []uint64{s.store1})

c.Assert(s.cache.mu.regions, HasLen, 1)
ctx, _ = s.cache.GetRPCContext(s.bo, region.VerID())
s.cache.OnRequestFail(ctx, errors.New("test error"))
s.cache.DropStoreOnSendRequestFail(ctx, errors.New("test error"))
c.Assert(len(s.cache.mu.regions), Equals, 0)
region = s.getRegion(c, []byte("a"))
// Out of range of Peers, so get Region again and pick Stores[0] as leader.
c.Assert(region.unreachableStores, HasLen, 0)
c.Assert(s.cache.mu.regions, HasLen, 1)
}

func (s *testRegionCacheSuite) TestRequestFail2(c *C) {
@@ -277,11 +273,58 @@ func (s *testRegionCacheSuite) TestRequestFail2(c *C) {
ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region)
c.Assert(s.cache.storeMu.stores, HasLen, 1)
s.checkCache(c, 2)
s.cache.OnRequestFail(ctx, errors.New("test error"))
s.cache.DropStoreOnSendRequestFail(ctx, errors.New("test error"))
// Both region2 and store should be dropped from cache.
c.Assert(s.cache.storeMu.stores, HasLen, 0)
c.Assert(s.cache.searchCachedRegion([]byte("x")), IsNil)
s.checkCache(c, 1)
s.checkCache(c, 0)
}

func (s *testRegionCacheSuite) TestDropStoreOnSendRequestFail(c *C) {
regionCnt := 999
cluster := createClusterWithStoresAndRegions(regionCnt)

cache := NewRegionCache(mocktikv.NewPDClient(cluster))
loadRegionsToCache(cache, regionCnt)
c.Assert(len(cache.mu.regions), Equals, regionCnt)

bo := NewBackoffer(context.Background(), 1)
loc, err := cache.LocateKey(bo, []byte{})
c.Assert(err, IsNil)

// Drop the regions on one store; this should drop only 1/3 of the regions.
rpcCtx, err := cache.GetRPCContext(bo, loc.Region)
c.Assert(err, IsNil)
cache.DropStoreOnSendRequestFail(rpcCtx, errors.New("test error"))
c.Assert(len(cache.mu.regions), Equals, regionCnt*2/3)

loadRegionsToCache(cache, regionCnt)
c.Assert(len(cache.mu.regions), Equals, regionCnt)
}

const regionSplitKeyFormat = "t%08d"

func createClusterWithStoresAndRegions(regionCnt int) *mocktikv.Cluster {
cluster := mocktikv.NewCluster()
_, _, regionID, _ := mocktikv.BootstrapWithMultiStores(cluster, 3)
for i := 0; i < regionCnt; i++ {
rawKey := []byte(fmt.Sprintf(regionSplitKeyFormat, i))
ids := cluster.AllocIDs(4)
// Make leaders equally distributed on the 3 stores.
storeID := ids[0]
peerIDs := ids[1:]
leaderPeerID := peerIDs[i%3]
cluster.SplitRaw(regionID, storeID, rawKey, peerIDs, leaderPeerID)
regionID = ids[0]
}
return cluster
}

func loadRegionsToCache(cache *RegionCache, regionCnt int) {
for i := 0; i < regionCnt; i++ {
rawKey := []byte(fmt.Sprintf(regionSplitKeyFormat, i))
cache.LocateKey(NewBackoffer(context.Background(), 1), rawKey)
}
}

func (s *testRegionCacheSuite) TestUpdateStoreAddr(c *C) {
Expand Down Expand Up @@ -325,3 +368,35 @@ func (s *testRegionCacheSuite) TestListRegionIDsInCache(c *C) {
c.Assert(err, IsNil)
c.Assert(regionIDs, DeepEquals, []uint64{s.region1, region2})
}

func BenchmarkOnRequestFail(b *testing.B) {
/*
This benchmark simulates many concurrent requests calling the DropStoreOnSendRequestFail
method after failing on a store, and validates that in this scenario the requests don't
get blocked on the RegionCache lock.
*/
regionCnt := 999
cluster := createClusterWithStoresAndRegions(regionCnt)
cache := NewRegionCache(mocktikv.NewPDClient(cluster))
loadRegionsToCache(cache, regionCnt)
bo := NewBackoffer(context.Background(), 1)
loc, err := cache.LocateKey(bo, []byte{})
if err != nil {
b.Fatal(err)
}
region := cache.getRegionByIDFromCache(loc.Region.id)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
rpcCtx := &RPCContext{
Region: loc.Region,
Meta: region.meta,
Peer: region.peer,
}
cache.DropStoreOnSendRequestFail(rpcCtx, nil)
}
})
if len(cache.mu.regions) != regionCnt*2/3 {
b.Fatal(len(cache.mu.regions))
}
}
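
For reference, a benchmark like this can normally be run on its own with the standard Go test tooling (assuming the repository root as the working directory and the package path shown in the file header); the -run '^$' filter skips the regular tests so only the benchmark executes:

go test -run '^$' -bench BenchmarkOnRequestFail ./store/tikv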
4 changes: 2 additions & 2 deletions store/tikv/region_request.go
@@ -154,9 +154,9 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err
}
}

s.regionCache.OnRequestFail(ctx, err)
s.regionCache.DropStoreOnSendRequestFail(ctx, err)

// Retry on request failure when it's not canceled.
// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of related region should be elected quickly.
// TODO: the number of retries should be limited, since the region may be unavailable
// when some unrecoverable disaster has happened.