Support load-based replica read
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
sticnarf committed Feb 8, 2023
1 parent 29dfcc2 commit 6e2bfd8
Showing 4 changed files with 232 additions and 10 deletions.
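The mechanism implemented in the diff below: the caller sets BusyThresholdMs in the kvrpcpb.Context of a read request. If TiKV estimates the request would wait longer than that threshold, it rejects it with a ServerIsBusy error carrying EstimatedWaitMs; the client records that estimate per store and retries the read, via replica read, on the replica with the smallest remaining estimated wait. If every replica looks busy, it falls back to the leader with the threshold cleared. A minimal usage sketch, modeled on the new TestLoadBasedReplicaRead test; the helper name and the canonical github.com import paths are assumptions, not part of this commit:

```go
package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/tikv/client-go/v2/tikvrpc"
)

// newBusyAwareGet is a hypothetical helper: it builds a Get request that
// allows TiKV to reject it with ServerIsBusy when the leader's estimated
// queueing delay exceeds thresholdMs, letting the client retry the read on
// a less busy replica instead.
func newBusyAwareGet(key []byte, thresholdMs uint32) *tikvrpc.Request {
	return tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kvrpcpb.Context{
		BusyThresholdMs: thresholdMs,
	})
}

func main() {
	req := newBusyAwareGet([]byte("k"), 50)
	fmt.Println(req.Type, req.BusyThresholdMs) // Get 50
}
```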
44 changes: 44 additions & 0 deletions internal/locate/region_cache.go
@@ -39,6 +39,7 @@ import (
"context"
"encoding/hex"
"fmt"
"math"
"math/rand"
"sort"
"strconv"
@@ -51,6 +52,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/google/btree"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
"github.com/stathat/consistent"
@@ -535,6 +537,8 @@ type RPCContext struct {
ProxyStore *Store // nil means proxy is not used
ProxyAddr string // valid when ProxyStore is not nil
TiKVNum int // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers.

overrides contextOverrides // kvrpcpb.Context fields that need to be overridden
}

func (c *RPCContext) String() string {
@@ -550,6 +554,25 @@ func (c *RPCContext) String() string {
return res
}

type contextOverrides struct {
replicaRead *bool
busyThreshold *time.Duration
}

func (overrides *contextOverrides) patchContext(pbCtx *kvrpcpb.Context) {
if overrides.replicaRead != nil {
pbCtx.ReplicaRead = *overrides.replicaRead
}
if overrides.busyThreshold != nil {
millis := overrides.busyThreshold.Milliseconds()
if millis > 0 && millis <= math.MaxUint32 {
pbCtx.BusyThresholdMs = uint32(millis)
} else {
pbCtx.BusyThresholdMs = 0
}
}
}

type storeSelectorOp struct {
leaderOnly bool
preferLeader bool
@@ -2238,6 +2261,8 @@ type Store struct {
storeType tikvrpc.EndpointType // type of the store
tokenCount atomic2.Int64 // used store token count

loadStats atomic2.Pointer[storeLoadStats]

// whether the store is unreachable due to some reason, therefore requests to the store needs to be
// forwarded by other stores. this is also the flag that a checkUntilHealth goroutine is running for this store.
// this mechanism is currently only applicable for TiKV stores.
@@ -2250,6 +2275,11 @@ type Store struct {

type resolveState uint64

type storeLoadStats struct {
estimatedWait time.Duration
waitTimeUpdatedAt time.Time
}

const (
// The store is just created and normally is being resolved.
// Store in this state will only be resolved by initResolve().
@@ -2597,6 +2627,20 @@ func (s *Store) GetPeerAddr() string {
return s.peerAddr
}

// estimatedWaitTime returns an optimistic estimation of how long a request will wait in the store.
// It's calculated by subtracting the time since the last update from the wait time returned from TiKV.
func (s *Store) estimatedWaitTime() time.Duration {
loadStats := s.loadStats.Load()
if loadStats == nil {
return 0
}
timeSinceUpdated := time.Since(loadStats.waitTimeUpdatedAt)
if loadStats.estimatedWait < timeSinceUpdated {
return 0
}
return loadStats.estimatedWait - timeSinceUpdated
}

func invokeKVStatusAPI(addr string, timeout time.Duration) (l livenessState) {
start := time.Now()
defer func() {
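The new Store fields above keep only the latest estimate reported by TiKV plus the local time it arrived, and estimatedWaitTime lets that estimate decay with wall-clock time, so a stale report stops steering traffic on its own. A standalone sketch of the same decay rule, using hypothetical names rather than the library's API:

```go
package main

import (
	"fmt"
	"time"
)

// loadEstimate mirrors the shape of storeLoadStats: the wait duration last
// reported by the server and the local time at which it was recorded.
type loadEstimate struct {
	estimatedWait time.Duration
	updatedAt     time.Time
}

// remainingWait is the optimistic remaining wait: the reported wait minus
// the time elapsed since the report, floored at zero.
func (l loadEstimate) remainingWait(now time.Time) time.Duration {
	elapsed := now.Sub(l.updatedAt)
	if l.estimatedWait <= elapsed {
		return 0
	}
	return l.estimatedWait - elapsed
}

func main() {
	recorded := time.Now()
	est := loadEstimate{estimatedWait: 500 * time.Millisecond, updatedAt: recorded}
	fmt.Println(est.remainingWait(recorded.Add(200 * time.Millisecond))) // 300ms
	fmt.Println(est.remainingWait(recorded.Add(time.Second)))            // 0s
}
```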
123 changes: 114 additions & 9 deletions internal/locate/region_request.go
@@ -37,6 +37,7 @@ package locate
import (
"context"
"fmt"
"math"
"math/rand"
"strconv"
"strings"
@@ -259,13 +260,17 @@ type replicaSelector struct {
targetIdx AccessIndex
// replicas[proxyIdx] is the store used to redirect requests this time
proxyIdx AccessIndex
// TiKV can reject the request when its estimated wait duration exceeds busyThreshold.
// Then, the client will receive a ServerIsBusy error and choose another replica to retry.
busyThreshold time.Duration
}

// selectorState is the interface of states of the replicaSelector.
// Here is the main state transition diagram:
//
// exceeding maxReplicaAttempt
// +-------------------+ || RPC failure && unreachable && no forwarding
// exceeding maxReplicaAttempt
// +-------------------+ || RPC failure && unreachable && no forwarding
//
// +-------->+ accessKnownLeader +----------------+
// | +------+------------+ |
// | | |
@@ -282,7 +287,8 @@ type replicaSelector struct {
// | leader becomes v +---+---+
// | reachable +-----+-----+ all proxies are tried ^
// +------------+tryNewProxy+-------------------------+
// +-----------+
//
// +-----------+
type selectorState interface {
next(*retry.Backoffer, *replicaSelector) (*RPCContext, error)
onSendSuccess(*replicaSelector)
@@ -337,6 +343,15 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
return nil, stateChanged{}
}
if selector.busyThreshold > 0 {
// If the leader is busy in our estimation, change to tryIdleReplica state to try other replicas.
// If other replicas are all busy, tryIdleReplica will try the leader again without busy threshold.
leaderEstimated := selector.replicas[state.leaderIdx].store.estimatedWaitTime()
if leaderEstimated > selector.busyThreshold {
selector.state = &tryIdleReplica{leaderIdx: state.leaderIdx}
return nil, stateChanged{}
}
}
selector.targetIdx = state.leaderIdx
return selector.buildRPCContext(bo)
}
@@ -600,6 +615,61 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool
(!state.option.preferLeader || !replica.store.isSlow()))
}

// tryIdleReplica is the state where we find the leader is busy and retry the request using replica read.
type tryIdleReplica struct {
stateBase
leaderIdx AccessIndex
}

func (state *tryIdleReplica) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
// Select a follower replica that has the lowest estimated wait duration
minWait := time.Duration(math.MaxInt64)
targetIdx := state.leaderIdx
startIdx := rand.Intn(len(selector.replicas))
for i := 0; i < len(selector.replicas); i++ {
idx := (i + startIdx) % len(selector.replicas)
r := selector.replicas[idx]
// Don't choose leader again by default.
if idx == int(state.leaderIdx) {
continue
}
// Skip replicas that have been tried.
if r.isExhausted(1) {
continue
}
estimated := r.store.estimatedWaitTime()
if estimated > selector.busyThreshold {
continue
}
if estimated < minWait {
minWait = estimated
targetIdx = AccessIndex(idx)
}
if minWait == 0 {
break
}
}
selector.targetIdx = targetIdx
rpcCtx, err := selector.buildRPCContext(bo)
if err != nil || rpcCtx == nil {
return nil, err
}
replicaRead := targetIdx != state.leaderIdx
rpcCtx.overrides.replicaRead = &replicaRead
if targetIdx == state.leaderIdx {
// No threshold if all peers are too busy.
selector.busyThreshold = 0
rpcCtx.overrides.busyThreshold = &selector.busyThreshold
}
return rpcCtx, nil
}

func (state *tryIdleReplica) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
if selector.checkLiveness(bo, selector.targetReplica()) != reachable {
selector.invalidateReplicaStore(selector.targetReplica(), cause)
}
}

type invalidStore struct {
stateBase
}
@@ -672,6 +742,7 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
state,
-1,
-1,
time.Duration(req.BusyThresholdMs) * time.Millisecond,
}, nil
}

@@ -872,6 +943,42 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
s.region.invalidate(StoreNotFound)
}

func (s *replicaSelector) onServerIsBusy(bo *retry.Backoffer, ctx *RPCContext, serverIsBusy *errorpb.ServerIsBusy) (shouldRetry bool, err error) {
if serverIsBusy.EstimatedWaitMs != 0 && ctx != nil && ctx.Store != nil {
estimatedWait := time.Duration(serverIsBusy.EstimatedWaitMs) * time.Millisecond
// Update the estimated wait time of the store.
loadStats := &storeLoadStats{
estimatedWait: estimatedWait,
waitTimeUpdatedAt: time.Now(),
}
ctx.Store.loadStats.Store(loadStats)

if s.busyThreshold != 0 {
switch state := s.state.(type) {
case *accessKnownLeader:
// Clear attempt history of the leader, so the leader can be accessed again.
s.replicas[state.leaderIdx].attempts = 0
s.state = &tryIdleReplica{leaderIdx: state.leaderIdx}
return true, nil
case *tryIdleReplica:
if s.targetIdx != state.leaderIdx {
return true, nil
}
// backoff if still receiving ServerIsBusy after accessing leader again
}
}
} else if ctx != nil && ctx.Store != nil {
// Mark the server is busy (the next incoming READs could be redirect
// to expected followers. )
ctx.Store.markAlreadySlow()
}
err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
if err != nil {
return false, err
}
return true, nil
}

func (s *replicaSelector) invalidateRegion() {
if s.region != nil {
s.region.invalidate(Other)
@@ -1158,6 +1265,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo
if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
return nil, false, err
}
rpcCtx.overrides.patchContext(&req.Context)
// judge the store limit switch.
if limit := kv.StoreLimit.Load(); limit > 0 {
if err := s.getStoreToken(rpcCtx.Store, limit); err != nil {
@@ -1536,13 +1644,10 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
return retry, err
}

if regionErr.GetServerIsBusy() != nil {
// Mark the server is busy (the next incoming READs could be redirect
// to expected followers. )
if ctx != nil && ctx.Store != nil {
ctx.Store.markAlreadySlow()
if serverIsBusy := regionErr.GetServerIsBusy(); serverIsBusy != nil {
if s.replicaSelector != nil {
return s.replicaSelector.onServerIsBusy(bo, ctx, serverIsBusy)
}

logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
zap.Stringer("ctx", ctx))
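The tryIdleReplica state above is essentially an argmin over these decayed estimates: it starts at a random offset so concurrent requests spread across followers, skips the leader and anything over the threshold, and falls back to the leader with the threshold cleared when every follower looks busy. A standalone sketch of that selection rule, with a hypothetical helper that omits the attempt bookkeeping of the real selector:

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// pickIdleReplica returns the index of the non-leader replica with the
// smallest estimated wait that is still under threshold. If every follower
// is above the threshold, it returns the leader index and reports that the
// caller should clear the threshold and do a plain leader read.
func pickIdleReplica(waits []time.Duration, leaderIdx int, threshold time.Duration) (idx int, fallbackToLeader bool) {
	minWait := time.Duration(math.MaxInt64)
	idx = leaderIdx
	start := rand.Intn(len(waits)) // random offset so concurrent requests spread out
	for i := 0; i < len(waits); i++ {
		j := (start + i) % len(waits)
		if j == leaderIdx || waits[j] > threshold {
			continue
		}
		if waits[j] < minWait {
			minWait, idx = waits[j], j
		}
	}
	return idx, idx == leaderIdx
}

func main() {
	waits := []time.Duration{400 * time.Millisecond, 30 * time.Millisecond, 80 * time.Millisecond}
	idx, fallback := pickIdleReplica(waits, 0, 50*time.Millisecond)
	fmt.Println(idx, fallback) // 1 false: replica 1 is the least busy follower under 50ms
}
```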
73 changes: 73 additions & 0 deletions internal/locate/region_request3_test.go
@@ -928,3 +928,76 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.True(totalAttempts <= 2)
}
}

func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() {
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
s.NotNil(regionLoc)
region := s.cache.GetCachedRegionWithRLock(regionLoc.Region)
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kvrpcpb.Context{
BusyThresholdMs: 50,
})

replicaSelector, err := newReplicaSelector(s.cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)
s.Equal(replicaSelector.region, region)
s.IsType(&accessKnownLeader{}, replicaSelector.state)
// The busyThreshold in replicaSelector should be initialized with the request context.
s.Equal(replicaSelector.busyThreshold, 50*time.Millisecond)

bo := retry.NewBackoffer(context.Background(), -1)
rpcCtx, err := replicaSelector.next(bo)
s.Nil(err)
s.Equal(rpcCtx.Peer.Id, s.leaderPeer)

// Receive a ServerIsBusy error
replicaSelector.onServerIsBusy(bo, rpcCtx, &errorpb.ServerIsBusy{
EstimatedWaitMs: 500,
})

rpcCtx, err = replicaSelector.next(bo)
s.Nil(err)
s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer)
s.IsType(&tryIdleReplica{}, replicaSelector.state)
s.True(*rpcCtx.overrides.replicaRead)
lastPeerID := rpcCtx.Peer.Id

replicaSelector.onServerIsBusy(bo, rpcCtx, &errorpb.ServerIsBusy{
EstimatedWaitMs: 800,
})

rpcCtx, err = replicaSelector.next(bo)
s.Nil(err)
// Should choose a peer different from before
s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer)
s.NotEqual(rpcCtx.Peer.Id, lastPeerID)
s.IsType(&tryIdleReplica{}, replicaSelector.state)
s.True(*rpcCtx.overrides.replicaRead)

// All peers are too busy
replicaSelector.onServerIsBusy(bo, rpcCtx, &errorpb.ServerIsBusy{
EstimatedWaitMs: 150,
})
lessBusyPeer := rpcCtx.Peer.Id

// Then, send to the leader again with no threshold.
rpcCtx, err = replicaSelector.next(bo)
s.Nil(err)
s.Equal(rpcCtx.Peer.Id, s.leaderPeer)
s.IsType(&tryIdleReplica{}, replicaSelector.state)
s.False(*rpcCtx.overrides.replicaRead)
s.Equal(*rpcCtx.overrides.busyThreshold, time.Duration(0))

time.Sleep(120 * time.Millisecond)

// When there comes a new request, it should skip busy leader and choose a less busy store
replicaSelector, err = newReplicaSelector(s.cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)
rpcCtx, err = replicaSelector.next(bo)
s.Nil(err)
s.Equal(rpcCtx.Peer.Id, lessBusyPeer)
s.IsType(&tryIdleReplica{}, replicaSelector.state)
s.True(*rpcCtx.overrides.replicaRead)
}
2 changes: 1 addition & 1 deletion internal/logutil/hex.go
@@ -42,7 +42,7 @@ import (
"reflect"
"strings"

"github.com/golang/protobuf/proto" //nolint
"github.com/golang/protobuf/proto" //nolint:staticcheck
)

// Hex defines a fmt.Stringer for proto.Message.
