store/copr: batch replica read #42237

Merged
merged 3 commits on Mar 19, 2023
Changes from 2 commits
4 changes: 2 additions & 2 deletions DEPS.bzl
@@ -4101,8 +4101,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:KFKjXBwDCfmPyNjMgNE2YAS+ZtwryVSYSlCSNhzpbig=",
version = "v2.0.7-0.20230316080603-d19741b3ed77",
sum = "h1:m5Y7tBW5Rq8L1ANxibitBa/DInDy3hA2Qvk1Ys9u1NU=",
version = "v2.0.7-0.20230317032622-884a634378d4",
)
go_repository(
name = "com_github_tikv_pd",
2 changes: 1 addition & 1 deletion go.mod
@@ -92,7 +92,7 @@ require (
github.com/stretchr/testify v1.8.2
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.7-0.20230316080603-d19741b3ed77
github.com/tikv/client-go/v2 v2.0.7-0.20230317032622-884a634378d4
github.com/tikv/pd/client v0.0.0-20230316082839-7a0ce101c243
github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e
github.com/twmb/murmur3 v1.1.6
4 changes: 2 additions & 2 deletions go.sum
@@ -941,8 +941,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.7-0.20230316080603-d19741b3ed77 h1:KFKjXBwDCfmPyNjMgNE2YAS+ZtwryVSYSlCSNhzpbig=
github.com/tikv/client-go/v2 v2.0.7-0.20230316080603-d19741b3ed77/go.mod h1:DPL03G+QwLmypNjDIl+B02UltorBMx3WzSh4yJbp+cw=
github.com/tikv/client-go/v2 v2.0.7-0.20230317032622-884a634378d4 h1:m5Y7tBW5Rq8L1ANxibitBa/DInDy3hA2Qvk1Ys9u1NU=
github.com/tikv/client-go/v2 v2.0.7-0.20230317032622-884a634378d4/go.mod h1:DPL03G+QwLmypNjDIl+B02UltorBMx3WzSh4yJbp+cw=
github.com/tikv/pd/client v0.0.0-20230316082839-7a0ce101c243 h1:CYU+awkq5ykKyWV2e2Z+qtRveWMttV4N3r0lyk/z4/M=
github.com/tikv/pd/client v0.0.0-20230316082839-7a0ce101c243/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU=
github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e h1:MV6KaVu/hzByHP0UvJ4HcMGE/8a6A4Rggc/0wx2AvJo=
129 changes: 87 additions & 42 deletions store/copr/coprocessor.go
@@ -259,13 +259,19 @@ type copTask struct {
requestSource util.RequestSource
RowCountHint int // used for extra concurrency of small tasks, -1 for unknown row count
batchTaskList map[uint64]*batchedCopTask

// when this task is batched and the leader's wait duration exceeds the load-based threshold,
// we set this field to the target replica store ID and redirect the request to the replica.
redirect2Replica *uint64
busyThreshold time.Duration
}

type batchedCopTask struct {
task *copTask
region coprocessor.RegionInfo
storeID uint64
peer *metapb.Peer
task *copTask
region coprocessor.RegionInfo
storeID uint64
peer *metapb.Peer
loadBasedReplicaRetry bool
}
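The new copTask fields (redirect2Replica, busyThreshold) together with the loadBasedReplicaRetry flag decide where a batched task is sent. A minimal, self-contained sketch of that dispatch decision; the `task` and `chooseTarget` names are illustrative stand-ins, not part of this diff:

```go
package main

import (
	"fmt"
	"time"
)

// task mirrors only the two new copTask fields that matter for dispatch.
type task struct {
	redirect2Replica *uint64       // target replica store ID when the task is redirected
	busyThreshold    time.Duration // load-based busy threshold carried into the coprocessor request
}

// chooseTarget shows the intended routing: a redirected task is pinned to the
// chosen replica store, otherwise it goes to the leader with the threshold attached.
func chooseTarget(t task) string {
	if t.redirect2Replica != nil {
		return fmt.Sprintf("follower store %d", *t.redirect2Replica)
	}
	return fmt.Sprintf("leader, busy threshold %v", t.busyThreshold)
}

func main() {
	store := uint64(5)
	fmt.Println(chooseTarget(task{redirect2Replica: &store}))
	fmt.Println(chooseTarget(task{busyThreshold: 200 * time.Millisecond}))
}
```

The real dispatch happens in handleTaskOnce below, where a redirected task is additionally pinned to its store with tikv.WithMatchStores.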

func (r *copTask) String() string {
@@ -339,7 +345,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c

var builder taskBuilder
if req.StoreBatchSize > 0 && hints != nil {
builder = newBatchTaskBuilder(bo, req, cache)
builder = newBatchTaskBuilder(bo, req, cache, req.ReplicaRead)
} else {
builder = newLegacyTaskBuilder(len(locs))
}
Expand Down Expand Up @@ -389,6 +395,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c
pagingSize: pagingSize,
requestSource: req.RequestSource,
RowCountHint: hint,
busyThreshold: req.StoreBusyThreshold,
}
// only keep-order need chan inside task.
// tasks by region error will reuse the channel of parent task.
Expand Down Expand Up @@ -466,25 +473,32 @@ func (b *legacyTaskBuilder) build() []*copTask {
return b.tasks
}

type storeReplicaKey struct {
storeID uint64
replicaRead bool
}

type batchStoreTaskBuilder struct {
bo *Backoffer
req *kv.Request
cache *RegionCache
taskID uint64
limit int
store2Idx map[uint64]int
tasks []*copTask
bo *Backoffer
req *kv.Request
cache *RegionCache
taskID uint64
limit int
store2Idx map[storeReplicaKey]int
tasks []*copTask
replicaRead kv.ReplicaReadType
}

func newBatchTaskBuilder(bo *Backoffer, req *kv.Request, cache *RegionCache) *batchStoreTaskBuilder {
func newBatchTaskBuilder(bo *Backoffer, req *kv.Request, cache *RegionCache, replicaRead kv.ReplicaReadType) *batchStoreTaskBuilder {
return &batchStoreTaskBuilder{
bo: bo,
req: req,
cache: cache,
taskID: 0,
limit: req.StoreBatchSize,
store2Idx: make(map[uint64]int, 16),
tasks: make([]*copTask, 0, 16),
bo: bo,
req: req,
cache: cache,
taskID: 0,
limit: req.StoreBatchSize,
store2Idx: make(map[storeReplicaKey]int, 16),
tasks: make([]*copTask, 0, 16),
replicaRead: replicaRead,
}
}
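store2Idx is now keyed by (store ID, replica-read flag) rather than the store ID alone, so leader-bound and replica-bound small tasks end up in separate batches. A standalone sketch of that grouping with simplified types; groupTasks is illustrative, not the builder's real API:

```go
package main

import "fmt"

// storeReplicaKey mirrors the key type above: batches are kept per store and
// per replica-read mode.
type storeReplicaKey struct {
	storeID     uint64
	replicaRead bool
}

// groupTasks buckets task indexes by (store, replicaRead) and starts a new batch
// once the current one reaches limit, roughly what batchStoreTaskBuilder.handle
// does with store2Idx and batchTaskList.
func groupTasks(stores []uint64, replica []bool, limit int) map[storeReplicaKey][][]int {
	batches := make(map[storeReplicaKey][][]int)
	for i := range stores {
		key := storeReplicaKey{storeID: stores[i], replicaRead: replica[i]}
		cur := batches[key]
		if len(cur) == 0 || len(cur[len(cur)-1]) >= limit {
			cur = append(cur, nil) // open a new batch for this key
		}
		cur[len(cur)-1] = append(cur[len(cur)-1], i)
		batches[key] = cur
	}
	return batches
}

func main() {
	stores := []uint64{1, 1, 1, 2, 1}
	replica := []bool{false, false, true, false, false}
	fmt.Println(groupTasks(stores, replica, 2))
}
```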

@@ -502,16 +516,25 @@ func (b *batchStoreTaskBuilder) handle(task *copTask) (err error) {
if b.limit <= 0 || !isSmallTask(task) {
return nil
}
batchedTask, err := b.cache.BuildBatchTask(b.bo, task, b.req.ReplicaRead)
batchedTask, err := b.cache.BuildBatchTask(b.bo, b.req, task, b.replicaRead)
if err != nil {
return err
}
if batchedTask == nil {
return nil
}
if idx, ok := b.store2Idx[batchedTask.storeID]; !ok || len(b.tasks[idx].batchTaskList) >= b.limit {
key := storeReplicaKey{
storeID: batchedTask.storeID,
replicaRead: batchedTask.loadBasedReplicaRetry,
}
if idx, ok := b.store2Idx[key]; !ok || len(b.tasks[idx].batchTaskList) >= b.limit {
if batchedTask.loadBasedReplicaRetry {
// If the task was routed to a replica by load-based selection, pin it to that store.
// A non-nil task.redirect2Replica also means the busy threshold shouldn't take effect again.
batchedTask.task.redirect2Replica = &batchedTask.storeID
}
b.tasks = append(b.tasks, batchedTask.task)
b.store2Idx[batchedTask.storeID] = len(b.tasks) - 1
b.store2Idx[key] = len(b.tasks) - 1
} else {
if b.tasks[idx].batchTaskList == nil {
b.tasks[idx].batchTaskList = make(map[uint64]*batchedCopTask, b.limit)
Expand Down Expand Up @@ -1123,14 +1146,8 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch

cacheKey, cacheValue := worker.buildCacheKey(task, &copReq)

// TODO: Load-based replica read is currently not compatible with store batched tasks now.
// The batched tasks should be dispatched to their own followers, but it's not implemented yet.
// So, only enable load-based replica read when there is no batched tasks.
var busyThresholdMs uint32
if len(copReq.Tasks) == 0 {
busyThresholdMs = uint32(worker.req.StoreBusyThreshold.Milliseconds())
}
req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, options.GetTiKVReplicaReadType(worker.req.ReplicaRead), &worker.replicaReadSeed, kvrpcpb.Context{
replicaRead := worker.req.ReplicaRead
req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, options.GetTiKVReplicaReadType(replicaRead), &worker.replicaReadSeed, kvrpcpb.Context{
IsolationLevel: isolationLevelToPB(worker.req.IsolationLevel),
Priority: priorityToPB(worker.req.Priority),
NotFillCache: worker.req.NotFillCache,
Expand All @@ -1139,7 +1156,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
TaskId: worker.req.TaskID,
RequestSource: task.requestSource.GetRequestSource(),
ResourceGroupName: worker.req.ResourceGroupName,
BusyThresholdMs: busyThresholdMs,
BusyThresholdMs: uint32(task.busyThreshold.Milliseconds()),
})
if worker.req.ResourceGroupTagger != nil {
worker.req.ResourceGroupTagger(req)
Expand All @@ -1158,6 +1175,16 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
if len(worker.req.MatchStoreLabels) > 0 {
ops = append(ops, tikv.WithMatchLabels(worker.req.MatchStoreLabels))
}
if task.redirect2Replica != nil {
req.ReplicaRead = true
Reviewer comment: Should we move the setting busyThreshold=0 here so none of the changes would be missing?

req.ReplicaReadType = options.GetTiKVReplicaReadType(kv.ReplicaReadFollower)
ops = append(ops, tikv.WithMatchStores([]uint64{*task.redirect2Replica}))
if replicaRead == kv.ReplicaReadLeader {
replicaRead = kv.ReplicaReadFollower
req.ReplicaRead = true
req.ReplicaReadType = options.GetTiKVReplicaReadType(replicaRead)
}
}
resp, rpcCtx, storeAddr, err := worker.kvclient.SendReqCtx(bo.TiKVBackoffer(), req, task.region, tikv.ReadTimeoutMedium, getEndPointType(task.storeType), task.storeAddr, ops...)
err = derr.ToTiDBErr(err)
if err != nil {
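When task.redirect2Replica is set, the request built above is switched to follower read and pinned to that store via tikv.WithMatchStores, and BusyThresholdMs is taken straight from task.busyThreshold. A self-contained sketch of those request knobs; the request type and buildRequest are simplified stand-ins, not the real tikvrpc structs:

```go
package main

import (
	"fmt"
	"time"
)

// request carries only the fields touched by the redirect logic above.
type request struct {
	replicaRead     bool
	busyThresholdMs uint32
	matchStores     []uint64 // analogous to tikv.WithMatchStores
}

// buildRequest mimics the branch in handleTaskOnce: a redirected task reads from
// the chosen follower, a non-redirected one keeps the busy threshold set.
func buildRequest(redirect *uint64, busyThreshold time.Duration) request {
	req := request{busyThresholdMs: uint32(busyThreshold.Milliseconds())}
	if redirect != nil {
		req.replicaRead = true
		req.matchStores = []uint64{*redirect}
	}
	return req
}

func main() {
	store := uint64(7)
	fmt.Printf("%+v\n", buildRequest(&store, 0))
	fmt.Printf("%+v\n", buildRequest(nil, 200*time.Millisecond))
}
```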
Expand Down Expand Up @@ -1304,13 +1331,13 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
if err != nil {
return remains, err
}
return worker.handleBatchRemainsOnErr(bo, rpcCtx, remains, resp.pbResp.GetBatchResponses(), task, ch)
return worker.handleBatchRemainsOnErr(bo, rpcCtx, remains, resp.pbResp, task, ch)
}
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
if err := worker.handleLockErr(bo, lockErr, task); err != nil {
return nil, err
}
return worker.handleBatchRemainsOnErr(bo, rpcCtx, []*copTask{task}, resp.pbResp.GetBatchResponses(), task, ch)
return worker.handleBatchRemainsOnErr(bo, rpcCtx, []*copTask{task}, resp.pbResp, task, ch)
}
if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
@@ -1346,18 +1373,18 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
return nil, err
}

batchResps := resp.pbResp.BatchResponses
pbResp := resp.pbResp
worker.sendToRespCh(resp, ch, true)
return worker.handleBatchCopResponse(bo, rpcCtx, batchResps, task.batchTaskList, ch)
return worker.handleBatchCopResponse(bo, rpcCtx, pbResp, task.batchTaskList, ch)
}

func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, rpcCtx *tikv.RPCContext, remains []*copTask, batchResp []*coprocessor.StoreBatchTaskResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, rpcCtx *tikv.RPCContext, remains []*copTask, resp *coprocessor.Response, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
if len(task.batchTaskList) == 0 {
return remains, nil
}
batchedTasks := task.batchTaskList
task.batchTaskList = nil
batchedRemains, err := worker.handleBatchCopResponse(bo, rpcCtx, batchResp, batchedTasks, ch)
batchedRemains, err := worker.handleBatchCopResponse(bo, rpcCtx, resp, batchedTasks, ch)
if err != nil {
return nil, err
}
Expand All @@ -1366,18 +1393,21 @@ func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, rpcCtx *

// handle the batched cop response.
// tasks will be changed, so the input tasks should not be used after calling this function.
func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, batchResps []*coprocessor.StoreBatchTaskResponse,
func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *coprocessor.Response,
tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) (remainTasks []*copTask, err error) {
if len(tasks) == 0 {
return nil, nil
}
batchedNum := len(tasks)
busyThresholdFallback := false
defer func() {
if err != nil {
return
}
worker.storeBatchedNum.Add(uint64(batchedNum - len(remainTasks)))
worker.storeBatchedFallbackNum.Add(uint64(len(remainTasks)))
if !busyThresholdFallback {
worker.storeBatchedNum.Add(uint64(batchedNum - len(remainTasks)))
worker.storeBatchedFallbackNum.Add(uint64(len(remainTasks)))
Reviewer comment: Is it supposed to be put in an else clause?

Author reply: The counter will be increased when handling the responses of the follower tasks.
}
}()
appendRemainTasks := func(tasks ...*copTask) {
if remainTasks == nil {
Expand All @@ -1393,6 +1423,7 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *t
Addr: rpcCtx.Addr,
}
}
batchResps := resp.GetBatchResponses()
for _, batchResp := range batchResps {
taskID := batchResp.GetTaskId()
batchedTask, ok := tasks[taskID]
Expand Down Expand Up @@ -1463,7 +1494,8 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *t
}
for _, t := range tasks {
task := t.task
// when the error is generated by client, response is empty, skip warning for this case.
// when the error is generated by the client or by a load-based server-busy error,
// the response is empty by design, so skip the warning in this case.
if len(batchResps) != 0 {
firstRangeStartKey := task.ranges.At(0).StartKey
lastRangeEndKey := task.ranges.At(task.ranges.Len() - 1).EndKey
Expand All @@ -1479,6 +1511,19 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *t
}
appendRemainTasks(t.task)
}
if regionErr := resp.GetRegionError(); regionErr != nil && regionErr.ServerIsBusy != nil &&
regionErr.ServerIsBusy.EstimatedWaitMs > 0 && len(batchResps) == 0 && len(remainTasks) != 0 {
busyThresholdFallback = true
handler := newBatchTaskBuilder(bo, worker.req, worker.store.GetRegionCache(), kv.ReplicaReadFollower)
for _, task := range remainTasks {
// do not set busy threshold again.
task.busyThreshold = 0
if err = handler.handle(task); err != nil {
return nil, err
}
}
remainTasks = handler.build()
}
return remainTasks, nil
}
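The new block above re-batches the remaining tasks toward followers only when the store answered with a load-based ServerIsBusy error (EstimatedWaitMs > 0) and returned no batch responses at all; in that case busyThresholdFallback also skips the storeBatchedNum/storeBatchedFallbackNum accounting, since the counters are updated again when the redirected follower responses are handled. A small sketch of that guard with hypothetical names:

```go
package main

import "fmt"

// shouldRedirectToFollowers reproduces the guard on the fallback path: the store
// rejected the batch because of load, nothing in it was executed, and there are
// still tasks left to retry.
func shouldRedirectToFollowers(estimatedWaitMs uint32, batchRespCount, remainCount int) bool {
	return estimatedWaitMs > 0 && batchRespCount == 0 && remainCount > 0
}

func main() {
	fmt.Println(shouldRedirectToFollowers(150, 0, 3)) // true: re-batch the remains toward replicas
	fmt.Println(shouldRedirectToFollowers(0, 0, 3))   // false: not a load-based rejection
	fmt.Println(shouldRedirectToFollowers(150, 2, 3)) // false: part of the batch was served
}
```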

58 changes: 52 additions & 6 deletions store/copr/region_cache.go
@@ -16,7 +16,9 @@ package copr

import (
"bytes"
"math"
"strconv"
"time"

"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/metapb"
@@ -210,11 +212,54 @@ func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *tikv.Store
}

// BuildBatchTask fetches store and peer info for cop task, wrap it as `batchedCopTask`.
func (c *RegionCache) BuildBatchTask(bo *Backoffer, task *copTask, replicaRead kv.ReplicaReadType) (*batchedCopTask, error) {
rpcContext, err := c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), 0)
if err != nil {
return nil, err
func (c *RegionCache) BuildBatchTask(bo *Backoffer, req *kv.Request, task *copTask, replicaRead kv.ReplicaReadType) (*batchedCopTask, error) {
var (
rpcContext *tikv.RPCContext
err error
)
if replicaRead == kv.ReplicaReadFollower {
followerStoreSeed := uint32(0)
leastEstWaitTime := time.Duration(math.MaxInt64)
var (
firstFollowerPeer *uint64
followerContext *tikv.RPCContext
)
for {
followerContext, err = c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), followerStoreSeed)
if err != nil {
return nil, err
}
if firstFollowerPeer == nil {
firstFollowerPeer = &followerContext.Peer.Id
} else if *firstFollowerPeer == followerContext.Peer.Id {
break
}
estWaitTime := followerContext.Store.EstimatedWaitTime()
// skip this follower if its estimated wait time exceeds the given threshold.
if estWaitTime > req.StoreBusyThreshold {
followerStoreSeed++
continue
}
if rpcContext == nil {
rpcContext = followerContext
} else if estWaitTime < leastEstWaitTime {
leastEstWaitTime = estWaitTime
rpcContext = followerContext
}
followerStoreSeed++
}
// all replicas are busy, fallback to leader.
if rpcContext == nil {
replicaRead = kv.ReplicaReadLeader
}
}

if replicaRead == kv.ReplicaReadLeader {
rpcContext, err = c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), 0)
if err != nil {
return nil, err
}
}

// fallback to non-batch path
if rpcContext == nil {
return nil, nil
Expand All @@ -229,7 +274,8 @@ func (c *RegionCache) BuildBatchTask(bo *Backoffer, task *copTask, replicaRead k
},
Ranges: task.ranges.ToPBRanges(),
},
storeID: rpcContext.Store.StoreID(),
peer: rpcContext.Peer,
storeID: rpcContext.Store.StoreID(),
peer: rpcContext.Peer,
loadBasedReplicaRetry: replicaRead != kv.ReplicaReadLeader,
}, nil
}
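BuildBatchTask now walks the region's followers and keeps the one with the smallest estimated wait time that is still under req.StoreBusyThreshold, falling back to the leader when every follower is too busy. A simplified, self-contained version of that selection; pickFollower and the plain slice of wait times are illustrative, the real loop iterates RPC contexts via GetTiKVRPCContext:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// pickFollower returns the index of the follower with the least estimated wait
// time not exceeding threshold, or ok=false when all followers are busy and the
// caller should fall back to the leader.
func pickFollower(estWaits []time.Duration, threshold time.Duration) (idx int, ok bool) {
	least := time.Duration(math.MaxInt64)
	idx = -1
	for i, w := range estWaits {
		if w > threshold {
			continue // this follower's estimated wait is over the threshold, try the next one
		}
		if w < least {
			least, idx = w, i
		}
	}
	return idx, idx >= 0
}

func main() {
	threshold := 200 * time.Millisecond
	waits := []time.Duration{250 * time.Millisecond, 80 * time.Millisecond, 120 * time.Millisecond}
	if i, ok := pickFollower(waits, threshold); ok {
		fmt.Println("read from follower", i) // follower 1 (80ms) has the least wait under the threshold
	} else {
		fmt.Println("all followers busy, fall back to the leader")
	}
}
```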