store/copr: balance region for batch cop task (#24521) #24724

Merged · 5 commits · Jun 17, 2021
Changes from 4 commits
201 changes: 183 additions & 18 deletions store/copr/batch_coprocessor.go
@@ -16,6 +16,8 @@ package copr
import (
"context"
"io"
"math"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -24,6 +26,7 @@ import (
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/logutil"
@@ -38,8 +41,9 @@ import (
type batchCopTask struct {
storeAddr string
cmdType tikvrpc.CmdType
ctx *tikv.RPCContext

copTasks []copTaskAndRPCContext
regionInfos []tikv.RegionInfo
}

type batchCopResponse struct {
@@ -91,9 +95,152 @@ func (rs *batchCopResponse) RespTime() time.Duration {
return rs.respTime
}

type copTaskAndRPCContext struct {
task *copTask
ctx *tikv.RPCContext
// balanceBatchCopTask balances the regions among the available stores. The basic rules are:
// 1. the first region of each original batch cop task stays in its original store, because some
// metadata in batchCopTask (like the RPC context) is tied to that store
// 2. for the remaining regions:
// if there is only 1 available store, put the region into that store
// otherwise, use a greedy algorithm to assign it to the valid store with the smallest weighted region load
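// For example, if a remaining region has replicas on two valid stores s1 and s2, it is recorded as a
// candidate for both; it stays a candidate until one of them is picked as the least-loaded store in the
// greedy loop below, at which point the region is assigned there and dropped from the other store's
// candidate set.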
func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask {
if len(originalTasks) <= 1 {
return originalTasks
}
storeTaskMap := make(map[uint64]*batchCopTask)
storeCandidateRegionMap := make(map[uint64]map[string]tikv.RegionInfo)
totalRegionCandidateNum := 0
totalRemainingRegionNum := 0
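// storeTaskMap holds the batch cop task being built for each store; storeCandidateRegionMap maps each
// store to the regions that could still be assigned to it, keyed by the region's string form.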

for _, task := range originalTasks {
taskStoreID := task.regionInfos[0].AllStores[0]
batchTask := &batchCopTask{
storeAddr: task.storeAddr,
cmdType: task.cmdType,
ctx: task.ctx,
regionInfos: []tikv.RegionInfo{task.regionInfos[0]},
}
storeTaskMap[taskStoreID] = batchTask
}

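// Classify the remaining regions: a region with exactly one valid store is kept in its original
// task's store directly; otherwise it is recorded as a candidate for every valid store holding a replica.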
for _, task := range originalTasks {
taskStoreID := task.regionInfos[0].AllStores[0]
for index, ri := range task.regionInfos {
// for each region, figure out the valid store num
validStoreNum := 0
if index == 0 {
continue
}
if len(ri.AllStores) <= 1 {
validStoreNum = 1
} else {
for _, storeID := range ri.AllStores {
if _, ok := storeTaskMap[storeID]; ok {
validStoreNum++
}
}
}
if validStoreNum == 1 {
// if only one store is valid, just put the region into storeTaskMap
storeTaskMap[taskStoreID].regionInfos = append(storeTaskMap[taskStoreID].regionInfos, ri)
} else {
// if more than one store is valid, record the region
// in the candidate map of each valid store
totalRegionCandidateNum += validStoreNum
totalRemainingRegionNum += 1
taskKey := ri.Region.String()
for _, storeID := range ri.AllStores {
if _, validStore := storeTaskMap[storeID]; !validStore {
continue
}
if _, ok := storeCandidateRegionMap[storeID]; !ok {
candidateMap := make(map[string]tikv.RegionInfo)
storeCandidateRegionMap[storeID] = candidateMap
}
if _, duplicateRegion := storeCandidateRegionMap[storeID][taskKey]; duplicateRegion {
// a duplicated region should not happen; just give up balancing
logutil.BgLogger().Warn("Met duplicated region info when trying to balance batch cop task, give up balancing")
return originalTasks
}
storeCandidateRegionMap[storeID][taskKey] = ri
}
}
}
}
if totalRemainingRegionNum == 0 {
return originalTasks
}

avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
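// findNextStore returns the store with the smallest weighted load, preferring the given candidate
// stores and falling back to all stores that still have candidate regions; a store's load is
// len(candidate regions)/avgStorePerRegion + len(regions already assigned to it), and math.MaxUint64
// means no suitable store was found.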
findNextStore := func(candidateStores []uint64) uint64 {
store := uint64(math.MaxUint64)
weightedRegionNum := math.MaxFloat64
if candidateStores != nil {
for _, storeID := range candidateStores {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
}
}
if store != uint64(math.MaxUint64) {
return store
}
}
for storeID := range storeTaskMap {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
}
}
return store
}

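// Greedily drain the candidate map: repeatedly pick the least-loaded store, move one of its candidate
// regions onto it, drop that region from every other store's candidate set, and recompute the average
// before picking the next store.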
store := findNextStore(nil)
for totalRemainingRegionNum > 0 {
if store == uint64(math.MaxUint64) {
break
}
var key string
var ri tikv.RegionInfo
for key, ri = range storeCandidateRegionMap[store] {
// get the first region
break
}
storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri)
totalRemainingRegionNum--
for _, id := range ri.AllStores {
if _, ok := storeCandidateRegionMap[id]; ok {
delete(storeCandidateRegionMap[id], key)
totalRegionCandidateNum--
if len(storeCandidateRegionMap[id]) == 0 {
delete(storeCandidateRegionMap, id)
}
}
}
if totalRemainingRegionNum > 0 {
avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
// this is not optimal because we only check the stores affected by this region; to find the store
// with the lowest weightedRegionNum, all stores would have to be checked, but checking only the
// affected stores is simpler and gives a good enough result
store = findNextStore(ri.AllStores)
}
}
if totalRemainingRegionNum > 0 {
logutil.BgLogger().Warn("Some regions could not be assigned to any store when trying to balance batch cop tasks, give up balancing")
return originalTasks
}

var ret []*batchCopTask
for _, task := range storeTaskMap {
ret = append(ret, task)
}
return ret
}

func buildBatchCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) {
@@ -134,13 +281,15 @@ func buildBatchCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tik
// Then `splitRegion` will reload these regions.
continue
}
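// Collect every valid TiFlash store holding a replica of this region so that the balancer below can
// move the region between those stores.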
allStores := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store)
if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok {
batchCop.copTasks = append(batchCop.copTasks, copTaskAndRPCContext{task: task, ctx: rpcCtx})
batchCop.regionInfos = append(batchCop.regionInfos, tikv.RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores})
} else {
batchTask := &batchCopTask{
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
copTasks: []copTaskAndRPCContext{{task, rpcCtx}},
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
ctx: rpcCtx,
regionInfos: []tikv.RegionInfo{{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores}},
}
storeTaskMap[rpcCtx.Addr] = batchTask
}
@@ -153,9 +302,25 @@ func buildBatchCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tik
}
continue
}

for _, task := range storeTaskMap {
batchTasks = append(batchTasks, task)
}
if log.GetLevel() <= zap.DebugLevel {
msg := "Before region balance:"
for _, task := range batchTasks {
msg += " store " + task.storeAddr + ": " + strconv.Itoa(len(task.regionInfos)) + " regions,"
}
logutil.BgLogger().Debug(msg)
}
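// Rebalance the regions across the TiFlash stores before dispatching the batch cop tasks.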
batchTasks = balanceBatchCopTask(batchTasks)
if log.GetLevel() <= zap.DebugLevel {
msg := "After region balance:"
for _, task := range batchTasks {
msg += " store " + task.storeAddr + ": " + strconv.Itoa(len(task.regionInfos)) + " regions,"
}
logutil.BgLogger().Debug(msg)
}

if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
logutil.BgLogger().Warn("buildBatchCopTasks takes too much time",
@@ -310,25 +475,25 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *tikv.Backoffer, t
// Merge all ranges and request again.
func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *tikv.Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
var ranges []kv.KeyRange
for _, taskCtx := range batchTask.copTasks {
taskCtx.task.ranges.Do(func(ran *kv.KeyRange) {
for _, ri := range batchTask.regionInfos {
ri.Ranges.Do(func(ran *kv.KeyRange) {
ranges = append(ranges, *ran)
})
}
return buildBatchCopTasks(bo, b.store.GetRegionCache(), tikv.NewKeyRanges(ranges), b.req.StoreType)
}

func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *tikv.Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
sender := tikv.NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
var regionInfos []*coprocessor.RegionInfo
for _, task := range task.copTasks {
for _, ri := range task.regionInfos {
regionInfos = append(regionInfos, &coprocessor.RegionInfo{
RegionId: task.task.region.GetID(),
RegionId: ri.Region.GetID(),
RegionEpoch: &metapb.RegionEpoch{
ConfVer: task.task.region.GetConfVer(),
Version: task.task.region.GetVer(),
ConfVer: ri.Region.GetConfVer(),
Version: ri.Region.GetVer(),
},
Ranges: task.task.ranges.ToPBRanges(),
Ranges: ri.Ranges.ToPBRanges(),
})
}

@@ -350,8 +515,8 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *tikv.Backoffe
})
req.StoreTp = kv.TiFlash

logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.copTasks)))
resp, retry, cancel, err := sender.sendStreamReqToAddr(bo, task.copTasks, req, tikv.ReadTimeoutUltraLong)
logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.regionInfos)))
resp, retry, cancel, err := sender.SendReqToAddr(bo, task.ctx, task.regionInfos, req, tikv.ReadTimeoutUltraLong)
// If there are store errors, we should retry for all regions.
if retry {
return b.retryBatchCopTask(ctx, bo, task)
14 changes: 7 additions & 7 deletions store/copr/mpp.go
@@ -181,14 +181,14 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer,
var regionInfos []*coprocessor.RegionInfo
originalTask, ok := req.Meta.(*batchCopTask)
if ok {
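// The MPP task was built from a batch cop task, so forward each balanced region's id, epoch and key
// ranges in the dispatch request.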
for _, task := range originalTask.copTasks {
for _, ri := range originalTask.regionInfos {
regionInfos = append(regionInfos, &coprocessor.RegionInfo{
RegionId: task.task.region.GetID(),
RegionId: ri.Region.GetID(),
RegionEpoch: &metapb.RegionEpoch{
ConfVer: task.task.region.GetConfVer(),
Version: task.task.region.GetVer(),
ConfVer: ri.Region.GetConfVer(),
Version: ri.Region.GetVer(),
},
Ranges: task.task.ranges.ToPBRanges(),
Ranges: ri.Ranges.ToPBRanges(),
})
}
}
@@ -215,8 +215,8 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer,
// Or else it's the task without region, which always happens in high layer task without table.
// In that case
if originalTask != nil {
sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient())
rpcResp, _, _, err = sender.sendStreamReqToAddr(bo, originalTask.copTasks, wrappedReq, tikv.ReadTimeoutMedium)
sender := tikv.NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient())
rpcResp, _, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium)
// No matter what the rpc error is, we won't retry the mpp dispatch tasks.
// TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling.
// That's a hard job but we can try it in the future.