Skip to content

Commit

Permalink
store/copr: support EngineRoleWrite for disaggregated tiflash (#41958)
Browse files Browse the repository at this point in the history
close #41978
  • Loading branch information
guo-shaoge authored Mar 8, 2023
1 parent 382b388 commit 5ff5b34
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 26 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4101,8 +4101,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:3BQR4RmBxu391t3z/q9h7BjDTS3cuRn8dfgYgMWk57s=",
version = "v2.0.6-0.20230302054057-3f7860f10959",
sum = "h1:u3ZBOP7xD9c8FtaUyXXvcby3HZ+9LmgD+m8Sod6orP8=",
version = "v2.0.7-0.20230307075841-4037273b3ca7",
)
go_repository(
name = "com_github_tikv_pd",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ require (
github.com/stretchr/testify v1.8.2
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.6-0.20230302054057-3f7860f10959
github.com/tikv/client-go/v2 v2.0.7-0.20230307075841-4037273b3ca7
github.com/tikv/pd/client v0.0.0-20230301094509-c82b237672a0
github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e
github.com/twmb/murmur3 v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -938,8 +938,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.6-0.20230302054057-3f7860f10959 h1:3BQR4RmBxu391t3z/q9h7BjDTS3cuRn8dfgYgMWk57s=
github.com/tikv/client-go/v2 v2.0.6-0.20230302054057-3f7860f10959/go.mod h1:HdCAbFaUCsjI4n5vlCJ0rGpMfIHoD1o6UlA1rwD9u1o=
github.com/tikv/client-go/v2 v2.0.7-0.20230307075841-4037273b3ca7 h1:u3ZBOP7xD9c8FtaUyXXvcby3HZ+9LmgD+m8Sod6orP8=
github.com/tikv/client-go/v2 v2.0.7-0.20230307075841-4037273b3ca7/go.mod h1:HdCAbFaUCsjI4n5vlCJ0rGpMfIHoD1o6UlA1rwD9u1o=
github.com/tikv/pd/client v0.0.0-20230301094509-c82b237672a0 h1:1fomIvN2iiKT5uZbe2E6uNHZnRzmS6O47D/PJ9BAuPw=
github.com/tikv/pd/client v0.0.0-20230301094509-c82b237672a0/go.mod h1:4wjAY2NoMn4wx5+hZrEhrSGBs3jvKb+lxfUt+thHFQ4=
github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e h1:MV6KaVu/hzByHP0UvJ4HcMGE/8a6A4Rggc/0wx2AvJo=
Expand Down
31 changes: 21 additions & 10 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
storeTaskMap[taskStoreID] = batchTask
}
} else {
stores := cache.RegionCache.GetTiFlashStores()
stores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
aliveStores := filterAliveStores(ctx, stores, ttl, kvStore)
for _, s := range aliveStores {
storeTaskMap[s.StoreID()] = &batchCopTask{
Expand Down Expand Up @@ -531,6 +531,14 @@ func buildBatchCopTasksForPartitionedTable(
return batchTasks, nil
}

func filterAliveStoresStr(ctx context.Context, storesStr []string, ttl time.Duration, kvStore *kvStore) (aliveStores []string) {
aliveIdx := filterAliveStoresHelper(ctx, storesStr, ttl, kvStore)
for _, idx := range aliveIdx {
aliveStores = append(aliveStores, storesStr[idx])
}
return aliveStores
}

func filterAliveStores(ctx context.Context, stores []*tikv.Store, ttl time.Duration, kvStore *kvStore) (aliveStores []*tikv.Store) {
storesStr := make([]string, 0, len(stores))
for _, s := range stores {
Expand Down Expand Up @@ -577,6 +585,7 @@ func filterAliveStoresHelper(ctx context.Context, stores []string, ttl time.Dura

func getTiFlashComputeRPCContextByConsistentHash(ids []tikv.RegionVerID, storesStr []string) (res []*tikv.RPCContext, err error) {
hasher := consistent.New()
hasher.NumberOfReplicas = 200
for _, addr := range storesStr {
hasher.Add(addr)
}
Expand Down Expand Up @@ -648,6 +657,8 @@ func buildBatchCopTasksConsistentHash(
if err != nil {
return nil, err
}
storesStr = filterAliveStoresStr(ctx, storesStr, ttl, kvStore)
logutil.BgLogger().Info("topo filter alive", zap.Any("topo", storesStr))
if len(storesStr) == 0 {
retErr := errors.New("Cannot find proper topo from AutoScaler")
logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because FetchAndGetTopo return empty topo", zap.Int("retryNum", retryNum))
Expand Down Expand Up @@ -775,7 +786,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
storeTaskMap := make(map[string]*batchCopTask)
needRetry := false
for _, task := range tasks {
rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP)
rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP, tikv.LabelFilterNoTiFlashWriteNode)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -790,7 +801,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
// Then `splitRegion` will reloads these regions.
continue
}
allStores := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store)
allStores := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode)
if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok {
batchCop.regionInfos = append(batchCop.regionInfos, RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores, PartitionIndex: task.partitionIndex})
} else {
Expand Down Expand Up @@ -1256,7 +1267,11 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
}
getStoreElapsed = time.Since(getStoreStart)

rpcCtxs, err := cache.GetTiFlashComputeRPCContextByConsistentHash(bo.TiKVBackoffer(), regionIDs, stores)
storesStr := make([]string, 0, len(stores))
for _, s := range stores {
storesStr = append(storesStr, s.GetAddr())
}
rpcCtxs, err := getTiFlashComputeRPCContextByConsistentHash(regionIDs, storesStr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1295,17 +1310,13 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
res = append(res, batchTask)
}
}
logutil.BgLogger().Info("buildBatchCopTasksConsistentHashForPD done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores)))
logutil.BgLogger().Info("buildBatchCopTasksConsistentHashForPD done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(storesStr)))
if log.GetLevel() <= zap.DebugLevel {
debugStores := make([]string, 0, len(stores))
for _, s := range stores {
debugStores = append(debugStores, s.GetAddr())
}
debugTaskMap := make(map[string]string, len(taskMap))
for s, b := range taskMap {
debugTaskMap[s] = fmt.Sprintf("addr: %s; regionInfos: %v", b.storeAddr, b.regionInfos)
}
logutil.BgLogger().Debug("detailed info buildBatchCopTasksConsistentHashForPD", zap.Any("taskMap", debugTaskMap), zap.Any("allStores", debugStores))
logutil.BgLogger().Debug("detailed info buildBatchCopTasksConsistentHashForPD", zap.Any("taskMap", debugTaskMap), zap.Any("allStores", storesStr))
}
break
}
Expand Down
7 changes: 6 additions & 1 deletion store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,12 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
worker.logTimeCopTask(costTime, task, bo, copResp)
}
storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10)
metrics.TiKVCoprocessorHistogram.WithLabelValues(storeID, strconv.FormatBool(staleRead)).Observe(costTime.Seconds())
isInternal := util.IsRequestSourceInternal(&task.requestSource)
scope := metrics.LblGeneral
if isInternal {
scope = metrics.LblInternal
}
metrics.TiKVCoprocessorHistogram.WithLabelValues(storeID, strconv.FormatBool(staleRead), scope).Observe(costTime.Seconds())
if copResp != nil {
tidbmetrics.DistSQLCoprRespBodySize.WithLabelValues(storeAddr).Observe(float64(len(copResp.Data)))
}
Expand Down
11 changes: 1 addition & 10 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,6 @@ func (c *batchCopTask) GetAddress() string {
return c.storeAddr
}

func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta {
resultTasks := make([]kv.MPPTaskMeta, 0)
for _, s := range c.store.GetRegionCache().GetTiFlashStores() {
task := &batchCopTask{storeAddr: s.GetAddr(), cmdType: tikvrpc.CmdMPPTask}
resultTasks = append(resultTasks, task)
}
return resultTasks
}

// ConstructMPPTasks receives ScheduleRequest, which are actually collects of kv ranges. We allocates MPPTaskMeta for them and returns.
func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest, ttl time.Duration) ([]kv.MPPTaskMeta, error) {
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS)
Expand All @@ -79,7 +70,7 @@ func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasks
tasks, err = buildBatchCopTasksForPartitionedTable(ctx, bo, c.store, rangesForEachPartition, kv.TiFlash, true, ttl, true, 20, partitionIDs)
} else {
if req.KeyRanges == nil {
return c.selectAllTiFlashStore(), nil
return nil, errors.New("KeyRanges in MPPBuildTasksRequest is nil")
}
ranges := NewKeyRanges(req.KeyRanges)
tasks, err = buildBatchCopTasksForNonPartitionedTable(ctx, bo, c.store, ranges, kv.TiFlash, true, ttl, true, 20)
Expand Down

0 comments on commit 5ff5b34

Please sign in to comment.