From 8c85e6f8076e63f1d142a0bac5ba3a1eff064108 Mon Sep 17 00:00:00 2001 From: tangenta Date: Sun, 29 Jan 2023 16:05:54 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #40411 Signed-off-by: ti-chi-bot --- ddl/backfilling.go | 323 +++++++++++++++++++++++++++++++- store/copr/batch_coprocessor.go | 153 ++++++++++++++- store/copr/coprocessor_test.go | 21 ++- store/copr/region_cache.go | 14 +- 4 files changed, 495 insertions(+), 16 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 5ea026184f13a..44a0b803423aa 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -359,7 +359,7 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { // splitTableRanges uses PD region's key ranges to split the backfilling table key range space, // to speed up backfilling data in table with disperse handle. // The `t` should be a non-partitioned table or a partition. -func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey kv.Key) ([]kv.KeyRange, error) { +func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey kv.Key, limit int) ([]kv.KeyRange, error) { logutil.BgLogger().Info("[ddl] split table range from PD", zap.Int64("physicalTableID", t.GetPhysicalID()), zap.String("start key", hex.EncodeToString(startKey)), @@ -374,7 +374,7 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey maxSleep := 10000 // ms bo := backoff.NewBackofferWithVars(context.Background(), maxSleep, nil) rc := copr.NewRegionCache(s.GetRegionCache()) - ranges, err := rc.SplitRegionRanges(bo, []kv.KeyRange{kvRange}) + ranges, err := rc.SplitRegionRanges(bo, []kv.KeyRange{kvRange}, limit) if err != nil { return nil, errors.Trace(err) } @@ -823,7 +823,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic } for { - kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey) + kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey, backfillTaskChanSize) if err != nil { return errors.Trace(err) } @@ -885,6 +885,323 @@ func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error { return nil } +<<<<<<< HEAD +======= +func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo *reorgInfo, notDistTask bool, + batchTasks []*reorgBackfillTask, bJobs []*BackfillJob, isUnique bool, id *int64) error { + bJobs = bJobs[:0] + instanceID := "" + if notDistTask { + instanceID = reorgInfo.d.uuid + } + // TODO: Adjust the number of ranges(region) for each task. 
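+ // Note: with the current splitting, each reorgBackfillTask covers a single
+ // region range, so the loop below persists one BackfillJob row per region.
+ // Roughly (field names taken from the code below):
+ //   task.startKey / task.endKey -> BackfillJob.StartKey / EndKey (CurrKey starts at StartKey)
+ //   reorgInfo.Job / ReorgMeta   -> BackfillMeta (reorg type, SQL mode, location, job query)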
+ for _, task := range batchTasks { + bm := &model.BackfillMeta{ + PhysicalTableID: reorgInfo.PhysicalTableID, + IsUnique: isUnique, + EndInclude: task.endInclude, + ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp, + SQLMode: reorgInfo.ReorgMeta.SQLMode, + Location: reorgInfo.ReorgMeta.Location, + JobMeta: &model.JobMeta{ + SchemaID: reorgInfo.Job.SchemaID, + TableID: reorgInfo.Job.TableID, + Query: reorgInfo.Job.Query, + }, + } + bj := &BackfillJob{ + ID: *id, + JobID: reorgInfo.Job.ID, + EleID: reorgInfo.currElement.ID, + EleKey: reorgInfo.currElement.TypeKey, + Tp: bfWorkerType, + State: model.JobStateNone, + InstanceID: instanceID, + CurrKey: task.startKey, + StartKey: task.startKey, + EndKey: task.endKey, + Meta: bm, + } + *id++ + bJobs = append(bJobs, bj) + } + if err := AddBackfillJobs(sess, bJobs); err != nil { + return errors.Trace(err) + } + return nil +} + +func (*ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTbl table.PhysicalTable, isUnique bool, + bfWorkerType backfillerType, startKey kv.Key, currBackfillJobID int64) error { + endKey := reorgInfo.EndKey + isFirstOps := true + bJobs := make([]*BackfillJob, 0, genTaskBatch) + for { + kvRanges, err := splitTableRanges(pTbl, reorgInfo.d.store, startKey, endKey, genTaskBatch) + if err != nil { + return errors.Trace(err) + } + batchTasks := getBatchTasks(pTbl, reorgInfo, kvRanges, genTaskBatch) + if len(batchTasks) == 0 { + break + } + notNeedDistProcess := isFirstOps && (len(kvRanges) < minDistTaskCnt) + if err = addBatchBackfillJobs(sess, bfWorkerType, reorgInfo, notNeedDistProcess, batchTasks, bJobs, isUnique, &currBackfillJobID); err != nil { + return errors.Trace(err) + } + isFirstOps = false + + remains := kvRanges[len(batchTasks):] + // TODO: After adding backfillCh do asyncNotify(dc.backfillJobCh). 
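+ // Each round asks splitTableRanges for at most genTaskBatch region ranges
+ // (via the new "limit" argument), turns them into batchTasks and persists
+ // them as backfill jobs. Leftover ranges ("remains") supply the start key of
+ // the next round, and the wait loop below throttles generation while the
+ // backfill table still holds at least minGenTaskBatch unfinished jobs.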
+ logutil.BgLogger().Info("[ddl] split backfill jobs to the backfill table", + zap.Int("batchTasksCnt", len(batchTasks)), + zap.Int("totalRegionCnt", len(kvRanges)), + zap.Int("remainRegionCnt", len(remains)), + zap.String("startHandle", hex.EncodeToString(startKey)), + zap.String("endHandle", hex.EncodeToString(endKey))) + + if len(remains) == 0 { + break + } + + for { + bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + return errors.Trace(err) + } + if bJobCnt < minGenTaskBatch { + break + } + time.Sleep(retrySQLInterval) + } + startKey = remains[0].StartKey + } + return nil +} + +func (dc *ddlCtx) controlWritePhysicalTableRecord(sess *session, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error { + startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey + if startKey == nil && endKey == nil { + return nil + } + + if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil { + return errors.Trace(err) + } + + currBackfillJobID := int64(1) + err := checkAndHandleInterruptedBackfillJobs(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + return errors.Trace(err) + } + maxBfJob, err := GetMaxBackfillJob(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + return errors.Trace(err) + } + if maxBfJob != nil { + startKey = maxBfJob.EndKey + currBackfillJobID = maxBfJob.ID + 1 + } + + var isUnique bool + if bfWorkerType == typeAddIndexWorker { + idxInfo := model.FindIndexInfoByID(t.Meta().Indices, reorgInfo.currElement.ID) + isUnique = idxInfo.Unique + } + err = dc.splitTableToBackfillJobs(sess, reorgInfo, t, isUnique, bfWorkerType, startKey, currBackfillJobID) + if err != nil { + return errors.Trace(err) + } + + var backfillJobFinished bool + jobID := reorgInfo.Job.ID + ticker := time.NewTicker(300 * time.Millisecond) + defer ticker.Stop() + for { + if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil { + return errors.Trace(err) + } + + select { + case <-ticker.C: + if !backfillJobFinished { + err := checkAndHandleInterruptedBackfillJobs(sess, jobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + logutil.BgLogger().Warn("[ddl] finish interrupted backfill jobs", zap.Int64("job ID", jobID), zap.Error(err)) + return errors.Trace(err) + } + + bfJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, false) + if err != nil { + logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", jobID), zap.Error(err)) + return errors.Trace(err) + } + if bfJob == nil { + backfillJobFinished = true + logutil.BgLogger().Info("[ddl] finish backfill jobs", zap.Int64("job ID", jobID)) + } + } + if backfillJobFinished { + // TODO: Consider whether these backfill jobs are always out of sync. 
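+ // Once the backfill table is drained (bfJob == nil above), every tick polls
+ // checkJobIsSynced; the function returns only after no instance is left in
+ // the unsynced list, i.e. all executors have caught up with the history jobs.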
+ isSynced, err := checkJobIsSynced(sess, jobID) + if err != nil { + logutil.BgLogger().Warn("[ddl] checkJobIsSynced failed", zap.Int64("job ID", jobID), zap.Error(err)) + return errors.Trace(err) + } + if isSynced { + logutil.BgLogger().Info("[ddl] sync backfill jobs", zap.Int64("job ID", jobID)) + return nil + } + } + case <-dc.ctx.Done(): + return dc.ctx.Err() + } + } +} + +func checkJobIsSynced(sess *session, jobID int64) (bool, error) { + var err error + var unsyncedInstanceIDs []string + for i := 0; i < retrySQLTimes; i++ { + unsyncedInstanceIDs, err = getUnsyncedInstanceIDs(sess, jobID, "check_backfill_history_job_sync") + if err == nil && len(unsyncedInstanceIDs) == 0 { + return true, nil + } + + logutil.BgLogger().Info("[ddl] checkJobIsSynced failed", + zap.Strings("unsyncedInstanceIDs", unsyncedInstanceIDs), zap.Int("tryTimes", i), zap.Error(err)) + time.Sleep(retrySQLInterval) + } + + return false, errors.Trace(err) +} + +func checkAndHandleInterruptedBackfillJobs(sess *session, jobID, currEleID int64, currEleKey []byte) (err error) { + var bJobs []*BackfillJob + for i := 0; i < retrySQLTimes; i++ { + bJobs, err = GetInterruptedBackfillJobsForOneEle(sess, jobID, currEleID, currEleKey) + if err == nil { + break + } + logutil.BgLogger().Info("[ddl] getInterruptedBackfillJobsForOneEle failed", zap.Error(err)) + time.Sleep(retrySQLInterval) + } + if err != nil { + return errors.Trace(err) + } + if len(bJobs) == 0 { + return nil + } + + for i := 0; i < retrySQLTimes; i++ { + err = MoveBackfillJobsToHistoryTable(sess, bJobs[0]) + if err == nil { + return errors.Errorf(bJobs[0].Meta.ErrMsg) + } + logutil.BgLogger().Info("[ddl] MoveBackfillJobsToHistoryTable failed", zap.Error(err)) + time.Sleep(retrySQLInterval) + } + return errors.Trace(err) +} + +func checkBackfillJobCount(sess *session, jobID, currEleID int64, currEleKey []byte) (backfillJobCnt int, err error) { + err = checkAndHandleInterruptedBackfillJobs(sess, jobID, currEleID, currEleKey) + if err != nil { + return 0, errors.Trace(err) + } + + backfillJobCnt, err = GetBackfillJobCount(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'", + jobID, currEleID, currEleKey), "check_backfill_job_count") + if err != nil { + return 0, errors.Trace(err) + } + + return backfillJobCnt, nil +} + +func getBackfillJobWithRetry(sess *session, tableName string, jobID, currEleID int64, currEleKey []byte, isDesc bool) (*BackfillJob, error) { + var err error + var bJobs []*BackfillJob + descStr := "" + if isDesc { + descStr = "order by id desc" + } + for i := 0; i < retrySQLTimes; i++ { + bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' %s limit 1", + jobID, currEleID, currEleKey, descStr), "check_backfill_job_state") + if err != nil { + logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err)) + continue + } + + if len(bJobs) != 0 { + return bJobs[0], nil + } + break + } + return nil, errors.Trace(err) +} + +// GetMaxBackfillJob gets the max backfill job in BackfillTable and BackfillHistoryTable. 
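+ // It queries both tables in descending ID order (getBackfillJobWithRetry with
+ // isDesc = true) and returns whichever job has the larger ID, or nil when both
+ // tables are empty. Callers resume job generation from maxBfJob.EndKey with
+ // the next ID maxBfJob.ID+1.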
+func GetMaxBackfillJob(sess *session, jobID, currEleID int64, currEleKey []byte) (*BackfillJob, error) { + bfJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, currEleID, currEleKey, true) + if err != nil { + return nil, errors.Trace(err) + } + hJob, err := getBackfillJobWithRetry(sess, BackfillHistoryTable, jobID, currEleID, currEleKey, true) + if err != nil { + return nil, errors.Trace(err) + } + + if bfJob == nil { + return hJob, nil + } + if hJob == nil { + return bfJob, nil + } + if bfJob.ID > hJob.ID { + return bfJob, nil + } + return hJob, nil +} + +// MoveBackfillJobsToHistoryTable moves backfill table jobs to the backfill history table. +func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob) error { + s, ok := sctx.(*session) + if !ok { + return errors.Errorf("sess ctx:%#v convert session failed", sctx) + } + + return s.runInTxn(func(se *session) error { + // TODO: Consider batch by batch update backfill jobs and insert backfill history jobs. + bJobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'", + bfJob.JobID, bfJob.EleID, bfJob.EleKey), "update_backfill_job") + if err != nil { + return errors.Trace(err) + } + if len(bJobs) == 0 { + return nil + } + + txn, err := se.txn() + if err != nil { + return errors.Trace(err) + } + startTS := txn.StartTS() + err = RemoveBackfillJob(se, true, bJobs[0]) + if err == nil { + for _, bj := range bJobs { + bj.State = model.JobStateCancelled + bj.FinishTS = startTS + } + err = AddBackfillHistoryJob(se, bJobs) + } + logutil.BgLogger().Info("[ddl] move backfill jobs to history table", zap.Int("job count", len(bJobs))) + return errors.Trace(err) + }) +} + +>>>>>>> f842cd9ffbb (store/copr: add a param "limit" to region cache's `SplitRegionRanges` (#40411)) // recordIterFunc is used for low-level record iteration. type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 9bc53d3aabc45..10b3401d4abd2 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -516,6 +516,157 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, store *kvStore return batchTasks, nil } +<<<<<<< HEAD +======= +func filterAliveStores(ctx context.Context, stores []*tikv.Store, ttl time.Duration, kvStore *kvStore) []*tikv.Store { + var aliveStores []*tikv.Store + var wg sync.WaitGroup + var mu sync.Mutex + wg.Add(len(stores)) + for i := range stores { + go func(idx int) { + defer wg.Done() + s := stores[idx] + + // Check if store is failed already. + if ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl); !ok { + return + } + + tikvClient := kvStore.GetTiKVClient() + if ok := detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit); !ok { + GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient) + return + } + + mu.Lock() + defer mu.Unlock() + aliveStores = append(aliveStores, s) + }(i) + } + wg.Wait() + + logutil.BgLogger().Info("detecting available mpp stores", zap.Any("total", len(stores)), zap.Any("alive", len(aliveStores))) + return aliveStores +} + +// 1. Split range by region location to build copTasks. +// 2. For each copTask build its rpcCtx , the target tiflash_compute node will be chosen using consistent hash. +// 3. All copTasks that will be sent to one tiflash_compute node are put in one batchCopTask. 
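+ // Step 1 uses SplitKeyRangesByLocations with UnspecifiedLimit, so every region
+ // covered by the ranges yields its own copTask; steps 2 and 3 then group those
+ // tasks by the tiflash_compute address picked by consistent hash, producing one
+ // batchCopTask per address.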
+func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, + kvStore *kvStore, + rangesForEachPhysicalTable []*KeyRanges, + storeType kv.StoreType, + ttl time.Duration) (res []*batchCopTask, err error) { + const cmdType = tikvrpc.CmdBatchCop + var retryNum int + cache := kvStore.GetRegionCache() + + for { + retryNum++ + var rangesLen int + tasks := make([]*copTask, 0) + regionIDs := make([]tikv.RegionVerID, 0) + + for i, ranges := range rangesForEachPhysicalTable { + rangesLen += ranges.Len() + locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit) + if err != nil { + return nil, errors.Trace(err) + } + for _, lo := range locations { + tasks = append(tasks, &copTask{ + region: lo.Location.Region, + ranges: lo.Ranges, + cmdType: cmdType, + storeType: storeType, + partitionIndex: int64(i), + }) + regionIDs = append(regionIDs, lo.Location.Region) + } + } + + stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer()) + if err != nil { + return nil, err + } + stores = filterAliveStores(bo.GetCtx(), stores, ttl, kvStore) + if len(stores) == 0 { + return nil, errors.New("tiflash_compute node is unavailable") + } + + rpcCtxs, err := cache.GetTiFlashComputeRPCContextByConsistentHash(bo.TiKVBackoffer(), regionIDs, stores) + if err != nil { + return nil, err + } + if rpcCtxs == nil { + logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because rcpCtx is nil", zap.Int("retryNum", retryNum)) + err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer")) + if err != nil { + return nil, errors.Trace(err) + } + continue + } + if len(rpcCtxs) != len(tasks) { + return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks)) + } + taskMap := make(map[string]*batchCopTask) + for i, rpcCtx := range rpcCtxs { + regionInfo := RegionInfo{ + // tasks and rpcCtxs are correspond to each other. + Region: tasks[i].region, + Meta: rpcCtx.Meta, + Ranges: tasks[i].ranges, + AllStores: []uint64{rpcCtx.Store.StoreID()}, + PartitionIndex: tasks[i].partitionIndex, + } + if batchTask, ok := taskMap[rpcCtx.Addr]; ok { + batchTask.regionInfos = append(batchTask.regionInfos, regionInfo) + } else { + batchTask := &batchCopTask{ + storeAddr: rpcCtx.Addr, + cmdType: cmdType, + ctx: rpcCtx, + regionInfos: []RegionInfo{regionInfo}, + } + taskMap[rpcCtx.Addr] = batchTask + res = append(res, batchTask) + } + } + logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores))) + break + } + + failpointCheckForConsistentHash(res) + return res, nil +} + +func failpointCheckForConsistentHash(tasks []*batchCopTask) { + failpoint.Inject("checkOnlyDispatchToTiFlashComputeNodes", func(val failpoint.Value) { + logutil.BgLogger().Debug("in checkOnlyDispatchToTiFlashComputeNodes") + + // This failpoint will be tested in test-infra case, because we needs setup a cluster. + // All tiflash_compute nodes addrs are stored in val, separated by semicolon. 
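+ // For example, a test could enable the failpoint with a value such as
+ // "127.0.0.1:3930;127.0.0.1:3931" (illustrative addresses only); any
+ // batchCopTask whose storeAddr is outside this set makes the check panic.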
+ str := val.(string) + addrs := strings.Split(str, ";") + if len(addrs) < 1 { + err := fmt.Sprintf("unexpected length of tiflash_compute node addrs: %v, %s", len(addrs), str) + panic(err) + } + addrMap := make(map[string]struct{}) + for _, addr := range addrs { + addrMap[addr] = struct{}{} + } + for _, batchTask := range tasks { + if _, ok := addrMap[batchTask.storeAddr]; !ok { + err := errors.Errorf("batchCopTask send to node which is not tiflash_compute: %v(tiflash_compute nodes: %s)", batchTask.storeAddr, str) + panic(err) + } + } + }) +} + +>>>>>>> f842cd9ffbb (store/copr: add a param "limit" to region cache's `SplitRegionRanges` (#40411)) // When `partitionIDs != nil`, it means that buildBatchCopTasksCore is constructing a batch cop tasks for PartitionTableScan. // At this time, `len(rangesForEachPhysicalTable) == len(partitionIDs)` and `rangesForEachPhysicalTable[i]` is for partition `partitionIDs[i]`. // Otherwise, `rangesForEachPhysicalTable[0]` indicates the range for the single physical table. @@ -530,7 +681,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach rangesLen = 0 for i, ranges := range rangesForEachPhysicalTable { rangesLen += ranges.Len() - locations, err := cache.SplitKeyRangesByLocations(bo, ranges) + locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit) if err != nil { return nil, errors.Trace(err) } diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go index 7790e8f7661fc..046303d02bbb8 100644 --- a/store/copr/coprocessor_test.go +++ b/store/copr/coprocessor_test.go @@ -371,46 +371,51 @@ func TestSplitRegionRanges(t *testing.T) { bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil) - ranges, err := cache.SplitRegionRanges(bo, BuildKeyRanges("a", "c")) + ranges, err := cache.SplitRegionRanges(bo, BuildKeyRanges("a", "c"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 1) rangeEqual(t, ranges, "a", "c") - ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("h", "y")) + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("h", "y"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 3) rangeEqual(t, ranges, "h", "n", "n", "t", "t", "y") - ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "z")) + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "z"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 2) rangeEqual(t, ranges, "s", "t", "t", "z") - ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "s")) + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "s"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 1) rangeEqual(t, ranges, "s", "s") - ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "t")) + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "t"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 1) rangeEqual(t, ranges, "t", "t") - ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "u")) + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "u"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 1) rangeEqual(t, ranges, "t", "u") - ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("u", "z")) + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("u", "z"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 1) rangeEqual(t, ranges, "u", "z") // min --> max - ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z")) + ranges, err = 
cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 4) rangeEqual(t, ranges, "a", "g", "g", "n", "n", "t", "t", "z") + + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z"), 3) + require.NoError(t, err) + require.Len(t, ranges, 3) + rangeEqual(t, ranges, "a", "g", "g", "n", "n", "t") } func TestRebuild(t *testing.T) { diff --git a/store/copr/region_cache.go b/store/copr/region_cache.go index a3fd20e036d43..73d718cab423a 100644 --- a/store/copr/region_cache.go +++ b/store/copr/region_cache.go @@ -43,10 +43,10 @@ func NewRegionCache(rc *tikv.RegionCache) *RegionCache { } // SplitRegionRanges gets the split ranges from pd region. -func (c *RegionCache) SplitRegionRanges(bo *Backoffer, keyRanges []kv.KeyRange) ([]kv.KeyRange, error) { +func (c *RegionCache) SplitRegionRanges(bo *Backoffer, keyRanges []kv.KeyRange, limit int) ([]kv.KeyRange, error) { ranges := NewKeyRanges(keyRanges) - locations, err := c.SplitKeyRangesByLocations(bo, ranges) + locations, err := c.SplitKeyRangesByLocations(bo, ranges, limit) if err != nil { return nil, derr.ToTiDBErr(err) } @@ -123,10 +123,16 @@ func (l *LocationKeyRanges) splitKeyRangesByBuckets() []*LocationKeyRanges { return res } +// UnspecifiedLimit means no limit. +const UnspecifiedLimit = -1 + // SplitKeyRangesByLocations splits the KeyRanges by logical info in the cache. -func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) { +func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) { res := make([]*LocationKeyRanges, 0) for ranges.Len() > 0 { + if limit != UnspecifiedLimit && len(res) >= limit { + break + } loc, err := c.LocateKey(bo.TiKVBackoffer(), ranges.At(0).StartKey) if err != nil { return res, derr.ToTiDBErr(err) @@ -177,7 +183,7 @@ func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges // // TODO(youjiali1995): Try to do it in one round and reduce allocations if bucket is not enabled. func (c *RegionCache) SplitKeyRangesByBuckets(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) { - locs, err := c.SplitKeyRangesByLocations(bo, ranges) + locs, err := c.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit) if err != nil { return nil, derr.ToTiDBErr(err) } From d2494e8a7277084bbfd7362ab61fa848cea937f6 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 7 Feb 2023 21:28:24 +0800 Subject: [PATCH 2/2] resolve conflict --- ddl/backfilling.go | 317 -------------------------------- store/copr/batch_coprocessor.go | 151 --------------- 2 files changed, 468 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 44a0b803423aa..58de2d07acb95 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -885,323 +885,6 @@ func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error { return nil } -<<<<<<< HEAD -======= -func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo *reorgInfo, notDistTask bool, - batchTasks []*reorgBackfillTask, bJobs []*BackfillJob, isUnique bool, id *int64) error { - bJobs = bJobs[:0] - instanceID := "" - if notDistTask { - instanceID = reorgInfo.d.uuid - } - // TODO: Adjust the number of ranges(region) for each task. 
- for _, task := range batchTasks { - bm := &model.BackfillMeta{ - PhysicalTableID: reorgInfo.PhysicalTableID, - IsUnique: isUnique, - EndInclude: task.endInclude, - ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp, - SQLMode: reorgInfo.ReorgMeta.SQLMode, - Location: reorgInfo.ReorgMeta.Location, - JobMeta: &model.JobMeta{ - SchemaID: reorgInfo.Job.SchemaID, - TableID: reorgInfo.Job.TableID, - Query: reorgInfo.Job.Query, - }, - } - bj := &BackfillJob{ - ID: *id, - JobID: reorgInfo.Job.ID, - EleID: reorgInfo.currElement.ID, - EleKey: reorgInfo.currElement.TypeKey, - Tp: bfWorkerType, - State: model.JobStateNone, - InstanceID: instanceID, - CurrKey: task.startKey, - StartKey: task.startKey, - EndKey: task.endKey, - Meta: bm, - } - *id++ - bJobs = append(bJobs, bj) - } - if err := AddBackfillJobs(sess, bJobs); err != nil { - return errors.Trace(err) - } - return nil -} - -func (*ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTbl table.PhysicalTable, isUnique bool, - bfWorkerType backfillerType, startKey kv.Key, currBackfillJobID int64) error { - endKey := reorgInfo.EndKey - isFirstOps := true - bJobs := make([]*BackfillJob, 0, genTaskBatch) - for { - kvRanges, err := splitTableRanges(pTbl, reorgInfo.d.store, startKey, endKey, genTaskBatch) - if err != nil { - return errors.Trace(err) - } - batchTasks := getBatchTasks(pTbl, reorgInfo, kvRanges, genTaskBatch) - if len(batchTasks) == 0 { - break - } - notNeedDistProcess := isFirstOps && (len(kvRanges) < minDistTaskCnt) - if err = addBatchBackfillJobs(sess, bfWorkerType, reorgInfo, notNeedDistProcess, batchTasks, bJobs, isUnique, &currBackfillJobID); err != nil { - return errors.Trace(err) - } - isFirstOps = false - - remains := kvRanges[len(batchTasks):] - // TODO: After adding backfillCh do asyncNotify(dc.backfillJobCh). 
- logutil.BgLogger().Info("[ddl] split backfill jobs to the backfill table", - zap.Int("batchTasksCnt", len(batchTasks)), - zap.Int("totalRegionCnt", len(kvRanges)), - zap.Int("remainRegionCnt", len(remains)), - zap.String("startHandle", hex.EncodeToString(startKey)), - zap.String("endHandle", hex.EncodeToString(endKey))) - - if len(remains) == 0 { - break - } - - for { - bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) - if err != nil { - return errors.Trace(err) - } - if bJobCnt < minGenTaskBatch { - break - } - time.Sleep(retrySQLInterval) - } - startKey = remains[0].StartKey - } - return nil -} - -func (dc *ddlCtx) controlWritePhysicalTableRecord(sess *session, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error { - startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey - if startKey == nil && endKey == nil { - return nil - } - - if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil { - return errors.Trace(err) - } - - currBackfillJobID := int64(1) - err := checkAndHandleInterruptedBackfillJobs(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) - if err != nil { - return errors.Trace(err) - } - maxBfJob, err := GetMaxBackfillJob(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) - if err != nil { - return errors.Trace(err) - } - if maxBfJob != nil { - startKey = maxBfJob.EndKey - currBackfillJobID = maxBfJob.ID + 1 - } - - var isUnique bool - if bfWorkerType == typeAddIndexWorker { - idxInfo := model.FindIndexInfoByID(t.Meta().Indices, reorgInfo.currElement.ID) - isUnique = idxInfo.Unique - } - err = dc.splitTableToBackfillJobs(sess, reorgInfo, t, isUnique, bfWorkerType, startKey, currBackfillJobID) - if err != nil { - return errors.Trace(err) - } - - var backfillJobFinished bool - jobID := reorgInfo.Job.ID - ticker := time.NewTicker(300 * time.Millisecond) - defer ticker.Stop() - for { - if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil { - return errors.Trace(err) - } - - select { - case <-ticker.C: - if !backfillJobFinished { - err := checkAndHandleInterruptedBackfillJobs(sess, jobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) - if err != nil { - logutil.BgLogger().Warn("[ddl] finish interrupted backfill jobs", zap.Int64("job ID", jobID), zap.Error(err)) - return errors.Trace(err) - } - - bfJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, false) - if err != nil { - logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", jobID), zap.Error(err)) - return errors.Trace(err) - } - if bfJob == nil { - backfillJobFinished = true - logutil.BgLogger().Info("[ddl] finish backfill jobs", zap.Int64("job ID", jobID)) - } - } - if backfillJobFinished { - // TODO: Consider whether these backfill jobs are always out of sync. 
- isSynced, err := checkJobIsSynced(sess, jobID) - if err != nil { - logutil.BgLogger().Warn("[ddl] checkJobIsSynced failed", zap.Int64("job ID", jobID), zap.Error(err)) - return errors.Trace(err) - } - if isSynced { - logutil.BgLogger().Info("[ddl] sync backfill jobs", zap.Int64("job ID", jobID)) - return nil - } - } - case <-dc.ctx.Done(): - return dc.ctx.Err() - } - } -} - -func checkJobIsSynced(sess *session, jobID int64) (bool, error) { - var err error - var unsyncedInstanceIDs []string - for i := 0; i < retrySQLTimes; i++ { - unsyncedInstanceIDs, err = getUnsyncedInstanceIDs(sess, jobID, "check_backfill_history_job_sync") - if err == nil && len(unsyncedInstanceIDs) == 0 { - return true, nil - } - - logutil.BgLogger().Info("[ddl] checkJobIsSynced failed", - zap.Strings("unsyncedInstanceIDs", unsyncedInstanceIDs), zap.Int("tryTimes", i), zap.Error(err)) - time.Sleep(retrySQLInterval) - } - - return false, errors.Trace(err) -} - -func checkAndHandleInterruptedBackfillJobs(sess *session, jobID, currEleID int64, currEleKey []byte) (err error) { - var bJobs []*BackfillJob - for i := 0; i < retrySQLTimes; i++ { - bJobs, err = GetInterruptedBackfillJobsForOneEle(sess, jobID, currEleID, currEleKey) - if err == nil { - break - } - logutil.BgLogger().Info("[ddl] getInterruptedBackfillJobsForOneEle failed", zap.Error(err)) - time.Sleep(retrySQLInterval) - } - if err != nil { - return errors.Trace(err) - } - if len(bJobs) == 0 { - return nil - } - - for i := 0; i < retrySQLTimes; i++ { - err = MoveBackfillJobsToHistoryTable(sess, bJobs[0]) - if err == nil { - return errors.Errorf(bJobs[0].Meta.ErrMsg) - } - logutil.BgLogger().Info("[ddl] MoveBackfillJobsToHistoryTable failed", zap.Error(err)) - time.Sleep(retrySQLInterval) - } - return errors.Trace(err) -} - -func checkBackfillJobCount(sess *session, jobID, currEleID int64, currEleKey []byte) (backfillJobCnt int, err error) { - err = checkAndHandleInterruptedBackfillJobs(sess, jobID, currEleID, currEleKey) - if err != nil { - return 0, errors.Trace(err) - } - - backfillJobCnt, err = GetBackfillJobCount(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'", - jobID, currEleID, currEleKey), "check_backfill_job_count") - if err != nil { - return 0, errors.Trace(err) - } - - return backfillJobCnt, nil -} - -func getBackfillJobWithRetry(sess *session, tableName string, jobID, currEleID int64, currEleKey []byte, isDesc bool) (*BackfillJob, error) { - var err error - var bJobs []*BackfillJob - descStr := "" - if isDesc { - descStr = "order by id desc" - } - for i := 0; i < retrySQLTimes; i++ { - bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' %s limit 1", - jobID, currEleID, currEleKey, descStr), "check_backfill_job_state") - if err != nil { - logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err)) - continue - } - - if len(bJobs) != 0 { - return bJobs[0], nil - } - break - } - return nil, errors.Trace(err) -} - -// GetMaxBackfillJob gets the max backfill job in BackfillTable and BackfillHistoryTable. 
-func GetMaxBackfillJob(sess *session, jobID, currEleID int64, currEleKey []byte) (*BackfillJob, error) { - bfJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, currEleID, currEleKey, true) - if err != nil { - return nil, errors.Trace(err) - } - hJob, err := getBackfillJobWithRetry(sess, BackfillHistoryTable, jobID, currEleID, currEleKey, true) - if err != nil { - return nil, errors.Trace(err) - } - - if bfJob == nil { - return hJob, nil - } - if hJob == nil { - return bfJob, nil - } - if bfJob.ID > hJob.ID { - return bfJob, nil - } - return hJob, nil -} - -// MoveBackfillJobsToHistoryTable moves backfill table jobs to the backfill history table. -func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob) error { - s, ok := sctx.(*session) - if !ok { - return errors.Errorf("sess ctx:%#v convert session failed", sctx) - } - - return s.runInTxn(func(se *session) error { - // TODO: Consider batch by batch update backfill jobs and insert backfill history jobs. - bJobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'", - bfJob.JobID, bfJob.EleID, bfJob.EleKey), "update_backfill_job") - if err != nil { - return errors.Trace(err) - } - if len(bJobs) == 0 { - return nil - } - - txn, err := se.txn() - if err != nil { - return errors.Trace(err) - } - startTS := txn.StartTS() - err = RemoveBackfillJob(se, true, bJobs[0]) - if err == nil { - for _, bj := range bJobs { - bj.State = model.JobStateCancelled - bj.FinishTS = startTS - } - err = AddBackfillHistoryJob(se, bJobs) - } - logutil.BgLogger().Info("[ddl] move backfill jobs to history table", zap.Int("job count", len(bJobs))) - return errors.Trace(err) - }) -} - ->>>>>>> f842cd9ffbb (store/copr: add a param "limit" to region cache's `SplitRegionRanges` (#40411)) // recordIterFunc is used for low-level record iteration. type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 10b3401d4abd2..056957855e280 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -516,157 +516,6 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, store *kvStore return batchTasks, nil } -<<<<<<< HEAD -======= -func filterAliveStores(ctx context.Context, stores []*tikv.Store, ttl time.Duration, kvStore *kvStore) []*tikv.Store { - var aliveStores []*tikv.Store - var wg sync.WaitGroup - var mu sync.Mutex - wg.Add(len(stores)) - for i := range stores { - go func(idx int) { - defer wg.Done() - s := stores[idx] - - // Check if store is failed already. - if ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl); !ok { - return - } - - tikvClient := kvStore.GetTiKVClient() - if ok := detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit); !ok { - GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient) - return - } - - mu.Lock() - defer mu.Unlock() - aliveStores = append(aliveStores, s) - }(i) - } - wg.Wait() - - logutil.BgLogger().Info("detecting available mpp stores", zap.Any("total", len(stores)), zap.Any("alive", len(aliveStores))) - return aliveStores -} - -// 1. Split range by region location to build copTasks. -// 2. For each copTask build its rpcCtx , the target tiflash_compute node will be chosen using consistent hash. -// 3. All copTasks that will be sent to one tiflash_compute node are put in one batchCopTask. 
-func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, - kvStore *kvStore, - rangesForEachPhysicalTable []*KeyRanges, - storeType kv.StoreType, - ttl time.Duration) (res []*batchCopTask, err error) { - const cmdType = tikvrpc.CmdBatchCop - var retryNum int - cache := kvStore.GetRegionCache() - - for { - retryNum++ - var rangesLen int - tasks := make([]*copTask, 0) - regionIDs := make([]tikv.RegionVerID, 0) - - for i, ranges := range rangesForEachPhysicalTable { - rangesLen += ranges.Len() - locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit) - if err != nil { - return nil, errors.Trace(err) - } - for _, lo := range locations { - tasks = append(tasks, &copTask{ - region: lo.Location.Region, - ranges: lo.Ranges, - cmdType: cmdType, - storeType: storeType, - partitionIndex: int64(i), - }) - regionIDs = append(regionIDs, lo.Location.Region) - } - } - - stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer()) - if err != nil { - return nil, err - } - stores = filterAliveStores(bo.GetCtx(), stores, ttl, kvStore) - if len(stores) == 0 { - return nil, errors.New("tiflash_compute node is unavailable") - } - - rpcCtxs, err := cache.GetTiFlashComputeRPCContextByConsistentHash(bo.TiKVBackoffer(), regionIDs, stores) - if err != nil { - return nil, err - } - if rpcCtxs == nil { - logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because rcpCtx is nil", zap.Int("retryNum", retryNum)) - err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer")) - if err != nil { - return nil, errors.Trace(err) - } - continue - } - if len(rpcCtxs) != len(tasks) { - return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks)) - } - taskMap := make(map[string]*batchCopTask) - for i, rpcCtx := range rpcCtxs { - regionInfo := RegionInfo{ - // tasks and rpcCtxs are correspond to each other. - Region: tasks[i].region, - Meta: rpcCtx.Meta, - Ranges: tasks[i].ranges, - AllStores: []uint64{rpcCtx.Store.StoreID()}, - PartitionIndex: tasks[i].partitionIndex, - } - if batchTask, ok := taskMap[rpcCtx.Addr]; ok { - batchTask.regionInfos = append(batchTask.regionInfos, regionInfo) - } else { - batchTask := &batchCopTask{ - storeAddr: rpcCtx.Addr, - cmdType: cmdType, - ctx: rpcCtx, - regionInfos: []RegionInfo{regionInfo}, - } - taskMap[rpcCtx.Addr] = batchTask - res = append(res, batchTask) - } - } - logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores))) - break - } - - failpointCheckForConsistentHash(res) - return res, nil -} - -func failpointCheckForConsistentHash(tasks []*batchCopTask) { - failpoint.Inject("checkOnlyDispatchToTiFlashComputeNodes", func(val failpoint.Value) { - logutil.BgLogger().Debug("in checkOnlyDispatchToTiFlashComputeNodes") - - // This failpoint will be tested in test-infra case, because we needs setup a cluster. - // All tiflash_compute nodes addrs are stored in val, separated by semicolon. 
- str := val.(string) - addrs := strings.Split(str, ";") - if len(addrs) < 1 { - err := fmt.Sprintf("unexpected length of tiflash_compute node addrs: %v, %s", len(addrs), str) - panic(err) - } - addrMap := make(map[string]struct{}) - for _, addr := range addrs { - addrMap[addr] = struct{}{} - } - for _, batchTask := range tasks { - if _, ok := addrMap[batchTask.storeAddr]; !ok { - err := errors.Errorf("batchCopTask send to node which is not tiflash_compute: %v(tiflash_compute nodes: %s)", batchTask.storeAddr, str) - panic(err) - } - } - }) -} - ->>>>>>> f842cd9ffbb (store/copr: add a param "limit" to region cache's `SplitRegionRanges` (#40411)) // When `partitionIDs != nil`, it means that buildBatchCopTasksCore is constructing a batch cop tasks for PartitionTableScan. // At this time, `len(rangesForEachPhysicalTable) == len(partitionIDs)` and `rangesForEachPhysicalTable[i]` is for partition `partitionIDs[i]`. // Otherwise, `rangesForEachPhysicalTable[0]` indicates the range for the single physical table.