Skip to content

Commit

Permalink
region_cache: filter peers on tombstone or dropped stores (#24726)
Browse files Browse the repository at this point in the history
  • Loading branch information
youjiali1995 committed May 21, 2021
1 parent bc7f182 commit 55d26c5
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 101 deletions.
8 changes: 8 additions & 0 deletions store/tikv/mockstore/mocktikv/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package mocktikv

import (
"context"
"fmt"
"math"
"sync"
"time"
Expand Down Expand Up @@ -126,6 +127,13 @@ func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store,
default:
}
store := c.cluster.GetStore(storeID)
// It's same as PD's implementation.
if store == nil {
return nil, fmt.Errorf("invalid store ID %d, not found", storeID)
}
if store.GetState() == metapb.StoreState_Tombstone {
return nil, nil
}
return store, nil
}

Expand Down
183 changes: 104 additions & 79 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (r *RegionStore) filterStoreCandidate(aidx AccessIndex, op *storeSelectorOp
}

// init initializes region after constructed.
func (r *Region) init(c *RegionCache) error {
func (r *Region) init(bo *Backoffer, c *RegionCache) error {
// region store pull used store from global store map
// to avoid acquire storeMu in later access.
rs := &RegionStore{
Expand All @@ -197,17 +197,23 @@ func (r *Region) init(c *RegionCache) error {
stores: make([]*Store, 0, len(r.meta.Peers)),
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
}
availablePeers := r.meta.GetPeers()[:0]
for _, p := range r.meta.Peers {
c.storeMu.RLock()
store, exists := c.storeMu.stores[p.StoreId]
c.storeMu.RUnlock()
if !exists {
store = c.getStoreByStoreID(p.StoreId)
}
_, err := store.initResolve(retry.NewNoopBackoff(context.Background()), c)
addr, err := store.initResolve(bo, c)
if err != nil {
return err
}
// Filter the peer on a tombstone store.
if addr == "" {
continue
}
availablePeers = append(availablePeers, p)
switch store.storeType {
case tikvrpc.TiKV:
rs.accessIndex[TiKVOnly] = append(rs.accessIndex[TiKVOnly], len(rs.stores))
Expand All @@ -217,6 +223,13 @@ func (r *Region) init(c *RegionCache) error {
rs.stores = append(rs.stores, store)
rs.storeEpochs = append(rs.storeEpochs, atomic.LoadUint32(&store.epoch))
}
// TODO(youjiali1995): It's possible the region info in PD is stale for now but it can recover.
// Maybe we need backoff here.
if len(availablePeers) == 0 {
return errors.Errorf("no available peers, region: {%v}", r.meta)
}
r.meta.Peers = availablePeers

atomic.StorePointer(&r.store, unsafe.Pointer(rs))

// mark region has been init accessed.
Expand Down Expand Up @@ -321,6 +334,18 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
return c
}

// clear clears all cached data in the RegionCache. It's only used in tests.
func (c *RegionCache) clear() {
c.mu.Lock()
c.mu.regions = make(map[RegionVerID]*Region)
c.mu.latestVersions = make(map[uint64]RegionVerID)
c.mu.sorted = btree.New(btreeDegree)
c.mu.Unlock()
c.storeMu.Lock()
c.storeMu.stores = make(map[uint64]*Store)
c.storeMu.Unlock()
}

// Close releases region cache's resource.
func (c *RegionCache) Close() {
close(c.closeCh)
Expand All @@ -332,32 +357,29 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
defer ticker.Stop()
var needCheckStores []*Store
for {
needCheckStores = needCheckStores[:0]
select {
case <-c.closeCh:
return
case <-c.notifyCheckCh:
needCheckStores = needCheckStores[:0]
c.checkAndResolve(needCheckStores)
c.checkAndResolve(needCheckStores, func(s *Store) bool {
return s.getResolveState() == needCheck
})
case <-ticker.C:
// refresh store once a minute to update labels
var stores []*Store
c.storeMu.RLock()
stores = make([]*Store, 0, len(c.storeMu.stores))
for _, s := range c.storeMu.stores {
stores = append(stores, s)
}
c.storeMu.RUnlock()
for _, store := range stores {
_, err := store.reResolve(c)
terror.Log(err)
}
// refresh store to update labels.
c.checkAndResolve(needCheckStores, func(s *Store) bool {
state := s.getResolveState()
// Only valid stores should be reResolved. In fact, it's impossible
// there's a deleted store in the stores map which guaranteed by reReslve().
return state != unresolved && state != tombstone && state != deleted
})
}
}
}

// checkAndResolve checks and resolve addr of failed stores.
// this method isn't thread-safe and only be used by one goroutine.
func (c *RegionCache) checkAndResolve(needCheckStores []*Store) {
func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*Store) bool) {
defer func() {
r := recover()
if r != nil {
Expand All @@ -369,8 +391,7 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store) {

c.storeMu.RLock()
for _, store := range c.storeMu.stores {
state := store.getResolveState()
if state == needCheck {
if needCheck(store) {
needCheckStores = append(needCheckStores, store)
}
}
Expand Down Expand Up @@ -1217,9 +1238,6 @@ func filterUnavailablePeers(region *pd.Region) {
new = append(new, p)
}
}
for i := len(new); i < len(region.Meta.Peers); i++ {
region.Meta.Peers[i] = nil
}
region.Meta.Peers = new
}

Expand Down Expand Up @@ -1272,7 +1290,7 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Reg
continue
}
region := &Region{meta: reg.Meta}
err = region.init(c)
err = region.init(bo, c)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1317,7 +1335,7 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e
return nil, errors.New("receive Region with no available peer")
}
region := &Region{meta: reg.Meta}
err = region.init(c)
err = region.init(bo, c)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1368,7 +1386,7 @@ func (c *RegionCache) scanRegions(bo *Backoffer, startKey, endKey []byte, limit
regions := make([]*Region, 0, len(regionsInfo))
for _, r := range regionsInfo {
region := &Region{meta: r.Meta}
err := region.init(c)
err := region.init(bo, c)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1409,6 +1427,8 @@ func (c *RegionCache) getStoreAddr(bo *Backoffer, region *Region, store *Store,
case deleted:
addr = c.changeToActiveStore(region, store, storeIdx)
return
case tombstone:
return "", nil
default:
panic("unsupported resolve state")
}
Expand Down Expand Up @@ -1456,6 +1476,8 @@ func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *RegionStor
return nil, 0, 0
}

// changeToActiveStore replace the deleted store in the region by an up-to-date store in the stores map.
// The order is guaranteed by reResolve() which adds the new store before marking old store deleted.
func (c *RegionCache) changeToActiveStore(region *Region, store *Store, storeIdx int) (addr string) {
c.storeMu.RLock()
store = c.storeMu.stores[store.storeID]
Expand Down Expand Up @@ -1530,7 +1552,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, curr
}
}
region := &Region{meta: meta}
err := region.init(c)
err := region.init(bo, c)
if err != nil {
return err
}
Expand Down Expand Up @@ -1860,19 +1882,31 @@ type Store struct {
type resolveState uint64

const (
// The store is just created and normally is being resolved.
// Store in this state will only be resolved by initResolve().
unresolved resolveState = iota
// The store is resolved and its address is valid.
resolved
// Request failed on this store and it will be re-resolved by asyncCheckAndResolveLoop().
needCheck
// The store's address or label is changed and marked deleted.
// There is a new store struct replaced it in the RegionCache and should
// call changeToActiveStore() to get the new struct.
deleted
// The store is a tombstone. Should invalidate the region if tries to access it.
tombstone
)

// initResolve resolves addr for store that never resolved.
// initResolve resolves the address of the store that never resolved and returns an
// empty string if it's a tombstone.
func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err error) {
s.resolveMutex.Lock()
state := s.getResolveState()
defer s.resolveMutex.Unlock()
if state != unresolved {
addr = s.addr
if state != tombstone {
addr = s.addr
}
return
}
var store *metapb.Store
Expand All @@ -1883,35 +1917,33 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
} else {
metrics.RegionCacheCounterWithGetStoreOK.Inc()
}
if err != nil {
if bo.GetCtx().Err() != nil && errors.Cause(bo.GetCtx().Err()) == context.Canceled {
return
}
if err != nil && !isStoreNotFoundError(err) {
// TODO: more refine PD error status handle.
if errors.Cause(err) == context.Canceled {
return
}
err = errors.Errorf("loadStore from PD failed, id: %d, err: %v", s.storeID, err)
if err = bo.Backoff(retry.BoPDRPC, err); err != nil {
return
}
continue
}
// The store is a tombstone.
if store == nil {
return
s.setResolveState(tombstone)
return "", nil
}
addr = store.GetAddress()
if addr == "" {
return "", errors.Errorf("empty store(%d) address", s.storeID)
}
s.addr = addr
s.saddr = store.GetStatusAddress()
s.storeType = GetStoreTypeByMeta(store)
s.labels = store.GetLabels()
retry:
state = s.getResolveState()
if state != unresolved {
addr = s.addr
return
}
if !s.compareAndSwapState(state, resolved) {
goto retry
}
return
// Shouldn't have other one changing its state concurrently, but we still use changeResolveStateTo for safety.
s.changeResolveStateTo(unresolved, resolved)
return s.addr, nil
}
}

Expand Down Expand Up @@ -1944,41 +1976,22 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
logutil.BgLogger().Info("invalidate regions in removed store",
zap.Uint64("store", s.storeID), zap.String("add", s.addr))
atomic.AddUint32(&s.epoch, 1)
atomic.StoreUint64(&s.state, uint64(deleted))
s.setResolveState(tombstone)
metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
return false, nil
}

storeType := GetStoreTypeByMeta(store)
addr = store.GetAddress()
if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
state := resolved
newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels()}
newStore.state = *(*uint64)(&state)
newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
c.storeMu.Lock()
c.storeMu.stores[newStore.storeID] = newStore
c.storeMu.Unlock()
retryMarkDel:
// all region used those
oldState := s.getResolveState()
if oldState == deleted {
return false, nil
}
newState := deleted
if !s.compareAndSwapState(oldState, newState) {
goto retryMarkDel
}
s.setResolveState(deleted)
return false, nil
}
retryMarkResolved:
oldState := s.getResolveState()
if oldState != needCheck {
return true, nil
}
newState := resolved
if !s.compareAndSwapState(oldState, newState) {
goto retryMarkResolved
}
s.changeResolveStateTo(needCheck, resolved)
return true, nil
}

Expand All @@ -1990,23 +2003,35 @@ func (s *Store) getResolveState() resolveState {
return resolveState(atomic.LoadUint64(&s.state))
}

func (s *Store) compareAndSwapState(oldState, newState resolveState) bool {
return atomic.CompareAndSwapUint64(&s.state, uint64(oldState), uint64(newState))
func (s *Store) setResolveState(state resolveState) {
atomic.StoreUint64(&s.state, uint64(state))
}

// changeResolveStateTo changes the store resolveState from the old state to the new state.
// Returns true if it changes the state successfully, and false if the store's state
// is changed by another one.
func (s *Store) changeResolveStateTo(from, to resolveState) bool {
for {
state := s.getResolveState()
if state == to {
return true
}
if state != from {
return false
}
if atomic.CompareAndSwapUint64(&s.state, uint64(from), uint64(to)) {
return true
}
}
}

// markNeedCheck marks resolved store to be async resolve to check store addr change.
func (s *Store) markNeedCheck(notifyCheckCh chan struct{}) {
retry:
oldState := s.getResolveState()
if oldState != resolved {
return
}
if !s.compareAndSwapState(oldState, needCheck) {
goto retry
}
select {
case notifyCheckCh <- struct{}{}:
default:
if s.changeResolveStateTo(resolved, needCheck) {
select {
case notifyCheckCh <- struct{}{}:
default:
}
}
}

Expand Down
Loading

0 comments on commit 55d26c5

Please sign in to comment.