diff --git a/store/mockstore/mocktikv/cluster.go b/store/mockstore/mocktikv/cluster.go
index e1e42108ad170..664fc7dc8bef1 100644
--- a/store/mockstore/mocktikv/cluster.go
+++ b/store/mockstore/mocktikv/cluster.go
@@ -230,14 +230,14 @@ func (c *Cluster) GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Peer)
 
 // Bootstrap creates the first Region. The Stores should be in the Cluster before
 // bootstrap.
-func (c *Cluster) Bootstrap(regionID uint64, storeIDs, peerIDs []uint64, leaderStoreID uint64) {
+func (c *Cluster) Bootstrap(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) {
 	c.Lock()
 	defer c.Unlock()
 
 	if len(storeIDs) != len(peerIDs) {
 		panic("len(storeIDs) != len(peerIDs)")
 	}
-	c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderStoreID)
+	c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID)
 }
 
 // AddPeer adds a new Peer for the Region on the Store.
@@ -259,11 +259,11 @@ func (c *Cluster) RemovePeer(regionID, storeID uint64) {
 
 // ChangeLeader sets the Region's leader Peer. Caller should guarantee the Peer
 // exists.
-func (c *Cluster) ChangeLeader(regionID, leaderStoreID uint64) {
+func (c *Cluster) ChangeLeader(regionID, leaderPeerID uint64) {
 	c.Lock()
 	defer c.Unlock()
 
-	c.regions[regionID].changeLeader(leaderStoreID)
+	c.regions[regionID].changeLeader(leaderPeerID)
 }
 
 // GiveUpLeader sets the Region's leader to 0. The Region will have no leader
@@ -464,8 +464,8 @@ func (r *Region) removePeer(peerID uint64) {
 	r.incConfVer()
 }
 
-func (r *Region) changeLeader(leaderStoreID uint64) {
-	r.leader = leaderStoreID
+func (r *Region) changeLeader(leaderID uint64) {
+	r.leader = leaderID
 }
 
 func (r *Region) leaderPeer() *metapb.Peer {
diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index b440b92dafd72..259a9aa3725fe 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -449,33 +449,38 @@ func (c *RegionCache) loadStoreAddr(bo *Backoffer, id uint64) (string, error) {
 	}
 }
 
-// OnRequestFail is used for clearing cache when a tikv server does not respond.
-func (c *RegionCache) OnRequestFail(ctx *RPCContext, err error) {
-	// Switch region's leader peer to next one.
-	regionID := ctx.Region
+// DropStoreOnSendRequestFail is used for clearing the cache when a tikv server does not respond.
+func (c *RegionCache) DropStoreOnSendRequestFail(ctx *RPCContext, err error) {
+	// We only need to drop the store when this request is the first one to fail on it.
+	// Otherwise, too many concurrent requests trying to drop the store would block on the lock.
+	failedRegionID := ctx.Region
+	failedStoreID := ctx.Peer.StoreId
 	c.mu.Lock()
-	if cachedregion, ok := c.mu.regions[regionID]; ok {
-		region := cachedregion.region
-		if !region.OnRequestFail(ctx.Peer.GetStoreId()) {
-			c.dropRegionFromCache(regionID)
-		}
+	_, ok := c.mu.regions[failedRegionID]
+	if !ok {
+		// The failed region has already been dropped by another request; we don't need to
+		// iterate the regions and find the ones on the failed store to drop.
+		c.mu.Unlock()
+		return
 	}
-	c.mu.Unlock()
-	// Store's meta may be out of date.
-	storeID := ctx.Peer.GetStoreId()
-	c.storeMu.Lock()
-	delete(c.storeMu.stores, storeID)
-	c.storeMu.Unlock()
-
-	log.Infof("drop regions of store %d from cache due to request fail, err: %v", storeID, err)
-
-	c.mu.Lock()
 	for id, r := range c.mu.regions {
-		if r.region.peer.GetStoreId() == storeID {
+		if r.region.peer.GetStoreId() == failedStoreID {
 			c.dropRegionFromCache(id)
 		}
 	}
 	c.mu.Unlock()
+
+	// Store's meta may be out of date.
+	var failedStoreAddr string
+	c.storeMu.Lock()
+	store, ok := c.storeMu.stores[failedStoreID]
+	if ok {
+		failedStoreAddr = store.Addr
+		delete(c.storeMu.stores, failedStoreID)
+	}
+	c.storeMu.Unlock()
+	log.Infof("drop regions on store %d(%s) due to send request fail, err: %v",
+		failedStoreID, failedStoreAddr, err)
 }
 
 // OnRegionStale removes the old region and inserts new regions into the cache.
@@ -531,9 +536,8 @@ func (item *btreeItem) Less(other btree.Item) bool {
 
 // Region stores region's meta and its leader peer.
 type Region struct {
-	meta              *metapb.Region
-	peer              *metapb.Peer
-	unreachableStores []uint64
+	meta *metapb.Region
+	peer *metapb.Peer
 }
 
 // GetID returns id.
@@ -581,26 +585,6 @@ func (r *Region) GetContext() *kvrpcpb.Context {
 	}
 }
 
-// OnRequestFail records unreachable peer and tries to select another valid peer.
-// It returns false if all peers are unreachable.
-func (r *Region) OnRequestFail(storeID uint64) bool {
-	if r.peer.GetStoreId() != storeID {
-		return true
-	}
-	r.unreachableStores = append(r.unreachableStores, storeID)
-L:
-	for _, p := range r.meta.Peers {
-		for _, id := range r.unreachableStores {
-			if p.GetStoreId() == id {
-				continue L
-			}
-		}
-		r.peer = p
-		return true
-	}
-	return false
-}
-
 // SwitchPeer switches current peer to the one on specific store. It returns
 // false if no peer matches the storeID.
 func (r *Region) SwitchPeer(storeID uint64) bool {
diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go
index 761c14239c74e..0af9f97282a21 100644
--- a/store/tikv/region_cache_test.go
+++ b/store/tikv/region_cache_test.go
@@ -16,6 +16,7 @@ package tikv
 import (
 	"errors"
 	"fmt"
+	"testing"
 	"time"
 
 	. "github.com/pingcap/check"
@@ -124,7 +125,6 @@ func (s *testRegionCacheSuite) TestUpdateLeader(c *C) {
 	r := s.getRegion(c, []byte("a"))
 	c.Assert(r, NotNil)
 	c.Assert(r.GetID(), Equals, s.region1)
-	c.Assert(r.unreachableStores, HasLen, 0)
 	c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store2))
 }
 
@@ -143,7 +143,6 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) {
 	r := s.getRegion(c, []byte("a"))
 	c.Assert(r, NotNil)
 	c.Assert(r.GetID(), Equals, s.region1)
-	c.Assert(r.unreachableStores, HasLen, 0)
 	c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store1))
 
 	// tikv-server notifies new leader to pd-server.
@@ -153,7 +152,6 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) {
 	r = s.getRegion(c, []byte("a"))
 	c.Assert(r, NotNil)
 	c.Assert(r.GetID(), Equals, s.region1)
-	c.Assert(r.unreachableStores, HasLen, 0)
 	c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(store3))
 }
 
@@ -182,7 +180,6 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) {
 	c.Assert(addr, Equals, "")
 	r = s.getRegion(c, []byte("a"))
 	// pd-server should return the new leader.
-	c.Assert(r.unreachableStores, HasLen, 0)
 	c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(store3))
 }
 
@@ -245,18 +242,17 @@ func (s *testRegionCacheSuite) TestReconnect(c *C) {
 
 func (s *testRegionCacheSuite) TestRequestFail(c *C) {
 	region := s.getRegion(c, []byte("a"))
-	c.Assert(region.unreachableStores, HasLen, 0)
 
 	ctx, _ := s.cache.GetRPCContext(s.bo, region.VerID())
-	s.cache.OnRequestFail(ctx, errors.New("test error"))
+	s.cache.DropStoreOnSendRequestFail(ctx, errors.New("test error"))
+	c.Assert(s.cache.mu.regions, HasLen, 0)
 	region = s.getRegion(c, []byte("a"))
-	c.Assert(region.unreachableStores, DeepEquals, []uint64{s.store1})
-
+	c.Assert(s.cache.mu.regions, HasLen, 1)
 	ctx, _ = s.cache.GetRPCContext(s.bo, region.VerID())
-	s.cache.OnRequestFail(ctx, errors.New("test error"))
+	s.cache.DropStoreOnSendRequestFail(ctx, errors.New("test error"))
+	c.Assert(len(s.cache.mu.regions), Equals, 0)
 	region = s.getRegion(c, []byte("a"))
-	// Out of range of Peers, so get Region again and pick Stores[0] as leader.
-	c.Assert(region.unreachableStores, HasLen, 0)
+	c.Assert(s.cache.mu.regions, HasLen, 1)
 }
 
 func (s *testRegionCacheSuite) TestRequestFail2(c *C) {
@@ -277,11 +273,58 @@ func (s *testRegionCacheSuite) TestRequestFail2(c *C) {
 	ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region)
 	c.Assert(s.cache.storeMu.stores, HasLen, 1)
 	s.checkCache(c, 2)
-	s.cache.OnRequestFail(ctx, errors.New("test error"))
+	s.cache.DropStoreOnSendRequestFail(ctx, errors.New("test error"))
 	// Both region2 and store should be dropped from cache.
 	c.Assert(s.cache.storeMu.stores, HasLen, 0)
 	c.Assert(s.cache.searchCachedRegion([]byte("x")), IsNil)
-	s.checkCache(c, 1)
+	s.checkCache(c, 0)
+}
+
+func (s *testRegionCacheSuite) TestDropStoreOnSendRequestFail(c *C) {
+	regionCnt := 999
+	cluster := createClusterWithStoresAndRegions(regionCnt)
+
+	cache := NewRegionCache(mocktikv.NewPDClient(cluster))
+	loadRegionsToCache(cache, regionCnt)
+	c.Assert(len(cache.mu.regions), Equals, regionCnt)
+
+	bo := NewBackoffer(context.Background(), 1)
+	loc, err := cache.LocateKey(bo, []byte{})
+	c.Assert(err, IsNil)
+
+	// Dropping the regions on one store should drop only 1/3 of the regions.
+	rpcCtx, err := cache.GetRPCContext(bo, loc.Region)
+	c.Assert(err, IsNil)
+	cache.DropStoreOnSendRequestFail(rpcCtx, errors.New("test error"))
+	c.Assert(len(cache.mu.regions), Equals, regionCnt*2/3)
+
+	loadRegionsToCache(cache, regionCnt)
+	c.Assert(len(cache.mu.regions), Equals, regionCnt)
+}
+
+const regionSplitKeyFormat = "t%08d"
+
+func createClusterWithStoresAndRegions(regionCnt int) *mocktikv.Cluster {
+	cluster := mocktikv.NewCluster()
+	_, _, regionID, _ := mocktikv.BootstrapWithMultiStores(cluster, 3)
+	for i := 0; i < regionCnt; i++ {
+		rawKey := []byte(fmt.Sprintf(regionSplitKeyFormat, i))
+		ids := cluster.AllocIDs(4)
+		// Make leaders evenly distributed across the 3 stores.
+		storeID := ids[0]
+		peerIDs := ids[1:]
+		leaderPeerID := peerIDs[i%3]
+		cluster.SplitRaw(regionID, storeID, rawKey, peerIDs, leaderPeerID)
+		regionID = ids[0]
+	}
+	return cluster
+}
+
+func loadRegionsToCache(cache *RegionCache, regionCnt int) {
+	for i := 0; i < regionCnt; i++ {
+		rawKey := []byte(fmt.Sprintf(regionSplitKeyFormat, i))
+		cache.LocateKey(NewBackoffer(context.Background(), 1), rawKey)
+	}
 }
 
 func (s *testRegionCacheSuite) TestUpdateStoreAddr(c *C) {
@@ -325,3 +368,35 @@ func (s *testRegionCacheSuite) TestListRegionIDsInCache(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(regionIDs, DeepEquals, []uint64{s.region1, region2})
 }
+
+func BenchmarkOnRequestFail(b *testing.B) {
+	/*
+		This benchmark simulates many concurrent requests that call the DropStoreOnSendRequestFail
+		method after failing on a store, and validates that in this scenario requests are not
+		blocked on the RegionCache lock.
+	*/
+	regionCnt := 999
+	cluster := createClusterWithStoresAndRegions(regionCnt)
+	cache := NewRegionCache(mocktikv.NewPDClient(cluster))
+	loadRegionsToCache(cache, regionCnt)
+	bo := NewBackoffer(context.Background(), 1)
+	loc, err := cache.LocateKey(bo, []byte{})
+	if err != nil {
+		b.Fatal(err)
+	}
+	region := cache.getRegionByIDFromCache(loc.Region.id)
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			rpcCtx := &RPCContext{
+				Region: loc.Region,
+				Meta:   region.meta,
+				Peer:   region.peer,
+			}
+			cache.DropStoreOnSendRequestFail(rpcCtx, nil)
+		}
+	})
+	if len(cache.mu.regions) != regionCnt*2/3 {
+		b.Fatal(len(cache.mu.regions))
+	}
+}
diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go
index b20c7856abbb6..e9747bd5b3a9b 100644
--- a/store/tikv/region_request.go
+++ b/store/tikv/region_request.go
@@ -154,9 +154,9 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err
 		}
 	}
 
-	s.regionCache.OnRequestFail(ctx, err)
+	s.regionCache.DropStoreOnSendRequestFail(ctx, err)
 
-	// Retry on request failure when it's not canceled.
+	// Retry on send request failure when it's not canceled.
 	// When a store is not available, the leader of related region should be elected quickly.
 	// TODO: the number of retry time should be limited:since region may be unavailable
 	// when some unrecoverable disaster happened.
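
Note: the early return at the top of DropStoreOnSendRequestFail is the heart of this change. Only the first failed request that still finds its region in the cache pays for scanning the region map; concurrent failures against the same store return after a single map lookup instead of queuing work behind the mutex, which is what BenchmarkOnRequestFail measures. The standalone sketch below illustrates that pattern under simplified assumptions; region, regionCache, and dropStoreOnSendRequestFail are hypothetical stand-ins for illustration, not the actual types in store/tikv/region_cache.go.

package main

import (
	"fmt"
	"sync"
)

// region is a simplified stand-in for a cached region entry; the real cache
// also keeps a btree for key-range lookup and richer peer metadata.
type region struct {
	id      uint64
	storeID uint64 // store holding the region's leader peer
}

type regionCache struct {
	mu      sync.Mutex
	regions map[uint64]*region
}

// dropStoreOnSendRequestFail mirrors the shape of the new method: only the
// first request that still finds the failed region cached scans and drops the
// regions on the failed store; later concurrent failures return immediately
// after one map lookup.
func (c *regionCache) dropStoreOnSendRequestFail(failedRegionID, failedStoreID uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.regions[failedRegionID]; !ok {
		// Another request already dropped this region (and every region on the
		// failed store with it), so there is nothing left to do.
		return
	}
	for id, r := range c.regions {
		if r.storeID == failedStoreID {
			delete(c.regions, id)
		}
	}
}

func main() {
	c := &regionCache{regions: map[uint64]*region{
		1: {id: 1, storeID: 10},
		2: {id: 2, storeID: 10},
		3: {id: 3, storeID: 20},
	}}
	c.dropStoreOnSendRequestFail(1, 10) // first failure: drops regions 1 and 2
	c.dropStoreOnSendRequestFail(1, 10) // later failure: no-op after one lookup
	fmt.Println(len(c.regions))         // 1
}

The real method additionally clears the failed store's address entry under storeMu and logs the drop; the sketch keeps only the locking pattern that the benchmark exercises.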