Commit

This is an automated cherry-pick of pingcap#52314
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
xzhangxian1008 authored and ti-chi-bot committed May 27, 2024
1 parent b511a45 commit db38cd5
Showing 3 changed files with 277 additions and 1 deletion.
3 changes: 2 additions & 1 deletion store/copr/BUILD.bazel
@@ -78,7 +78,7 @@ go_test(
embed = [":copr"],
flaky = True,
race = "on",
shard_count = 29,
shard_count = 30,
deps = [
"//kv",
"//store/driver/backoff",
@@ -88,6 +88,7 @@
"//util/trxevents",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_stathat_consistent//:consistent",
"@com_github_stretchr_testify//require",
223 changes: 223 additions & 0 deletions store/copr/batch_coprocessor.go
@@ -300,7 +300,11 @@ func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, ca
//
// The second balance strategy: Not only consider the region count between TiFlash stores, but also try to make the regions' range continuous (stored in TiFlash closely).
// If balanceWithContinuity is true, the second balance strategy is enabled.
<<<<<<< HEAD:store/copr/batch_coprocessor.go
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
=======
func balanceBatchCopTask(aliveStores []*tikv.Store, originalTasks []*batchCopTask, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
>>>>>>> 9657f063168 (store: remove stores that have no region before balance (#52314)):pkg/store/copr/batch_coprocessor.go
if len(originalTasks) == 0 {
log.Info("Batch cop task balancer got an empty task set.")
return originalTasks
@@ -791,6 +795,136 @@ func failpointCheckWhichPolicy(act tiflashcompute.DispatchPolicy) {
})
}

<<<<<<< HEAD:store/copr/batch_coprocessor.go
=======
func filterAllStoresAccordingToTiFlashReplicaRead(allStores []uint64, aliveStores *aliveStoresBundle, policy tiflash.ReplicaRead) (storesMatchedPolicy []uint64, needsCrossZoneAccess bool) {
if policy.IsAllReplicas() {
for _, id := range allStores {
if _, ok := aliveStores.storeIDsInAllZones[id]; ok {
storesMatchedPolicy = append(storesMatchedPolicy, id)
}
}
return
}
// Check whether there are available stores in the TiDB zone. If so, we only need to access TiFlash stores in the TiDB zone.
for _, id := range allStores {
if _, ok := aliveStores.storeIDsInTiDBZone[id]; ok {
storesMatchedPolicy = append(storesMatchedPolicy, id)
}
}
// If no available stores in TiDB zone, we need to access TiFlash stores in other zones.
if len(storesMatchedPolicy) == 0 {
// needsCrossZoneAccess indicates whether we need to access (directly read or remote read) TiFlash stores in other zones.
needsCrossZoneAccess = true

if policy == tiflash.ClosestAdaptive {
// If the policy is `ClosestAdaptive`, we can dispatch tasks to the TiFlash stores in other zones.
for _, id := range allStores {
if _, ok := aliveStores.storeIDsInAllZones[id]; ok {
storesMatchedPolicy = append(storesMatchedPolicy, id)
}
}
} else if policy == tiflash.ClosestReplicas {
// If the policy is `ClosestReplicas`, we dispatch tasks to the TiFlash stores in TiDB zone and remote read from other zones.
for id := range aliveStores.storeIDsInTiDBZone {
storesMatchedPolicy = append(storesMatchedPolicy, id)
}
}
}
return
}

func getAllUsedTiFlashStores(allTiFlashStores []*tikv.Store, allUsedTiFlashStoresMap map[uint64]struct{}) []*tikv.Store {
allUsedTiFlashStores := make([]*tikv.Store, 0, len(allUsedTiFlashStoresMap))
for _, store := range allTiFlashStores {
_, ok := allUsedTiFlashStoresMap[store.StoreID()]
if ok {
allUsedTiFlashStores = append(allUsedTiFlashStores, store)
}
}
return allUsedTiFlashStores
}

// getAliveStoresAndStoreIDs gets alive TiFlash stores and their IDs.
// If tiflashReplicaReadPolicy is not all_replicas, it will also return the IDs of the alive TiFlash stores in TiDB zone.
func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, allUsedTiFlashStoresMap map[uint64]struct{}, ttl time.Duration, store *kvStore, tiflashReplicaReadPolicy tiflash.ReplicaRead, tidbZone string) (aliveStores *aliveStoresBundle) {
aliveStores = new(aliveStoresBundle)
allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, allUsedTiFlashStoresMap)
aliveStores.storesInAllZones = filterAliveStores(ctx, allUsedTiFlashStores, ttl, store)

if !tiflashReplicaReadPolicy.IsAllReplicas() {
aliveStores.storeIDsInTiDBZone = make(map[uint64]struct{}, len(aliveStores.storesInAllZones))
for _, as := range aliveStores.storesInAllZones {
// If the `zone` label of the TiFlash store is not set, we treat it as a TiFlash store in other zones.
if tiflashZone, isSet := as.GetLabelValue(placement.DCLabelKey); isSet && tiflashZone == tidbZone {
aliveStores.storeIDsInTiDBZone[as.StoreID()] = struct{}{}
aliveStores.storesInTiDBZone = append(aliveStores.storesInTiDBZone, as)
}
}
}
if !tiflashReplicaReadPolicy.IsClosestReplicas() {
aliveStores.storeIDsInAllZones = make(map[uint64]struct{}, len(aliveStores.storesInAllZones))
for _, as := range aliveStores.storesInAllZones {
aliveStores.storeIDsInAllZones[as.StoreID()] = struct{}{}
}
}
return aliveStores
}

// filterAccessibleStoresAndBuildRegionInfo filters the stores that can be accessed according to:
// 1. tiflash_replica_read policy
// 2. whether the store is alive
// After filtering, it will build the RegionInfo.
func filterAccessibleStoresAndBuildRegionInfo(
cache *RegionCache,
allStores []uint64,
bo *Backoffer,
task *copTask,
rpcCtx *tikv.RPCContext,
aliveStores *aliveStoresBundle,
tiflashReplicaReadPolicy tiflash.ReplicaRead,
regionInfoNeedsReloadOnSendFail []RegionInfo,
regionsInOtherZones []uint64,
maxRemoteReadCountAllowed int,
tidbZone string) (regionInfo RegionInfo, _ []RegionInfo, _ []uint64, err error) {
needCrossZoneAccess := false
allStores, needCrossZoneAccess = filterAllStoresAccordingToTiFlashReplicaRead(allStores, aliveStores, tiflashReplicaReadPolicy)

regionInfo = RegionInfo{
Region: task.region,
Meta: rpcCtx.Meta,
Ranges: task.ranges,
AllStores: allStores,
PartitionIndex: task.partitionIndex}

if needCrossZoneAccess {
regionsInOtherZones = append(regionsInOtherZones, task.region.GetID())
regionInfoNeedsReloadOnSendFail = append(regionInfoNeedsReloadOnSendFail, regionInfo)
if tiflashReplicaReadPolicy.IsClosestReplicas() && len(regionsInOtherZones) > maxRemoteReadCountAllowed {
regionIDErrMsg := ""
for i := 0; i < 3 && i < len(regionsInOtherZones); i++ {
regionIDErrMsg += fmt.Sprintf("%d, ", regionsInOtherZones[i])
}
err = errors.Errorf(
"no less than %d region(s) can not be accessed by TiFlash in the zone [%s]: %setc",
len(regionsInOtherZones), tidbZone, regionIDErrMsg)
// We need to reload the region cache here, otherwise the failure would persist for the whole region cache refresh TTL.
cache.OnSendFailForBatchRegions(bo, rpcCtx.Store, regionInfoNeedsReloadOnSendFail, true, err)
return regionInfo, nil, nil, err
}
}
return regionInfo, regionInfoNeedsReloadOnSendFail, regionsInOtherZones, nil
}

type aliveStoresBundle struct {
storesInAllZones []*tikv.Store
storeIDsInAllZones map[uint64]struct{}
storesInTiDBZone []*tikv.Store
storeIDsInTiDBZone map[uint64]struct{}
}

>>>>>>> 9657f063168 (store: remove stores that have no region before balance (#52314)):pkg/store/copr/batch_coprocessor.go
// When `partitionIDs != nil`, it means that buildBatchCopTasksCore is constructing a batch cop tasks for PartitionTableScan.
// At this time, `len(rangesForEachPhysicalTable) == len(partitionIDs)` and `rangesForEachPhysicalTable[i]` is for partition `partitionIDs[i]`.
// Otherwise, `rangesForEachPhysicalTable[0]` indicates the range for the single physical table.
@@ -800,6 +934,18 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
const cmdType = tikvrpc.CmdBatchCop
rangesLen := 0

<<<<<<< HEAD:store/copr/batch_coprocessor.go
=======
tidbZone, isTiDBLabelZoneSet := config.GetGlobalConfig().Labels[placement.DCLabelKey]
var (
aliveStores *aliveStoresBundle
maxRemoteReadCountAllowed int
)
if !isTiDBLabelZoneSet {
tiflashReplicaReadPolicy = tiflash.AllReplicas
}

>>>>>>> 9657f063168 (store: remove stores that have no region before balance (#52314)):pkg/store/copr/batch_coprocessor.go
for {
var tasks []*copTask
rangesLen = 0
@@ -820,15 +966,22 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
}
}

<<<<<<< HEAD:store/copr/batch_coprocessor.go
var batchTasks []*batchCopTask

storeTaskMap := make(map[string]*batchCopTask)
=======
rpcCtxs := make([]*tikv.RPCContext, 0, len(tasks))
usedTiFlashStores := make([][]uint64, 0, len(tasks))
usedTiFlashStoresMap := make(map[uint64]struct{}, 0)
>>>>>>> 9657f063168 (store: remove stores that have no region before balance (#52314)):pkg/store/copr/batch_coprocessor.go
needRetry := false
for _, task := range tasks {
rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP, tikv.LabelFilterNoTiFlashWriteNode)
if err != nil {
return nil, errors.Trace(err)
}

// When rpcCtx is nil, it's not only attributed to a missing region, but also
// some TiFlash stores crash and can't be recovered.
// That is not an error that can be easily recovered, so we regard this error
@@ -840,6 +993,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
// Then `splitRegion` will reload these regions.
continue
}
<<<<<<< HEAD:store/copr/batch_coprocessor.go
allStores := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode)
if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok {
batchCop.regionInfos = append(batchCop.regionInfos, RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores, PartitionIndex: task.partitionIndex})
@@ -852,7 +1006,17 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
}
storeTaskMap[rpcCtx.Addr] = batchTask
}
=======

allStores, _ := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode)
for _, storeID := range allStores {
usedTiFlashStoresMap[storeID] = struct{}{}
}
rpcCtxs = append(rpcCtxs, rpcCtx)
usedTiFlashStores = append(usedTiFlashStores, allStores)
>>>>>>> 9657f063168 (store: remove stores that have no region before balance (#52314)):pkg/store/copr/batch_coprocessor.go
}

if needRetry {
// As mentioned above, nil rpcCtx is always attributed to failed stores.
// It's equal to long poll the store but get no response. Here we'd better use
@@ -863,6 +1027,55 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
}
continue
}
<<<<<<< HEAD:store/copr/batch_coprocessor.go
=======

aliveStores = getAliveStoresAndStoreIDs(bo.GetCtx(), cache, usedTiFlashStoresMap, ttl, store, tiflashReplicaReadPolicy, tidbZone)
if tiflashReplicaReadPolicy.IsClosestReplicas() {
if len(aliveStores.storeIDsInTiDBZone) == 0 {
return nil, errors.Errorf("There is no region in tidb zone(%s)", tidbZone)
}
maxRemoteReadCountAllowed = len(aliveStores.storeIDsInTiDBZone) * tiflash.MaxRemoteReadCountPerNodeForClosestReplicas
}

var batchTasks []*batchCopTask
var regionIDsInOtherZones []uint64
var regionInfosNeedReloadOnSendFail []RegionInfo
storeTaskMap := make(map[string]*batchCopTask)
storeIDsUnionSetForAllTasks := make(map[uint64]struct{})
for idx, task := range tasks {
var err error
var regionInfo RegionInfo
regionInfo, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, err = filterAccessibleStoresAndBuildRegionInfo(cache, usedTiFlashStores[idx], bo, task, rpcCtxs[idx], aliveStores, tiflashReplicaReadPolicy, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, maxRemoteReadCountAllowed, tidbZone)
if err != nil {
return nil, err
}
if batchCop, ok := storeTaskMap[rpcCtxs[idx].Addr]; ok {
batchCop.regionInfos = append(batchCop.regionInfos, regionInfo)
} else {
batchTask := &batchCopTask{
storeAddr: rpcCtxs[idx].Addr,
cmdType: cmdType,
ctx: rpcCtxs[idx],
regionInfos: []RegionInfo{regionInfo},
}
storeTaskMap[rpcCtxs[idx].Addr] = batchTask
}
for _, storeID := range regionInfo.AllStores {
storeIDsUnionSetForAllTasks[storeID] = struct{}{}
}
}

if len(regionIDsInOtherZones) != 0 {
warningMsg := fmt.Sprintf("total %d region(s) can not be accessed by TiFlash in the zone [%s]:", len(regionIDsInOtherZones), tidbZone)
regionIDErrMsg := ""
for i := 0; i < 3 && i < len(regionIDsInOtherZones); i++ {
regionIDErrMsg += fmt.Sprintf("%d, ", regionIDsInOtherZones[i])
}
warningMsg += regionIDErrMsg + "etc"
appendWarning(errors.NewNoStackErrorf(warningMsg))
}
>>>>>>> 9657f063168 (store: remove stores that have no region before balance (#52314)):pkg/store/copr/batch_coprocessor.go

for _, task := range storeTaskMap {
batchTasks = append(batchTasks, task)
@@ -875,7 +1088,17 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
logutil.BgLogger().Debug(msg)
}
balanceStart := time.Now()
<<<<<<< HEAD:store/copr/batch_coprocessor.go
batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
=======
storesUnionSetForAllTasks := make([]*tikv.Store, 0, len(storeIDsUnionSetForAllTasks))
for _, store := range aliveStores.storesInAllZones {
if _, ok := storeIDsUnionSetForAllTasks[store.StoreID()]; ok {
storesUnionSetForAllTasks = append(storesUnionSetForAllTasks, store)
}
}
batchTasks = balanceBatchCopTask(storesUnionSetForAllTasks, batchTasks, balanceWithContinuity, balanceContinuousRegionCount)
>>>>>>> 9657f063168 (store: remove stores that have no region before balance (#52314)):pkg/store/copr/batch_coprocessor.go
balanceElapsed := time.Since(balanceStart)
if log.GetLevel() <= zap.DebugLevel {
msg := "After region balance:"
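Before moving on to the tests, a minimal standalone Go sketch of the replica-read decision implemented by filterAllStoresAccordingToTiFlashReplicaRead above may help. It keeps only the zone bookkeeping: prefer alive replicas in the TiDB zone, and fall back to other zones (ClosestAdaptive) or to remote reads served from the TiDB zone (ClosestReplicas) only when no local replica is alive. The type replicaRead and the function filterStoresByReplicaRead are simplified stand-ins invented for illustration; they are not code from this commit or from the tiflash package.

package main

import "fmt"

// Simplified stand-in for the tiflash.ReplicaRead policy used in the diff.
type replicaRead int

const (
    allReplicas replicaRead = iota
    closestAdaptive
    closestReplicas
)

// filterStoresByReplicaRead is an illustrative reduction of
// filterAllStoresAccordingToTiFlashReplicaRead: pick the stores a region may
// be read from under the given policy, and report whether cross-zone access
// is required.
func filterStoresByReplicaRead(
    regionStores []uint64,
    aliveAllZones, aliveTiDBZone map[uint64]struct{},
    policy replicaRead,
) (matched []uint64, needsCrossZoneAccess bool) {
    if policy == allReplicas {
        for _, id := range regionStores {
            if _, ok := aliveAllZones[id]; ok {
                matched = append(matched, id)
            }
        }
        return matched, false
    }
    // Prefer alive replicas of this region in the TiDB zone.
    for _, id := range regionStores {
        if _, ok := aliveTiDBZone[id]; ok {
            matched = append(matched, id)
        }
    }
    if len(matched) > 0 {
        return matched, false
    }
    // No replica of this region is alive in the TiDB zone.
    needsCrossZoneAccess = true
    switch policy {
    case closestAdaptive:
        // Dispatch to alive stores in any zone.
        for _, id := range regionStores {
            if _, ok := aliveAllZones[id]; ok {
                matched = append(matched, id)
            }
        }
    case closestReplicas:
        // Stay in the TiDB zone and remote-read the region from there.
        for id := range aliveTiDBZone {
            matched = append(matched, id)
        }
    }
    return matched, needsCrossZoneAccess
}

func main() {
    aliveAll := map[uint64]struct{}{1: {}, 2: {}, 3: {}}
    aliveTiDB := map[uint64]struct{}{1: {}}

    // The region's replicas live on stores 2 and 3, neither in the TiDB zone.
    stores, crossZone := filterStoresByReplicaRead([]uint64{2, 3}, aliveAll, aliveTiDB, closestAdaptive)
    fmt.Println(stores, crossZone) // [2 3] true
}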
52 changes: 52 additions & 0 deletions store/copr/batch_coprocessor_test.go
@@ -23,12 +23,21 @@ import (
"time"

"github.com/pingcap/errors"
<<<<<<< HEAD:store/copr/batch_coprocessor_test.go
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/util/logutil"
=======
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/store/driver/backoff"
"github.com/pingcap/tidb/pkg/util/logutil"
>>>>>>> 9657f063168 (store: remove stores that have no region before balance (#52314)):pkg/store/copr/batch_coprocessor_test.go
"github.com/stathat/consistent"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"go.uber.org/zap"
)

@@ -125,13 +134,21 @@ func TestBalanceBatchCopTaskWithContinuity(t *testing.T) {
func TestBalanceBatchCopTaskWithEmptyTaskSet(t *testing.T) {
{
var nilTaskSet []*batchCopTask
<<<<<<< HEAD:store/copr/batch_coprocessor_test.go
nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, false, time.Second, false, 0)
=======
nilResult := balanceBatchCopTask(nil, nilTaskSet, false, 0)
>>>>>>> 9657f063168 (store: remove stores that have no region before balance (#52314)):pkg/store/copr/batch_coprocessor_test.go
require.True(t, nilResult == nil)
}

{
emptyTaskSet := make([]*batchCopTask, 0)
<<<<<<< HEAD:store/copr/batch_coprocessor_test.go
emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, false, time.Second, false, 0)
=======
emptyResult := balanceBatchCopTask(nil, emptyTaskSet, false, 0)
>>>>>>> 9657f063168 (store: remove stores that have no region before balance (#52314)):pkg/store/copr/batch_coprocessor_test.go
require.True(t, emptyResult != nil)
require.True(t, len(emptyResult) == 0)
}
@@ -282,3 +299,38 @@ func TestTopoFetcherBackoff(t *testing.T) {
require.GreaterOrEqual(t, dura, 30*time.Second)
require.LessOrEqual(t, dura, 50*time.Second)
}

func TestGetAllUsedTiFlashStores(t *testing.T) {
mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(t, err)
defer func() {
pdClient.Close()
err = mockClient.Close()
require.NoError(t, err)
}()

pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
defer pdCli.Close()

cache := NewRegionCache(tikv.NewRegionCache(pdCli))
defer cache.Close()

label1 := metapb.StoreLabel{Key: tikvrpc.EngineLabelKey, Value: tikvrpc.EngineLabelTiFlash}
label2 := metapb.StoreLabel{Key: tikvrpc.EngineRoleLabelKey, Value: tikvrpc.EngineLabelTiFlashCompute}

cache.SetRegionCacheStore(1, "192.168.1.1", "", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2})
cache.SetRegionCacheStore(2, "192.168.1.2", "192.168.1.3", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2})
cache.SetRegionCacheStore(3, "192.168.1.3", "192.168.1.2", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2})

allUsedTiFlashStoresMap := make(map[uint64]struct{})
allUsedTiFlashStoresMap[2] = struct{}{}
allUsedTiFlashStoresMap[3] = struct{}{}
allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
require.Equal(t, 3, len(allTiFlashStores))
allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, allUsedTiFlashStoresMap)
require.Equal(t, len(allUsedTiFlashStoresMap), len(allUsedTiFlashStores))
for _, store := range allUsedTiFlashStores {
_, ok := allUsedTiFlashStoresMap[store.StoreID()]
require.True(t, ok)
}
}
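
As a complement to TestGetAllUsedTiFlashStores above, here is a minimal sketch of the idea this patch revolves around: collect the IDs of the stores that host at least one region of the current tasks, and hand only those stores to the balancer. The store type and keepOnlyUsedStores below are simplified stand-ins for tikv.Store and getAllUsedTiFlashStores, written for illustration only and under that assumption.

package main

import "fmt"

// Simplified stand-in for tikv.Store; the real type lives in client-go and
// carries labels, addresses, and liveness state.
type store struct{ id uint64 }

func (s *store) StoreID() uint64 { return s.id }

// keepOnlyUsedStores mirrors the role of getAllUsedTiFlashStores: keep only
// the TiFlash stores whose ID appears in the set of stores that host at least
// one region of the batch cop tasks being built.
func keepOnlyUsedStores(allStores []*store, usedIDs map[uint64]struct{}) []*store {
    used := make([]*store, 0, len(usedIDs))
    for _, s := range allStores {
        if _, ok := usedIDs[s.StoreID()]; ok {
            used = append(used, s)
        }
    }
    return used
}

func main() {
    // Three TiFlash stores are known to the region cache, but the tasks only
    // touch regions hosted on stores 2 and 3.
    all := []*store{{id: 1}, {id: 2}, {id: 3}}
    usedIDs := map[uint64]struct{}{2: {}, 3: {}}

    // Only stores 2 and 3 are handed to the balancer, so store 1 no longer
    // receives batch cop tasks for regions it does not hold.
    for _, s := range keepOnlyUsedStores(all, usedIDs) {
        fmt.Println("balance candidate store:", s.StoreID())
    }
}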
