Skip to content

Commit

Permalink
Support load-based replica read (#675)
Browse files Browse the repository at this point in the history
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
  • Loading branch information
sticnarf authored Feb 22, 2023
1 parent e0e6019 commit a27994e
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 15 deletions.
44 changes: 44 additions & 0 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"context"
"encoding/hex"
"fmt"
"math"
"math/rand"
"sort"
"strconv"
Expand All @@ -51,6 +52,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/google/btree"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
"github.com/stathat/consistent"
Expand Down Expand Up @@ -535,6 +537,8 @@ type RPCContext struct {
ProxyStore *Store // nil means proxy is not used
ProxyAddr string // valid when ProxyStore is not nil
TiKVNum int // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers.

contextPatcher contextPatcher // kvrpcpb.Context fields that need to be overridden
}

func (c *RPCContext) String() string {
Expand All @@ -550,6 +554,25 @@ func (c *RPCContext) String() string {
return res
}

// contextPatcher carries optional overrides for fields of the kvrpcpb.Context
// of an outgoing request. A nil field means the corresponding Context field is
// left untouched; applyTo writes the non-nil ones.
type contextPatcher struct {
// replicaRead, when non-nil, replaces Context.ReplicaRead.
replicaRead *bool
// busyThreshold, when non-nil, replaces Context.BusyThresholdMs after being
// converted to milliseconds by applyTo.
busyThreshold *time.Duration
}

// applyTo writes the overrides held by the patcher into pbCtx.
//
// Fields whose override is nil are not modified. A busy threshold is sent in
// whole milliseconds; if the value is not positive or does not fit into a
// uint32, BusyThresholdMs is set to 0 instead.
func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) {
	if rr := patcher.replicaRead; rr != nil {
		pbCtx.ReplicaRead = *rr
	}

	threshold := patcher.busyThreshold
	if threshold == nil {
		return
	}
	// Default to 0; only a value representable as a positive uint32 is forwarded.
	var thresholdMs uint32
	if ms := threshold.Milliseconds(); ms > 0 && ms <= math.MaxUint32 {
		thresholdMs = uint32(ms)
	}
	pbCtx.BusyThresholdMs = thresholdMs
}

type storeSelectorOp struct {
leaderOnly bool
preferLeader bool
Expand Down Expand Up @@ -2238,6 +2261,8 @@ type Store struct {
storeType tikvrpc.EndpointType // type of the store
tokenCount atomic2.Int64 // used store token count

loadStats atomic2.Pointer[storeLoadStats]

// whether the store is unreachable due to some reason, therefore requests to the store need to be
// forwarded by other stores. this is also the flag that a checkUntilHealth goroutine is running for this store.
// this mechanism is currently only applicable for TiKV stores.
Expand All @@ -2250,6 +2275,11 @@ type Store struct {

// resolveState is a uint64-based enumeration type.
// NOTE(review): presumably it describes the address-resolution state of a Store,
// judging by the constants declared right after it — confirm against their definitions.
type resolveState uint64

// storeLoadStats is a snapshot of the load information most recently reported
// by a store. It is stored in Store.loadStats and read by estimatedWaitTime.
type storeLoadStats struct {
// estimatedWait is the wait duration reported by the store (taken from
// ServerIsBusy.EstimatedWaitMs in onServerIsBusy).
estimatedWait time.Duration
// waitTimeUpdatedAt is the local time at which estimatedWait was recorded.
waitTimeUpdatedAt time.Time
}

const (
// The store is just created and normally is being resolved.
// Store in this state will only be resolved by initResolve().
Expand Down Expand Up @@ -2597,6 +2627,20 @@ func (s *Store) GetPeerAddr() string {
return s.peerAddr
}

// estimatedWaitTime reports how long a request is still expected to wait in
// the store. The estimate is optimistic: it takes the last wait time recorded
// for the store and assumes it has been shrinking in real time ever since, so
// the elapsed time since that record is deducted. The result is 0 when no load
// statistics have been recorded or when the recorded wait has fully elapsed.
func (s *Store) estimatedWaitTime() time.Duration {
	stats := s.loadStats.Load()
	if stats == nil {
		return 0
	}
	elapsed := time.Since(stats.waitTimeUpdatedAt)
	if elapsed > stats.estimatedWait {
		// The recorded wait is older than its own length; treat the store as idle.
		return 0
	}
	return stats.estimatedWait - elapsed
}

func invokeKVStatusAPI(addr string, timeout time.Duration) (l livenessState) {
start := time.Now()
defer func() {
Expand Down
115 changes: 109 additions & 6 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ package locate
import (
"context"
"fmt"
"math"
"math/rand"
"strconv"
"strings"
Expand Down Expand Up @@ -259,6 +260,9 @@ type replicaSelector struct {
targetIdx AccessIndex
// replicas[proxyIdx] is the store used to redirect requests this time
proxyIdx AccessIndex
// TiKV can reject the request when its estimated wait duration exceeds busyThreshold.
// Then, the client will receive a ServerIsBusy error and choose another replica to retry.
busyThreshold time.Duration
}

// selectorState is the interface of states of the replicaSelector.
Expand Down Expand Up @@ -338,6 +342,15 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
return nil, stateChanged{}
}
if selector.busyThreshold > 0 {
// If the leader is busy in our estimation, change to tryIdleReplica state to try other replicas.
// If other replicas are all busy, tryIdleReplica will try the leader again without busy threshold.
leaderEstimated := selector.replicas[state.leaderIdx].store.estimatedWaitTime()
if leaderEstimated > selector.busyThreshold {
selector.state = &tryIdleReplica{leaderIdx: state.leaderIdx}
return nil, stateChanged{}
}
}
selector.targetIdx = state.leaderIdx
return selector.buildRPCContext(bo)
}
Expand Down Expand Up @@ -601,6 +614,61 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool
(!state.option.preferLeader || !replica.store.isSlow()))
}

// tryIdleReplica is the selector state entered when the leader is found to be
// busy. In this state the request is retried on other replicas using replica
// read; the leader is only used again as a fallback (see next).
type tryIdleReplica struct {
stateBase
// leaderIdx is the index of the leader within the selector's replicas.
leaderIdx AccessIndex
}

// next picks the replica to try while the leader is considered busy.
//
// It scans the non-leader replicas starting from a random position, ignoring
// any replica that has already been tried or whose estimated wait exceeds
// selector.busyThreshold, and keeps the one with the smallest estimated wait.
// The scan stops early once a replica with zero estimated wait is found.
// A request sent to such a follower is flagged as a replica read.
//
// If no follower qualifies, the leader is targeted instead: the request is
// not a replica read, and the selector's busy threshold is reset to zero and
// patched into the request so it is sent without a busy threshold.
//
// When buildRPCContext fails or yields no context, a nil RPCContext is
// returned together with that error (which may be nil).
func (state *tryIdleReplica) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
	total := len(selector.replicas)
	leader := int(state.leaderIdx)

	// The leader is the fallback when no follower is idle enough.
	chosen := state.leaderIdx
	shortestWait := time.Duration(math.MaxInt64)
	// Begin the scan at a random replica (presumably to spread load among
	// followers that look equally idle).
	first := rand.Intn(total)
	for step := 0; step < total; step++ {
		pos := (first + step) % total
		candidate := selector.replicas[pos]
		// Never pick the leader here, and skip replicas that have been tried.
		if pos == leader || candidate.isExhausted(1) {
			continue
		}
		wait := candidate.store.estimatedWaitTime()
		// Too busy, or not better than what we already have.
		if wait > selector.busyThreshold || wait >= shortestWait {
			continue
		}
		chosen, shortestWait = AccessIndex(pos), wait
		if wait == 0 {
			// Cannot do better than an idle replica.
			break
		}
	}

	selector.targetIdx = chosen
	rpcCtx, err := selector.buildRPCContext(bo)
	if err != nil || rpcCtx == nil {
		return nil, err
	}

	useFollower := chosen != state.leaderIdx
	rpcCtx.contextPatcher.replicaRead = &useFollower
	if !useFollower {
		// No threshold if all peers are too busy.
		selector.busyThreshold = 0
		rpcCtx.contextPatcher.busyThreshold = &selector.busyThreshold
	}
	return rpcCtx, nil
}

// onSendFailure handles a failure to send the request to the current target
// replica. The target's liveness is checked first; unless it is reported as
// reachable, the replica's store is invalidated with cause as the reason.
func (state *tryIdleReplica) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
	if selector.checkLiveness(bo, selector.targetReplica()) == reachable {
		// The store still answers; leave it in place.
		return
	}
	selector.invalidateReplicaStore(selector.targetReplica(), cause)
}

// invalidStore embeds stateBase and adds no fields of its own.
// NOTE(review): presumably the selector state used when the target store is
// no longer valid — confirm against its methods, which are not shown here.
type invalidStore struct {
stateBase
}
Expand Down Expand Up @@ -673,6 +741,7 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
state,
-1,
-1,
time.Duration(req.BusyThresholdMs) * time.Millisecond,
}, nil
}

Expand Down Expand Up @@ -873,6 +942,42 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
s.region.invalidate(StoreNotFound)
}

// onServerIsBusy reacts to a ServerIsBusy error returned for a request that was
// sent with ctx.
//
// When the error carries an estimated wait time and the store is known, the
// store's load statistics are refreshed. If load-based replica read is enabled
// (busyThreshold is non-zero), the request is then retried immediately on
// another replica:
//   - from accessKnownLeader, the leader's attempt counter is cleared and the
//     selector switches to tryIdleReplica;
//   - from tryIdleReplica, as long as the busy replica was not the leader.
//
// When the error carries no estimated wait time, the store is marked as slow.
//
// In every case that does not retry immediately, the call backs off with
// BoTiKVServerBusy. It returns shouldRetry=false together with the error only
// when that backoff fails; otherwise shouldRetry is true.
func (s *replicaSelector) onServerIsBusy(bo *retry.Backoffer, ctx *RPCContext, serverIsBusy *errorpb.ServerIsBusy) (shouldRetry bool, err error) {
	waitMs := serverIsBusy.EstimatedWaitMs
	storeKnown := ctx != nil && ctx.Store != nil

	switch {
	case !storeKnown:
		// Without a store there is nothing to record; just back off below.
	case waitMs == 0:
		// Mark the server as busy (the next incoming READs could be redirected
		// to the expected followers).
		ctx.Store.markAlreadySlow()
	default:
		// Update the estimated wait time of the store.
		ctx.Store.loadStats.Store(&storeLoadStats{
			estimatedWait:     time.Duration(waitMs) * time.Millisecond,
			waitTimeUpdatedAt: time.Now(),
		})
		if s.busyThreshold == 0 {
			break
		}
		if leaderState, ok := s.state.(*accessKnownLeader); ok {
			// Clear attempt history of the leader, so the leader can be accessed again.
			s.replicas[leaderState.leaderIdx].attempts = 0
			s.state = &tryIdleReplica{leaderIdx: leaderState.leaderIdx}
			return true, nil
		}
		if idleState, ok := s.state.(*tryIdleReplica); ok && s.targetIdx != idleState.leaderIdx {
			return true, nil
		}
		// Fall through to the backoff if the leader is still busy after being
		// accessed again.
	}

	if err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx)); err != nil {
		return false, err
	}
	return true, nil
}

func (s *replicaSelector) invalidateRegion() {
if s.region != nil {
s.region.invalidate(Other)
Expand Down Expand Up @@ -1157,6 +1262,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo
if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
return nil, false, err
}
rpcCtx.contextPatcher.applyTo(&req.Context)
// judge the store limit switch.
if limit := kv.StoreLimit.Load(); limit > 0 {
if err := s.getStoreToken(rpcCtx.Store, limit); err != nil {
Expand Down Expand Up @@ -1547,13 +1653,10 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
return retry, err
}

if regionErr.GetServerIsBusy() != nil {
// Mark the server is busy (the next incoming READs could be redirect
// to expected followers. )
if ctx != nil && ctx.Store != nil {
ctx.Store.markAlreadySlow()
if serverIsBusy := regionErr.GetServerIsBusy(); serverIsBusy != nil {
if s.replicaSelector != nil {
return s.replicaSelector.onServerIsBusy(bo, ctx, serverIsBusy)
}

logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
zap.Stringer("ctx", ctx))
Expand Down
73 changes: 73 additions & 0 deletions internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,3 +928,76 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.True(totalAttempts <= 2)
}
}

// TestLoadBasedReplicaRead walks a replicaSelector through the load-based
// replica read flow for a request with a 50ms busy threshold: the leader is
// tried first, each ServerIsBusy response moves the selector to another
// replica, and once every peer has reported being busy the leader is retried
// without a threshold. Finally it checks that a fresh selector uses the
// recorded load statistics to skip the busy leader.
func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() {
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
s.NotNil(regionLoc)
region := s.cache.GetCachedRegionWithRLock(regionLoc.Region)
// A request carrying a 50ms busy threshold in its context.
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kvrpcpb.Context{
BusyThresholdMs: 50,
})

replicaSelector, err := newReplicaSelector(s.cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)
s.Equal(replicaSelector.region, region)
s.IsType(&accessKnownLeader{}, replicaSelector.state)
// The busyThreshold in replicaSelector should be initialized with the request context.
s.Equal(replicaSelector.busyThreshold, 50*time.Millisecond)

// The first attempt goes to the leader.
bo := retry.NewBackoffer(context.Background(), -1)
rpcCtx, err := replicaSelector.next(bo)
s.Nil(err)
s.Equal(rpcCtx.Peer.Id, s.leaderPeer)

// Receive a ServerIsBusy error
replicaSelector.onServerIsBusy(bo, rpcCtx, &errorpb.ServerIsBusy{
EstimatedWaitMs: 500,
})

// The leader reported a 500ms wait, so the next attempt is a replica read on a follower.
rpcCtx, err = replicaSelector.next(bo)
s.Nil(err)
s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer)
s.IsType(&tryIdleReplica{}, replicaSelector.state)
s.True(*rpcCtx.contextPatcher.replicaRead)
lastPeerID := rpcCtx.Peer.Id

// The first follower is busy as well (800ms).
replicaSelector.onServerIsBusy(bo, rpcCtx, &errorpb.ServerIsBusy{
EstimatedWaitMs: 800,
})

rpcCtx, err = replicaSelector.next(bo)
s.Nil(err)
// Should choose a peer different from before
s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer)
s.NotEqual(rpcCtx.Peer.Id, lastPeerID)
s.IsType(&tryIdleReplica{}, replicaSelector.state)
s.True(*rpcCtx.contextPatcher.replicaRead)

// All peers are too busy
replicaSelector.onServerIsBusy(bo, rpcCtx, &errorpb.ServerIsBusy{
EstimatedWaitMs: 150,
})
// Remember the follower with the smallest reported wait (150ms).
lessBusyPeer := rpcCtx.Peer.Id

// Then, send to the leader again with no threshold.
rpcCtx, err = replicaSelector.next(bo)
s.Nil(err)
s.Equal(rpcCtx.Peer.Id, s.leaderPeer)
s.IsType(&tryIdleReplica{}, replicaSelector.state)
s.False(*rpcCtx.contextPatcher.replicaRead)
s.Equal(*rpcCtx.contextPatcher.busyThreshold, time.Duration(0))

// After 120ms the 150ms estimate has decayed to at most 30ms, which is below
// the 50ms threshold, while the 500ms and 800ms estimates are still above it.
time.Sleep(120 * time.Millisecond)

// When there comes a new request, it should skip busy leader and choose a less busy store
replicaSelector, err = newReplicaSelector(s.cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)
rpcCtx, err = replicaSelector.next(bo)
s.Nil(err)
s.Equal(rpcCtx.Peer.Id, lessBusyPeer)
s.IsType(&tryIdleReplica{}, replicaSelector.state)
s.True(*rpcCtx.contextPatcher.replicaRead)
}
2 changes: 1 addition & 1 deletion internal/logutil/hex.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
"reflect"
"strings"

"github.com/golang/protobuf/proto" //nolint
"github.com/golang/protobuf/proto" //nolint:staticcheck
)

// Hex defines a fmt.Stringer for proto.Message.
Expand Down
9 changes: 1 addition & 8 deletions txnkv/txnsnapshot/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,6 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
}
}
sreq := &kvrpcpb.ScanRequest{
Context: &kvrpcpb.Context{
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
ResourceGroupTag: s.snapshot.mu.resourceGroupTag,
RequestSource: s.snapshot.GetRequestSource(),
ResourceGroupName: s.snapshot.mu.resourceGroupName,
},
StartKey: s.nextStartKey,
EndKey: reqEndKey,
Limit: uint32(s.batchSize),
Expand All @@ -255,6 +247,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
RequestSource: s.snapshot.GetRequestSource(),
ResourceGroupName: s.snapshot.mu.resourceGroupName,
BusyThresholdMs: uint32(s.snapshot.mu.busyThreshold.Milliseconds()),
})
if s.snapshot.mu.resourceGroupTag == nil && s.snapshot.mu.resourceGroupTagger != nil {
s.snapshot.mu.resourceGroupTagger(req)
Expand Down
Loading

0 comments on commit a27994e

Please sign in to comment.