store: remove stores that have no region before balance #52314

Merged
merged 21 commits on May 24, 2024
3 changes: 2 additions & 1 deletion pkg/store/copr/BUILD.bazel
@@ -81,7 +81,7 @@ go_test(
embed = [":copr"],
flaky = True,
race = "on",
shard_count = 29,
shard_count = 30,
deps = [
"//pkg/kv",
"//pkg/store/driver/backoff",
@@ -91,6 +91,7 @@
"//pkg/util/trxevents",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_stathat_consistent//:consistent",
"@com_github_stretchr_testify//require",
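A note on this BUILD.bazel change: shard_count moves from 29 to 30 because one new test function is added below, and the new kvproto metapb dependency is there because that test constructs store labels. As a rough, standalone sketch (not code from this PR; it only assumes the standard kvproto import path and the usual TiFlash label values), metapb.StoreLabel is a plain key/value pair:

```go
// Standalone sketch: build the kind of store label the new test attaches to
// mock TiFlash stores. The Key/Value literals mirror the tikvrpc constants used there.
package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
)

func main() {
	engineLabel := metapb.StoreLabel{Key: "engine", Value: "tiflash"}
	fmt.Printf("%s=%s\n", engineLabel.Key, engineLabel.Value)
}
```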
117 changes: 85 additions & 32 deletions pkg/store/copr/batch_coprocessor.go
@@ -304,7 +304,7 @@ func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, ca
//
// The second balance strategy: not only consider the region count across TiFlash stores, but also try to make the regions' ranges continuous (i.e. stored close together in TiFlash).
// If balanceWithContinuity is true, the second balance strategy is enabled.
func balanceBatchCopTask(ctx context.Context, aliveStores []*tikv.Store, originalTasks []*batchCopTask, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
func balanceBatchCopTask(aliveStores []*tikv.Store, originalTasks []*batchCopTask, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
if len(originalTasks) == 0 {
log.Info("Batch cop task balancer got an empty task set.")
return originalTasks
@@ -819,12 +819,24 @@ func filterAllStoresAccordingToTiFlashReplicaRead(allStores []uint64, aliveStore
return
}

func getAllUsedTiFlashStores(allTiFlashStores []*tikv.Store, allUsedTiFlashStoresMap map[uint64]struct{}) []*tikv.Store {
allUsedTiFlashStores := make([]*tikv.Store, 0, len(allUsedTiFlashStoresMap))
for _, store := range allTiFlashStores {
_, ok := allUsedTiFlashStoresMap[store.StoreID()]
if ok {
allUsedTiFlashStores = append(allUsedTiFlashStores, store)
}
}
return allUsedTiFlashStores
}

// getAliveStoresAndStoreIDs gets alive TiFlash stores and their IDs.
// If tiflashReplicaReadPolicy is not all_replicas, it will also return the IDs of the alive TiFlash stores in TiDB zone.
func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, ttl time.Duration, store *kvStore, tiflashReplicaReadPolicy tiflash.ReplicaRead, tidbZone string) (aliveStores *aliveStoresBundle) {
func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, allUsedTiFlashStoresMap map[uint64]struct{}, ttl time.Duration, store *kvStore, tiflashReplicaReadPolicy tiflash.ReplicaRead, tidbZone string) (aliveStores *aliveStoresBundle) {
aliveStores = new(aliveStoresBundle)
allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
aliveStores.storesInAllZones = filterAliveStores(ctx, allTiFlashStores, ttl, store)
allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, allUsedTiFlashStoresMap)
aliveStores.storesInAllZones = filterAliveStores(ctx, allUsedTiFlashStores, ttl, store)

if !tiflashReplicaReadPolicy.IsAllReplicas() {
aliveStores.storeIDsInTiDBZone = make(map[uint64]struct{}, len(aliveStores.storesInAllZones))
@@ -849,11 +861,28 @@ func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, ttl time
// 1. tiflash_replica_read policy
// 2. whether the store is alive
// After filtering, it will build the RegionInfo.
func filterAccessibleStoresAndBuildRegionInfo(cache *RegionCache, bo *Backoffer, task *copTask, rpcCtx *tikv.RPCContext, aliveStores *aliveStoresBundle, isTiDBLabelZoneSet bool, tiflashReplicaReadPolicy tiflash.ReplicaRead, regionInfoNeedsReloadOnSendFail []RegionInfo, regionsInOtherZones []uint64, maxRemoteReadCountAllowed int, tidbZone string) (regionInfo RegionInfo, _ []RegionInfo, _ []uint64, err error) {
func filterAccessibleStoresAndBuildRegionInfo(
cache *RegionCache,
allStores []uint64,
bo *Backoffer,
task *copTask,
rpcCtx *tikv.RPCContext,
aliveStores *aliveStoresBundle,
tiflashReplicaReadPolicy tiflash.ReplicaRead,
regionInfoNeedsReloadOnSendFail []RegionInfo,
regionsInOtherZones []uint64,
maxRemoteReadCountAllowed int,
tidbZone string) (regionInfo RegionInfo, _ []RegionInfo, _ []uint64, err error) {
needCrossZoneAccess := false
allStores, _ := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode)
allStores, needCrossZoneAccess = filterAllStoresAccordingToTiFlashReplicaRead(allStores, aliveStores, tiflashReplicaReadPolicy)
regionInfo = RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores, PartitionIndex: task.partitionIndex}

regionInfo = RegionInfo{
Region: task.region,
Meta: rpcCtx.Meta,
Ranges: task.ranges,
AllStores: allStores,
PartitionIndex: task.partitionIndex}

if needCrossZoneAccess {
regionsInOtherZones = append(regionsInOtherZones, task.region.GetID())
regionInfoNeedsReloadOnSendFail = append(regionInfoNeedsReloadOnSendFail, regionInfo)
@@ -862,7 +891,9 @@ func filterAccessibleStoresAndBuildRegionInfo(cache *RegionCache, bo *Backoffer,
for i := 0; i < 3 && i < len(regionsInOtherZones); i++ {
regionIDErrMsg += fmt.Sprintf("%d, ", regionsInOtherZones[i])
}
err = errors.Errorf("no less than %d region(s) can not be accessed by TiFlash in the zone [%s]: %setc", len(regionsInOtherZones), tidbZone, regionIDErrMsg)
err = errors.Errorf(
"no less than %d region(s) can not be accessed by TiFlash in the zone [%s]: %setc",
len(regionsInOtherZones), tidbZone, regionIDErrMsg)
// We need to reload the region cache here so that the failure does not last for the whole region cache refresh TTL.
cache.OnSendFailForBatchRegions(bo, rpcCtx.Store, regionInfoNeedsReloadOnSendFail, true, err)
return regionInfo, nil, nil, err
@@ -895,10 +926,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
if !isTiDBLabelZoneSet {
tiflashReplicaReadPolicy = tiflash.AllReplicas
}
aliveStores = getAliveStoresAndStoreIDs(bo.GetCtx(), cache, ttl, store, tiflashReplicaReadPolicy, tidbZone)
if tiflashReplicaReadPolicy.IsClosestReplicas() {
maxRemoteReadCountAllowed = len(aliveStores.storeIDsInTiDBZone) * tiflash.MaxRemoteReadCountPerNodeForClosestReplicas
}

for {
var tasks []*copTask
rangesLen = 0
@@ -919,17 +947,16 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
}
}

var batchTasks []*batchCopTask
var regionIDsInOtherZones []uint64
var regionInfosNeedReloadOnSendFail []RegionInfo
storeTaskMap := make(map[string]*batchCopTask)
rpcCtxs := make([]*tikv.RPCContext, 0, len(tasks))
usedTiFlashStores := make([][]uint64, 0, len(tasks))
usedTiFlashStoresMap := make(map[uint64]struct{}, 0)
needRetry := false
storeIDsUnionSetForAllTasks := make(map[uint64]struct{})
for _, task := range tasks {
rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP, tikv.LabelFilterNoTiFlashWriteNode)
if err != nil {
return nil, errors.Trace(err)
}

// When rpcCtx is nil, it is not only attributed to a missed region, but may also
// mean that some TiFlash stores have crashed and cannot be recovered.
// That is not an error that can be easily recovered, so we regard this error
@@ -941,36 +968,62 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
// Then `splitRegion` will reload these regions.
continue
}

allStores, _ := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode)
for _, storeID := range allStores {
usedTiFlashStoresMap[storeID] = struct{}{}
}
rpcCtxs = append(rpcCtxs, rpcCtx)
usedTiFlashStores = append(usedTiFlashStores, allStores)
}

if needRetry {
// As mentioned above, nil rpcCtx is always attributed to failed stores.
// It's equivalent to long-polling the store but getting no response. Here we'd better use
// TiFlash error to trigger the TiKV fallback mechanism.
err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
if err != nil {
return nil, errors.Trace(err)
}
continue
}

aliveStores = getAliveStoresAndStoreIDs(bo.GetCtx(), cache, usedTiFlashStoresMap, ttl, store, tiflashReplicaReadPolicy, tidbZone)
if tiflashReplicaReadPolicy.IsClosestReplicas() {
if len(aliveStores.storeIDsInTiDBZone) == 0 {
return nil, errors.Errorf("There is no region in tidb zone(%s)", tidbZone)
}
maxRemoteReadCountAllowed = len(aliveStores.storeIDsInTiDBZone) * tiflash.MaxRemoteReadCountPerNodeForClosestReplicas
}

var batchTasks []*batchCopTask
var regionIDsInOtherZones []uint64
var regionInfosNeedReloadOnSendFail []RegionInfo
storeTaskMap := make(map[string]*batchCopTask)
storeIDsUnionSetForAllTasks := make(map[uint64]struct{})
for idx, task := range tasks {
var err error
var regionInfo RegionInfo
regionInfo, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, err = filterAccessibleStoresAndBuildRegionInfo(cache, bo, task, rpcCtx, aliveStores, isTiDBLabelZoneSet, tiflashReplicaReadPolicy, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, maxRemoteReadCountAllowed, tidbZone)
regionInfo, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, err = filterAccessibleStoresAndBuildRegionInfo(cache, usedTiFlashStores[idx], bo, task, rpcCtxs[idx], aliveStores, tiflashReplicaReadPolicy, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, maxRemoteReadCountAllowed, tidbZone)
if err != nil {
return nil, err
}
if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok {
if batchCop, ok := storeTaskMap[rpcCtxs[idx].Addr]; ok {
batchCop.regionInfos = append(batchCop.regionInfos, regionInfo)
} else {
batchTask := &batchCopTask{
storeAddr: rpcCtx.Addr,
storeAddr: rpcCtxs[idx].Addr,
cmdType: cmdType,
ctx: rpcCtx,
ctx: rpcCtxs[idx],
regionInfos: []RegionInfo{regionInfo},
}
storeTaskMap[rpcCtx.Addr] = batchTask
storeTaskMap[rpcCtxs[idx].Addr] = batchTask
}
for _, storeID := range regionInfo.AllStores {
storeIDsUnionSetForAllTasks[storeID] = struct{}{}
}
}
if needRetry {
// As mentioned above, nil rpcCtx is always attributed to failed stores.
// It's equivalent to long-polling the store but getting no response. Here we'd better use
// TiFlash error to trigger the TiKV fallback mechanism.
err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
if err != nil {
return nil, errors.Trace(err)
}
continue
}

if len(regionIDsInOtherZones) != 0 {
warningMsg := fmt.Sprintf("total %d region(s) can not be accessed by TiFlash in the zone [%s]:", len(regionIDsInOtherZones), tidbZone)
regionIDErrMsg := ""
@@ -998,7 +1051,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
storesUnionSetForAllTasks = append(storesUnionSetForAllTasks, store)
}
}
batchTasks = balanceBatchCopTask(bo.GetCtx(), storesUnionSetForAllTasks, batchTasks, balanceWithContinuity, balanceContinuousRegionCount)
batchTasks = balanceBatchCopTask(storesUnionSetForAllTasks, batchTasks, balanceWithContinuity, balanceContinuousRegionCount)
balanceElapsed := time.Since(balanceStart)
if log.GetLevel() <= zap.DebugLevel {
msg := "After region balance:"
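Taken together, the changes to buildBatchCopTasksCore above give it a two-pass shape: the first loop over tasks records each task's RPC context, its candidate TiFlash stores, and the union of all used store IDs; liveness is then probed only for stores in that union (getAliveStoresAndStoreIDs); a second loop builds the RegionInfos and batch tasks from the pre-computed results. Below is a minimal, self-contained sketch of that flow; task and probeAlive are illustrative stand-ins, not the real copr or client-go APIs.

```go
// Sketch of the two-pass flow introduced by this PR: collect the stores that
// actually host regions of the query first, probe liveness only for those, then
// build per-task store lists from the probe result.
package main

import "fmt"

// task stands in for copTask: a region plus the TiFlash stores holding a peer of it.
type task struct {
	region    string
	allStores []uint64
}

// probeAlive stands in for filterAliveStores; here every even store ID is "alive".
func probeAlive(storeIDs map[uint64]struct{}) map[uint64]struct{} {
	alive := make(map[uint64]struct{}, len(storeIDs))
	for id := range storeIDs {
		if id%2 == 0 {
			alive[id] = struct{}{}
		}
	}
	return alive
}

func main() {
	tasks := []task{
		{region: "r1", allStores: []uint64{2, 4}},
		{region: "r2", allStores: []uint64{4, 6}},
	}

	// Pass 1: record which stores each task can use and the union of all of them.
	used := make(map[uint64]struct{})
	for _, t := range tasks {
		for _, id := range t.allStores {
			used[id] = struct{}{}
		}
	}

	// Probe liveness only for stores in the union set; a TiFlash node that holds
	// no region of this query is never probed and never reaches the balancer.
	alive := probeAlive(used)

	// Pass 2: restrict each task to alive stores, as the real code does when it
	// fills RegionInfo.AllStores before balancing.
	for _, t := range tasks {
		var kept []uint64
		for _, id := range t.allStores {
			if _, ok := alive[id]; ok {
				kept = append(kept, id)
			}
		}
		fmt.Printf("%s -> stores %v\n", t.region, kept)
	}
}
```

The practical effect, matching the PR title, is that stores with no region for the query are removed before balanceBatchCopTask runs.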
42 changes: 40 additions & 2 deletions pkg/store/copr/batch_coprocessor_test.go
@@ -23,12 +23,15 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/store/driver/backoff"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/stathat/consistent"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"go.uber.org/zap"
)

@@ -125,13 +128,13 @@ func TestBalanceBatchCopTaskWithContinuity(t *testing.T) {
func TestBalanceBatchCopTaskWithEmptyTaskSet(t *testing.T) {
{
var nilTaskSet []*batchCopTask
nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, false, 0)
nilResult := balanceBatchCopTask(nil, nilTaskSet, false, 0)
require.True(t, nilResult == nil)
}

{
emptyTaskSet := make([]*batchCopTask, 0)
emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, false, 0)
emptyResult := balanceBatchCopTask(nil, emptyTaskSet, false, 0)
require.True(t, emptyResult != nil)
require.True(t, len(emptyResult) == 0)
}
@@ -282,3 +285,38 @@ func TestTopoFetcherBackoff(t *testing.T) {
require.GreaterOrEqual(t, dura, 30*time.Second)
require.LessOrEqual(t, dura, 50*time.Second)
}

func TestGetAllUsedTiFlashStores(t *testing.T) {
mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(t, err)
defer func() {
pdClient.Close()
err = mockClient.Close()
require.NoError(t, err)
}()

pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
defer pdCli.Close()

cache := NewRegionCache(tikv.NewRegionCache(pdCli))
defer cache.Close()

label1 := metapb.StoreLabel{Key: tikvrpc.EngineLabelKey, Value: tikvrpc.EngineLabelTiFlash}
label2 := metapb.StoreLabel{Key: tikvrpc.EngineRoleLabelKey, Value: tikvrpc.EngineLabelTiFlashCompute}

cache.SetRegionCacheStore(1, "192.168.1.1", "", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2})
cache.SetRegionCacheStore(2, "192.168.1.2", "192.168.1.3", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2})
cache.SetRegionCacheStore(3, "192.168.1.3", "192.168.1.2", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2})

allUsedTiFlashStoresMap := make(map[uint64]struct{})
allUsedTiFlashStoresMap[2] = struct{}{}
allUsedTiFlashStoresMap[3] = struct{}{}
allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
require.Equal(t, 3, len(allTiFlashStores))
allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, allUsedTiFlashStoresMap)
require.Equal(t, len(allUsedTiFlashStoresMap), len(allUsedTiFlashStores))
for _, store := range allUsedTiFlashStores {
_, ok := allUsedTiFlashStoresMap[store.StoreID()]
require.True(t, ok)
}
}
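The contract checked above is that getAllUsedTiFlashStores is an order-preserving filter over the full TiFlash store list, keeping only stores whose ID appears in the usage map. A tiny standalone illustration of that behavior follows; storeID stands in for *tikv.Store so the snippet runs without the mock cluster.

```go
// Standalone illustration of the order-preserving filter that
// getAllUsedTiFlashStores performs; storeID replaces *tikv.Store here.
package main

import "fmt"

type storeID uint64

func filterUsed(all []storeID, used map[uint64]struct{}) []storeID {
	kept := make([]storeID, 0, len(used))
	for _, s := range all {
		if _, ok := used[uint64(s)]; ok {
			kept = append(kept, s)
		}
	}
	return kept
}

func main() {
	all := []storeID{1, 2, 3}
	used := map[uint64]struct{}{2: {}, 3: {}}
	fmt.Println(filterUsed(all, used)) // [2 3], in the same order as all
}
```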