diff --git a/executor/executor_test.go b/executor/executor_test.go
index 33f4bdb07c183..bae9d9ba26772 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -2156,6 +2156,11 @@ func (s *testSuite4) TestSplitRegionTimeout(c *C) {
 	// result 0 0 means split 0 region and 0 region finish scatter regions before timeout.
 	tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0"))
 	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout"), IsNil)
+
+	// Test scatter regions timeout.
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout", `return(true)`), IsNil)
+	tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("10 1"))
+	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout"), IsNil)
 }
 
 func (s *testSuiteP1) TestRow(c *C) {
@@ -4085,7 +4090,7 @@ func (s *testSuiteP1) TestSplitRegion(c *C) {
 
 	// Test for split table region.
 	tk.MustExec(`split table t between (0) and (1000000000) regions 10`)
-	// Check the ower value is more than the upper value.
+	// Check the lower value is more than the upper value.
 	_, err = tk.Exec(`split table t between (2) and (1) regions 10`)
 	c.Assert(err, NotNil)
 	c.Assert(err.Error(), Equals, "Split table `t` region lower value 2 should less than the upper value 1")
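By analogy with the `0 0` comment above, the expected `10 1` means all 10 regions were split but only 1 finished scattering before the mocked PD timeout. For context on the `split table t between (0) and (10000) regions 10` statements exercised here: the statement derives n-1 evenly spaced split values between the lower and upper bounds, yielding n regions. Below is a minimal sketch of that arithmetic only; TiDB's real getSplitTableKeys additionally encodes the values into row keys.

package main

import "fmt"

// splitPoints sketches the arithmetic behind
// SPLIT TABLE t BETWEEN (lower) AND (upper) REGIONS n:
// n-1 evenly spaced split values yield n regions.
func splitPoints(lower, upper, regions int64) []int64 {
	step := (upper - lower) / regions
	points := make([]int64, 0, regions-1)
	for i := int64(1); i < regions; i++ {
		points = append(points, lower+step*i)
	}
	return points
}

func main() {
	// split table t between (0) and (10000) regions 10
	fmt.Println(splitPoints(0, 10000, 10)) // [1000 2000 ... 9000]
}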
diff --git a/executor/split.go b/executor/split.go
index 4c7fc72b9d9ce..641f93b257958 100755
--- a/executor/split.go
+++ b/executor/split.go
@@ -43,12 +43,13 @@ import (
 type SplitIndexRegionExec struct {
 	baseExecutor
 
-	tableInfo  *model.TableInfo
-	indexInfo  *model.IndexInfo
-	lower      []types.Datum
-	upper      []types.Datum
-	num        int
-	valueLists [][]types.Datum
+	tableInfo    *model.TableInfo
+	indexInfo    *model.IndexInfo
+	lower        []types.Datum
+	upper        []types.Datum
+	num          int
+	valueLists   [][]types.Datum
+	splitIdxKeys [][]byte
 
 	done bool
 	splitRegionResult
@@ -60,8 +61,9 @@ type splitRegionResult struct {
 }
 
 // Open implements the Executor Open interface.
-func (e *SplitIndexRegionExec) Open(ctx context.Context) error {
-	return e.splitIndexRegion(ctx)
+func (e *SplitIndexRegionExec) Open(ctx context.Context) (err error) {
+	e.splitIdxKeys, err = e.getSplitIdxKeys()
+	return err
 }
 
 // Next implements the Executor Next interface.
@@ -70,8 +72,12 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error
 	if e.done {
 		return nil
 	}
-	appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
 	e.done = true
+	if err := e.splitIndexRegion(ctx); err != nil {
+		return err
+	}
+
+	appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
 	return nil
 }
 
@@ -85,15 +91,11 @@ func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error {
 	if !ok {
 		return nil
 	}
-	splitIdxKeys, err := e.getSplitIdxKeys()
-	if err != nil {
-		return err
-	}
 
 	start := time.Now()
 	ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
 	defer cancel()
-	regionIDs, err := s.SplitRegions(context.Background(), splitIdxKeys, true)
+	regionIDs, err := s.SplitRegions(context.Background(), e.splitIdxKeys, true)
 	if err != nil {
 		logutil.BgLogger().Warn("split table index region failed",
 			zap.String("table", e.tableInfo.Name.L),
@@ -248,14 +250,16 @@ type SplitTableRegionExec struct {
 	upper      types.Datum
 	num        int
 	valueLists [][]types.Datum
+	splitKeys  [][]byte
 
 	done bool
 	splitRegionResult
 }
 
 // Open implements the Executor Open interface.
-func (e *SplitTableRegionExec) Open(ctx context.Context) error {
-	return e.splitTableRegion(ctx)
+func (e *SplitTableRegionExec) Open(ctx context.Context) (err error) {
+	e.splitKeys, err = e.getSplitTableKeys()
+	return err
 }
 
 // Next implements the Executor Next interface.
@@ -264,8 +268,12 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error
 	if e.done {
 		return nil
 	}
-	appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
 	e.done = true
+
+	if err := e.splitTableRegion(ctx); err != nil {
+		return err
+	}
+	appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
 	return nil
 }
 
@@ -280,11 +288,7 @@ func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error {
 	ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
 	defer cancel()
 
-	splitKeys, err := e.getSplitTableKeys()
-	if err != nil {
-		return err
-	}
-	regionIDs, err := s.SplitRegions(ctxWithTimeout, splitKeys, true)
+	regionIDs, err := s.SplitRegions(ctxWithTimeout, e.splitKeys, true)
 	if err != nil {
 		logutil.BgLogger().Warn("split table region failed",
 			zap.String("table", e.tableInfo.Name.L),
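The executor change above moves the actual region-split work out of Open and into the first Next call, so a slow or failed split surfaces as an execution error rather than a statement-open error; Open now only precomputes the split keys. A minimal sketch of this prepare-in-Open, run-in-Next pattern follows (hypothetical names, not TiDB's real Executor interface):

package main

import (
	"context"
	"fmt"
)

// lazySplitExec: Open only prepares the split keys; the potentially slow
// split RPC runs on the first Next call, and its error is returned as part
// of execution.
type lazySplitExec struct {
	keys [][]byte
	done bool
}

func (e *lazySplitExec) Open(ctx context.Context) error {
	e.keys = [][]byte{[]byte("k1"), []byte("k2")} // cheap: just compute keys
	return nil
}

func (e *lazySplitExec) Next(ctx context.Context) error {
	if e.done {
		return nil
	}
	e.done = true // mark done first so a failed split is not retried
	if err := splitRegions(ctx, e.keys); err != nil {
		return err
	}
	fmt.Println("split at", len(e.keys), "keys")
	return nil
}

// splitRegions is a stand-in for the store-level SplitRegions RPC.
func splitRegions(ctx context.Context, keys [][]byte) error { return nil }

func main() {
	e := &lazySplitExec{}
	_ = e.Open(context.Background())
	_ = e.Next(context.Background())
	_ = e.Next(context.Background()) // no-op: already done
}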
diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go
index bf1ac13d9ad26..0050e69fb9d6b 100644
--- a/store/tikv/split_region.go
+++ b/store/tikv/split_region.go
@@ -48,7 +48,7 @@ func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter b
 
 	var batches []batch
 	for regionID, groupKeys := range groups {
-		batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount)
+		batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPutSize)
 	}
 
 	if len(batches) == 0 {
@@ -68,14 +68,13 @@ func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter b
 	}
 	ch := make(chan singleBatchResp, len(batches))
 	for _, batch1 := range batches {
-		batch := batch1
-		go func() {
+		go func(b batch) {
 			backoffer, cancel := bo.Fork()
 			defer cancel()
 
 			util.WithRecovery(func() {
 				select {
-				case ch <- s.batchSendSingleRegion(backoffer, batch, scatter):
+				case ch <- s.batchSendSingleRegion(backoffer, b, scatter):
 				case <-bo.ctx.Done():
 					ch <- singleBatchResp{err: bo.ctx.Err()}
 				}
@@ -84,24 +83,25 @@ func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter b
 					ch <- singleBatchResp{err: errors.Errorf("%v", r)}
 				}
 			})
-		}()
+		}(batch1)
 	}
 
 	srResp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)*2)}
 	for i := 0; i < len(batches); i++ {
 		batchResp := <-ch
 		if batchResp.err != nil {
-			logutil.BgLogger().Debug("tikv store batch send failed",
-				zap.Error(batchResp.err))
+			logutil.BgLogger().Info("batch split regions failed", zap.Error(batchResp.err))
 			if err == nil {
 				err = batchResp.err
 			}
-			continue
 		}
-		spResp := batchResp.resp.Resp.(*kvrpcpb.SplitRegionResponse)
-		regions := spResp.GetRegions()
-		srResp.Regions = append(srResp.Regions, regions...)
+		// If the split succeeds and the scatter fails, we also need to add the region IDs.
+		if batchResp.resp != nil {
+			spResp := batchResp.resp.Resp.(*kvrpcpb.SplitRegionResponse)
+			regions := spResp.GetRegions()
+			srResp.Regions = append(srResp.Regions, regions...)
+		}
 	}
 	return &tikvrpc.Response{Resp: srResp}, errors.Trace(err)
 }
 
@@ -151,28 +151,29 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo
 		// so n-1 needs to be scattered to other stores.
 		spResp.Regions = regions[:len(regions)-1]
 	}
+	logutil.BgLogger().Info("batch split regions complete",
+		zap.Uint64("batch region ID", batch.regionID.id),
+		zap.Stringer("first at", kv.Key(batch.keys[0])),
+		zap.Stringer("first new region left", logutil.Hex(spResp.Regions[0])),
+		zap.Int("new region count", len(spResp.Regions)))
+
 	if !scatter {
 		if len(spResp.Regions) == 0 {
 			return batchResp
 		}
-		logutil.BgLogger().Info("batch split regions complete",
-			zap.Uint64("batch region ID", batch.regionID.id),
-			zap.Stringer("first at", kv.Key(batch.keys[0])),
-			zap.Stringer("first new region left", logutil.Hex(spResp.Regions[0])),
-			zap.Int("new region count", len(spResp.Regions)))
 		return batchResp
 	}
 
 	for i, r := range spResp.Regions {
 		if err = s.scatterRegion(r.Id); err == nil {
-			logutil.BgLogger().Info("batch split regions, scatter a region complete",
+			logutil.BgLogger().Info("batch split regions, scatter region complete",
 				zap.Uint64("batch region ID", batch.regionID.id),
 				zap.Stringer("at", kv.Key(batch.keys[i])),
 				zap.Stringer("new region left", logutil.Hex(r)))
 			continue
 		}
 
-		logutil.BgLogger().Info("batch split regions, scatter a region failed",
+		logutil.BgLogger().Info("batch split regions, scatter region failed",
 			zap.Uint64("batch region ID", batch.regionID.id),
 			zap.Stringer("at", kv.Key(batch.keys[i])),
 			zap.Stringer("new region left", logutil.Hex(r)),
@@ -180,6 +181,9 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo
 		if batchResp.err == nil {
 			batchResp.err = err
 		}
+		if ErrPDServerTimeout.Equal(err) {
+			break
+		}
 	}
 	return batchResp
 }
@@ -194,12 +198,18 @@ func (s *tikvStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatte
 		for _, r := range spResp.Regions {
 			regionIDs = append(regionIDs, r.Id)
 		}
-		logutil.BgLogger().Info("split regions complete", zap.Uint64s("region IDs", regionIDs))
+		logutil.BgLogger().Info("split regions complete", zap.Int("region count", len(regionIDs)), zap.Uint64s("region IDs", regionIDs))
 	}
 	return regionIDs, errors.Trace(err)
 }
 
 func (s *tikvStore) scatterRegion(regionID uint64) error {
+	failpoint.Inject("MockScatterRegionTimeout", func(val failpoint.Value) {
+		if val.(bool) {
+			failpoint.Return(ErrPDServerTimeout)
+		}
+	})
+
 	logutil.BgLogger().Info("start scatter region",
 		zap.Uint64("regionID", regionID))
 	bo := NewBackoffer(context.Background(), scatterRegionBackoff)
@@ -208,12 +218,12 @@ func (s *tikvStore) scatterRegion(regionID uint64) error {
 		if err == nil {
 			break
 		}
-		err = bo.Backoff(BoRegionMiss, errors.New(err.Error()))
+		err = bo.Backoff(BoPDRPC, errors.New(err.Error()))
 		if err != nil {
 			return errors.Trace(err)
 		}
 	}
-	logutil.BgLogger().Info("scatter region complete",
+	logutil.BgLogger().Debug("scatter region complete",
 		zap.Uint64("regionID", regionID))
 	return nil
 }
@@ -222,11 +232,12 @@ func (s *tikvStore) scatterRegion(regionID uint64) error {
 // backOff is the back off time of the wait scatter region.(Milliseconds)
 // if backOff <= 0, the default wait scatter back off time will be used.
 func (s *tikvStore) WaitScatterRegionFinish(regionID uint64, backOff int) error {
-	logutil.BgLogger().Info("wait scatter region",
-		zap.Uint64("regionID", regionID))
 	if backOff <= 0 {
 		backOff = waitScatterRegionFinishBackoff
 	}
+	logutil.BgLogger().Info("wait scatter region",
+		zap.Uint64("regionID", regionID), zap.Int("backoff(ms)", backOff))
+
 	bo := NewBackoffer(context.Background(), backOff)
 	logFreq := 0
 	for {
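One change in splitBatchRegionsReq worth calling out: the per-iteration copy `batch := batch1` plus a capturing closure is replaced by passing the batch as a goroutine argument. Both forms are safe; the parameter form just makes the per-goroutine copy explicit. The sketch below (generic names, not TiDB code) shows why some copy is needed at all in Go versions before 1.22, where every closure in a loop shares one loop variable:

package main

import (
	"fmt"
	"sync"
)

func main() {
	items := []string{"a", "b", "c"}
	var wg sync.WaitGroup
	for _, it := range items {
		wg.Add(1)
		// Capturing `it` directly could make every goroutine observe the
		// last element (before Go 1.22). Passing it as an argument gives
		// each goroutine its own copy, as `go func(b batch){...}(batch1)`
		// does in the diff.
		go func(v string) {
			defer wg.Done()
			fmt.Println(v)
		}(it)
	}
	wg.Wait()
}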