diff --git a/pkg/store/copr/BUILD.bazel b/pkg/store/copr/BUILD.bazel
index 3649ccff14778..13a70a9c00aa9 100644
--- a/pkg/store/copr/BUILD.bazel
+++ b/pkg/store/copr/BUILD.bazel
@@ -82,7 +82,7 @@ go_test(
     embed = [":copr"],
     flaky = True,
     race = "on",
-    shard_count = 29,
+    shard_count = 30,
     deps = [
         "//pkg/kv",
         "//pkg/store/driver/backoff",
@@ -92,6 +92,7 @@ go_test(
         "//pkg/util/trxevents",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_kvproto//pkg/coprocessor",
+        "@com_github_pingcap_kvproto//pkg/metapb",
         "@com_github_pingcap_kvproto//pkg/mpp",
         "@com_github_stathat_consistent//:consistent",
         "@com_github_stretchr_testify//require",
diff --git a/pkg/store/copr/batch_coprocessor.go b/pkg/store/copr/batch_coprocessor.go
index a386a58574410..c33e8f9f6e112 100644
--- a/pkg/store/copr/batch_coprocessor.go
+++ b/pkg/store/copr/batch_coprocessor.go
@@ -820,12 +820,24 @@ func filterAllStoresAccordingToTiFlashReplicaRead(allStores []uint64, aliveStore
 	return
 }
 
+func getAllUsedTiFlashStores(allTiFlashStores []*tikv.Store, allUsedTiFlashStoresMap map[uint64]struct{}) []*tikv.Store {
+	allUsedTiFlashStores := make([]*tikv.Store, 0, len(allUsedTiFlashStoresMap))
+	for _, store := range allTiFlashStores {
+		_, ok := allUsedTiFlashStoresMap[store.StoreID()]
+		if ok {
+			allUsedTiFlashStores = append(allUsedTiFlashStores, store)
+		}
+	}
+	return allUsedTiFlashStores
+}
+
 // getAliveStoresAndStoreIDs gets alive TiFlash stores and their IDs.
 // If tiflashReplicaReadPolicy is not all_replicas, it will also return the IDs of the alive TiFlash stores in TiDB zone.
-func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, ttl time.Duration, store *kvStore, tiflashReplicaReadPolicy tiflash.ReplicaRead, tidbZone string) (aliveStores *aliveStoresBundle) {
+func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, allUsedTiFlashStoresMap map[uint64]struct{}, ttl time.Duration, store *kvStore, tiflashReplicaReadPolicy tiflash.ReplicaRead, tidbZone string) (aliveStores *aliveStoresBundle) {
 	aliveStores = new(aliveStoresBundle)
 	allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
-	aliveStores.storesInAllZones = filterAliveStores(ctx, allTiFlashStores, ttl, store)
+	allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, allUsedTiFlashStoresMap)
+	aliveStores.storesInAllZones = filterAliveStores(ctx, allUsedTiFlashStores, ttl, store)
 
 	if !tiflashReplicaReadPolicy.IsAllReplicas() {
 		aliveStores.storeIDsInTiDBZone = make(map[uint64]struct{}, len(aliveStores.storesInAllZones))
@@ -850,9 +862,8 @@ func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, ttl time
 // 1. tiflash_replica_read policy
 // 2. whether the store is alive
 // After filtering, it will build the RegionInfo.
-func filterAccessibleStoresAndBuildRegionInfo(cache *RegionCache, bo *Backoffer, task *copTask, rpcCtx *tikv.RPCContext, aliveStores *aliveStoresBundle, isTiDBLabelZoneSet bool, tiflashReplicaReadPolicy tiflash.ReplicaRead, regionInfoNeedsReloadOnSendFail []RegionInfo, regionsInOtherZones []uint64, maxRemoteReadCountAllowed int, tidbZone string) (regionInfo RegionInfo, _ []RegionInfo, _ []uint64, err error) {
+func filterAccessibleStoresAndBuildRegionInfo(cache *RegionCache, allStores []uint64, bo *Backoffer, task *copTask, rpcCtx *tikv.RPCContext, aliveStores *aliveStoresBundle, isTiDBLabelZoneSet bool, tiflashReplicaReadPolicy tiflash.ReplicaRead, regionInfoNeedsReloadOnSendFail []RegionInfo, regionsInOtherZones []uint64, maxRemoteReadCountAllowed int, tidbZone string) (regionInfo RegionInfo, _ []RegionInfo, _ []uint64, err error) {
 	needCrossZoneAccess := false
-	allStores, _ := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode)
 	allStores, needCrossZoneAccess = filterAllStoresAccordingToTiFlashReplicaRead(allStores, aliveStores, tiflashReplicaReadPolicy)
 	regionInfo = RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores, PartitionIndex: task.partitionIndex}
 	if needCrossZoneAccess {
@@ -896,10 +907,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
 	if !isTiDBLabelZoneSet {
 		tiflashReplicaReadPolicy = tiflash.AllReplicas
 	}
-	aliveStores = getAliveStoresAndStoreIDs(bo.GetCtx(), cache, ttl, store, tiflashReplicaReadPolicy, tidbZone)
-	if tiflashReplicaReadPolicy.IsClosestReplicas() {
-		maxRemoteReadCountAllowed = len(aliveStores.storeIDsInTiDBZone) * tiflash.MaxRemoteReadCountPerNodeForClosestReplicas
-	}
+
 	for {
 		var tasks []*copTask
 		rangesLen = 0
@@ -920,12 +928,10 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
 			}
 		}
 
-		var batchTasks []*batchCopTask
-		var regionIDsInOtherZones []uint64
-		var regionInfosNeedReloadOnSendFail []RegionInfo
-		storeTaskMap := make(map[string]*batchCopTask)
+		rpcCtxs := make([]*tikv.RPCContext, 0, len(tasks))
+		usedTiFlashStores := make([][]uint64, 0, len(tasks))
+		usedTiFlashStoresMap := make(map[uint64]struct{}, 0)
 		needRetry := false
-		storeIDsUnionSetForAllTasks := make(map[uint64]struct{})
 		for _, task := range tasks {
 			rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP, tikv.LabelFilterNoTiFlashWriteNode)
 			if err != nil {
@@ -942,36 +948,62 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
 				// Then `splitRegion` will reloads these regions.
 				continue
 			}
+
+			allStores, _ := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode)
+			for _, storeID := range allStores {
+				usedTiFlashStoresMap[storeID] = struct{}{}
+			}
+			rpcCtxs = append(rpcCtxs, rpcCtx)
+			usedTiFlashStores = append(usedTiFlashStores, allStores)
+		}
+
+		if needRetry {
+			// As mentioned above, nil rpcCtx is always attributed to failed stores.
+			// It's equal to long poll the store but get no response. Here we'd better use
+			// TiFlash error to trigger the TiKV fallback mechanism.
+			err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+			continue
+		}
+
+		aliveStores = getAliveStoresAndStoreIDs(bo.GetCtx(), cache, usedTiFlashStoresMap, ttl, store, tiflashReplicaReadPolicy, tidbZone)
+		if tiflashReplicaReadPolicy.IsClosestReplicas() {
+			if len(aliveStores.storeIDsInTiDBZone) == 0 {
+				return nil, errors.Errorf("There is no region in tidb zone(%s)", tidbZone)
+			}
+			maxRemoteReadCountAllowed = len(aliveStores.storeIDsInTiDBZone) * tiflash.MaxRemoteReadCountPerNodeForClosestReplicas
+		}
+
+		var batchTasks []*batchCopTask
+		var regionIDsInOtherZones []uint64
+		var regionInfosNeedReloadOnSendFail []RegionInfo
+		storeTaskMap := make(map[string]*batchCopTask)
+		storeIDsUnionSetForAllTasks := make(map[uint64]struct{})
+		for idx, task := range tasks {
+			var err error
 			var regionInfo RegionInfo
-			regionInfo, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, err = filterAccessibleStoresAndBuildRegionInfo(cache, bo, task, rpcCtx, aliveStores, isTiDBLabelZoneSet, tiflashReplicaReadPolicy, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, maxRemoteReadCountAllowed, tidbZone)
+			regionInfo, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, err = filterAccessibleStoresAndBuildRegionInfo(cache, usedTiFlashStores[idx], bo, task, rpcCtxs[idx], aliveStores, isTiDBLabelZoneSet, tiflashReplicaReadPolicy, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, maxRemoteReadCountAllowed, tidbZone)
 			if err != nil {
 				return nil, err
 			}
-			if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok {
+			if batchCop, ok := storeTaskMap[rpcCtxs[idx].Addr]; ok {
 				batchCop.regionInfos = append(batchCop.regionInfos, regionInfo)
 			} else {
 				batchTask := &batchCopTask{
-					storeAddr:   rpcCtx.Addr,
+					storeAddr:   rpcCtxs[idx].Addr,
 					cmdType:     cmdType,
-					ctx:         rpcCtx,
+					ctx:         rpcCtxs[idx],
 					regionInfos: []RegionInfo{regionInfo},
 				}
-				storeTaskMap[rpcCtx.Addr] = batchTask
+				storeTaskMap[rpcCtxs[idx].Addr] = batchTask
 			}
 			for _, storeID := range regionInfo.AllStores {
 				storeIDsUnionSetForAllTasks[storeID] = struct{}{}
 			}
 		}
-		if needRetry {
-			// As mentioned above, nil rpcCtx is always attributed to failed stores.
-			// It's equal to long poll the store but get no response. Here we'd better use
-			// TiFlash error to trigger the TiKV fallback mechanism.
-			err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
-			if err != nil {
-				return nil, errors.Trace(err)
-			}
-			continue
-		}
+
 		if len(regionIDsInOtherZones) != 0 {
 			warningMsg := fmt.Sprintf("total %d region(s) can not be accessed by TiFlash in the zone [%s]:", len(regionIDsInOtherZones), tidbZone)
 			regionIDErrMsg := ""
diff --git a/pkg/store/copr/batch_coprocessor_test.go b/pkg/store/copr/batch_coprocessor_test.go
index e94d2c17effe9..687adb80c4ff3 100644
--- a/pkg/store/copr/batch_coprocessor_test.go
+++ b/pkg/store/copr/batch_coprocessor_test.go
@@ -23,12 +23,15 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/tidb/pkg/kv"
 	"github.com/pingcap/tidb/pkg/store/driver/backoff"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"github.com/stathat/consistent"
 	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/testutils"
 	"github.com/tikv/client-go/v2/tikv"
+	"github.com/tikv/client-go/v2/tikvrpc"
 	"go.uber.org/zap"
 )
 
@@ -282,3 +285,38 @@ func TestTopoFetcherBackoff(t *testing.T) {
 	require.GreaterOrEqual(t, dura, 30*time.Second)
 	require.LessOrEqual(t, dura, 50*time.Second)
 }
+
+func TestGetAllUsedTiFlashStores(t *testing.T) {
+	mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
+	require.NoError(t, err)
+	defer func() {
+		pdClient.Close()
+		err = mockClient.Close()
+		require.NoError(t, err)
+	}()
+
+	pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
+	defer pdCli.Close()
+
+	cache := NewRegionCache(tikv.NewRegionCache(pdCli))
+	defer cache.Close()
+
+	label1 := metapb.StoreLabel{Key: tikvrpc.EngineLabelKey, Value: tikvrpc.EngineLabelTiFlash}
+	label2 := metapb.StoreLabel{Key: tikvrpc.EngineRoleLabelKey, Value: tikvrpc.EngineLabelTiFlashCompute}
+
+	cache.SetRegionCacheStore(1, "192.168.1.1", "", tikvrpc.TiFlash, 0, []*metapb.StoreLabel{&label1, &label2})
+	cache.SetRegionCacheStore(2, "192.168.1.2", "192.168.1.3", tikvrpc.TiFlash, 0, []*metapb.StoreLabel{&label1, &label2})
+	cache.SetRegionCacheStore(3, "192.168.1.3", "192.168.1.2", tikvrpc.TiFlash, 0, []*metapb.StoreLabel{&label1, &label2})
+
+	allUsedTiFlashStoresMap := make(map[uint64]struct{})
+	allUsedTiFlashStoresMap[2] = struct{}{}
+	allUsedTiFlashStoresMap[3] = struct{}{}
+	allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
+	require.Equal(t, 3, len(allTiFlashStores))
+	allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, allUsedTiFlashStoresMap)
+	require.Equal(t, len(allUsedTiFlashStoresMap), len(allUsedTiFlashStores))
+	for _, store := range allUsedTiFlashStores {
+		_, ok := allUsedTiFlashStoresMap[store.StoreID()]
+		require.True(t, ok)
+	}
+}