executor: fix scatter region timeout issues and "show processlist" display issues #12057

Merged (4 commits, Sep 17, 2019)

Changes from all commits
executor/executor_test.go (7 changes: 6 additions & 1 deletion)

@@ -2156,6 +2156,11 @@ func (s *testSuite4) TestSplitRegionTimeout(c *C) {
 	// result 0 0 means split 0 region and 0 region finish scatter regions before timeout.
 	tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0"))
 	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout"), IsNil)
+
+	// Test scatter regions timeout.
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout", `return(true)`), IsNil)
+	tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("10 1"))
+	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout"), IsNil)
 }

 func (s *testSuiteP1) TestRow(c *C) {
@@ -4085,7 +4090,7 @@ func (s *testSuiteP1) TestSplitRegion(c *C) {

 	// Test for split table region.
 	tk.MustExec(`split table t between (0) and (1000000000) regions 10`)
-	// Check the ower value is more than the upper value.
+	// Check the lower value is more than the upper value.
 	_, err = tk.Exec(`split table t between (2) and (1) regions 10`)
 	c.Assert(err, NotNil)
 	c.Assert(err.Error(), Equals, "Split table `t` region lower value 2 should less than the upper value 1")
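A note on the failpoint pattern this test relies on: failpoint.Enable turns a named injection point on at runtime, and code carrying that failpoint returns the injected value only while the point is enabled. Below is a minimal, self-contained Go sketch of the same pattern, not TiDB code: fpName, scatter, and errPDTimeout are invented for illustration, and it calls failpoint.Eval directly so it runs without the failpoint-ctl rewrite that TiDB's Inject/Return markers require.

// failpoint_sketch.go: a minimal sketch of enable/inject/disable with
// github.com/pingcap/failpoint, mirroring MockScatterRegionTimeout above.
package main

import (
	"errors"
	"fmt"

	"github.com/pingcap/failpoint"
)

// fpName is a hypothetical failpoint name used only in this sketch.
const fpName = "example/MockScatterTimeout"

var errPDTimeout = errors.New("pd server timeout") // stand-in for ErrPDServerTimeout

// scatter pretends to scatter a region, failing while the failpoint is enabled.
func scatter() error {
	if v, err := failpoint.Eval(fpName); err == nil {
		if b, ok := v.(bool); ok && b {
			return errPDTimeout // injected failure
		}
	}
	return nil // normal path
}

func main() {
	fmt.Println("before enable:", scatter()) // <nil>

	// Equivalent of c.Assert(failpoint.Enable(..., `return(true)`), IsNil) in the test.
	if err := failpoint.Enable(fpName, `return(true)`); err != nil {
		panic(err)
	}
	fmt.Println("while enabled:", scatter()) // pd server timeout

	if err := failpoint.Disable(fpName); err != nil {
		panic(err)
	}
	fmt.Println("after disable:", scatter()) // <nil>
}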
executor/split.go (48 changes: 26 additions & 22 deletions)
@@ -43,12 +43,13 @@ import (
 type SplitIndexRegionExec struct {
 	baseExecutor

-	tableInfo  *model.TableInfo
-	indexInfo  *model.IndexInfo
-	lower      []types.Datum
-	upper      []types.Datum
-	num        int
-	valueLists [][]types.Datum
+	tableInfo    *model.TableInfo
+	indexInfo    *model.IndexInfo
+	lower        []types.Datum
+	upper        []types.Datum
+	num          int
+	valueLists   [][]types.Datum
+	splitIdxKeys [][]byte

 	done bool
 	splitRegionResult
@@ -60,8 +61,9 @@ type splitRegionResult struct {
 }

 // Open implements the Executor Open interface.
-func (e *SplitIndexRegionExec) Open(ctx context.Context) error {
-	return e.splitIndexRegion(ctx)
+func (e *SplitIndexRegionExec) Open(ctx context.Context) (err error) {
+	e.splitIdxKeys, err = e.getSplitIdxKeys()
+	return err
 }

 // Next implements the Executor Next interface.

[Review thread on the new getSplitIdxKeys() call in Open]

Reviewer (Member): Why move getSplitIdxKeys() to Open?

Author (Contributor): Because we do some checks in getSplitIdxKeys; if we put it in Next, some test cases would fail, so I think it is best to run it before Next.
@@ -70,8 +72,12 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error
 	if e.done {
 		return nil
 	}
-	appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
 	e.done = true
+	if err := e.splitIndexRegion(ctx); err != nil {
+		return err
+	}
+
+	appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
 	return nil
 }

@@ -85,15 +91,11 @@ func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error {
 	if !ok {
 		return nil
 	}
-	splitIdxKeys, err := e.getSplitIdxKeys()
-	if err != nil {
-		return err
-	}

 	start := time.Now()
 	ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
 	defer cancel()
-	regionIDs, err := s.SplitRegions(context.Background(), splitIdxKeys, true)
+	regionIDs, err := s.SplitRegions(context.Background(), e.splitIdxKeys, true)
 	if err != nil {
 		logutil.BgLogger().Warn("split table index region failed",
 			zap.String("table", e.tableInfo.Name.L),
@@ -248,14 +250,16 @@ type SplitTableRegionExec struct {
 	upper      types.Datum
 	num        int
 	valueLists [][]types.Datum
+	splitKeys  [][]byte

 	done bool
 	splitRegionResult
 }

 // Open implements the Executor Open interface.
-func (e *SplitTableRegionExec) Open(ctx context.Context) error {
-	return e.splitTableRegion(ctx)
+func (e *SplitTableRegionExec) Open(ctx context.Context) (err error) {
+	e.splitKeys, err = e.getSplitTableKeys()
+	return err
 }

 // Next implements the Executor Next interface.
@@ -264,8 +268,12 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error
 	if e.done {
 		return nil
 	}
-	appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
 	e.done = true
+
+	if err := e.splitTableRegion(ctx); err != nil {
+		return err
+	}
+	appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
 	return nil
 }

@@ -280,11 +288,7 @@ func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error {
 	ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
 	defer cancel()

-	splitKeys, err := e.getSplitTableKeys()
-	if err != nil {
-		return err
-	}
-	regionIDs, err := s.SplitRegions(ctxWithTimeout, splitKeys, true)
+	regionIDs, err := s.SplitRegions(ctxWithTimeout, e.splitKeys, true)
 	if err != nil {
 		logutil.BgLogger().Warn("split table region failed",
 			zap.String("table", e.tableInfo.Name.L),
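The review thread above asks why key construction moved into Open. The executor contract runs Open before any Next call, so cheap validation (for example, the lower/upper bound check) fails fast, while the slow split-and-scatter work is deferred to the first Next, inside the statement's normal execution path. A minimal Go sketch of that Open/Next split follows; it is not TiDB's Executor interface, and splitExec and its fields are invented for illustration.

// open_next_sketch.go: validate and precompute in Open, do slow work once in Next.
package main

import (
	"context"
	"errors"
	"fmt"
)

type splitExec struct {
	lower, upper int
	keys         []int // precomputed in Open, like splitIdxKeys/splitKeys in the diff
	done         bool
}

// Open validates input and precomputes split keys; it must not block for long.
func (e *splitExec) Open(ctx context.Context) error {
	if e.lower >= e.upper {
		return errors.New("lower value should be less than the upper value")
	}
	for k := e.lower; k < e.upper; k++ {
		e.keys = append(e.keys, k)
	}
	return nil
}

// Next performs the potentially slow work exactly once and emits the result.
func (e *splitExec) Next(ctx context.Context) (int, error) {
	if e.done {
		return 0, nil
	}
	e.done = true
	// ... the slow split/scatter work would run here, under ctx's timeout ...
	return len(e.keys), nil
}

func main() {
	e := &splitExec{lower: 0, upper: 5}
	if err := e.Open(context.Background()); err != nil { // errors surface early
		panic(err)
	}
	n, _ := e.Next(context.Background())
	fmt.Println("split", n, "keys") // split 5 keys
}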
store/tikv/split_region.go (57 changes: 34 additions & 23 deletions)

@@ -48,7 +48,7 @@ func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter b

 	var batches []batch
 	for regionID, groupKeys := range groups {
-		batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount)
+		batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPutSize)
 	}

 	if len(batches) == 0 {
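The hunk above only changes the limit constant passed to appendKeyBatches; the surrounding pattern is count-limited batching of keys that have already been grouped by region, so one SplitRegion request stays bounded. A standalone sketch of that batching shape, not the real appendKeyBatches (the function name, string keys, and limit are invented):

// batching_sketch.go: chop a key list into batches of at most `limit` keys.
package main

import "fmt"

func appendBatches(batches [][]string, keys []string, limit int) [][]string {
	for len(keys) > limit {
		batches = append(batches, keys[:limit])
		keys = keys[limit:]
	}
	if len(keys) > 0 {
		batches = append(batches, keys) // final partial batch
	}
	return batches
}

func main() {
	keys := []string{"k1", "k2", "k3", "k4", "k5"}
	for i, b := range appendBatches(nil, keys, 2) {
		fmt.Println("batch", i, b) // batches of at most 2 keys
	}
}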
@@ -68,14 +68,13 @@ func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter b
 	}
 	ch := make(chan singleBatchResp, len(batches))
 	for _, batch1 := range batches {
-		batch := batch1
-		go func() {
+		go func(b batch) {
 			backoffer, cancel := bo.Fork()
 			defer cancel()

 			util.WithRecovery(func() {
 				select {
-				case ch <- s.batchSendSingleRegion(backoffer, batch, scatter):
+				case ch <- s.batchSendSingleRegion(backoffer, b, scatter):
 				case <-bo.ctx.Done():
 					ch <- singleBatchResp{err: bo.ctx.Err()}
 				}
@@ -84,24 +83,25 @@ func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter b
 					ch <- singleBatchResp{err: errors.Errorf("%v", r)}
 				}
 			})
-		}()
+		}(batch1)
 	}

 	srResp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)*2)}
 	for i := 0; i < len(batches); i++ {
 		batchResp := <-ch
 		if batchResp.err != nil {
-			logutil.BgLogger().Debug("tikv store batch send failed",
-				zap.Error(batchResp.err))
+			logutil.BgLogger().Info("batch split regions failed", zap.Error(batchResp.err))
 			if err == nil {
 				err = batchResp.err
 			}
-			continue
 		}

-		spResp := batchResp.resp.Resp.(*kvrpcpb.SplitRegionResponse)
-		regions := spResp.GetRegions()
-		srResp.Regions = append(srResp.Regions, regions...)
+		// If the split succeeds and the scatter fails, we also need to add the region IDs.
+		if batchResp.resp != nil {
+			spResp := batchResp.resp.Resp.(*kvrpcpb.SplitRegionResponse)
+			regions := spResp.GetRegions()
+			srResp.Regions = append(srResp.Regions, regions...)
+		}
 	}
 	return &tikvrpc.Response{Resp: srResp}, errors.Trace(err)
 }
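A side note on the goroutine change in the hunk above: before Go 1.22, a range variable is reused across iterations, so capturing it directly in a goroutine races with the loop. Both the old form (copy into a fresh local, then capture) and the new form (pass the value as a parameter) are correct; the parameter form just makes the capture explicit. A standalone sketch, not TiDB code:

// capture_sketch.go: pass the loop value as an argument, as the diff does.
package main

import (
	"fmt"
	"sync"
)

func main() {
	items := []string{"a", "b", "c"}
	var wg sync.WaitGroup

	// Buggy pre-Go 1.22: every goroutine may observe the final value of item.
	// for _, item := range items {
	// 	go func() { fmt.Println(item) }() // racy capture of the loop variable
	// }

	// Safe in all Go versions: hand the current value to the goroutine,
	// mirroring go func(b batch) { ... }(batch1) in the hunk above.
	for _, item := range items {
		wg.Add(1)
		go func(it string) {
			defer wg.Done()
			fmt.Println(it)
		}(item)
	}
	wg.Wait()
}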
@@ -151,35 +151,39 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo
 		// so n-1 needs to be scattered to other stores.
 		spResp.Regions = regions[:len(regions)-1]
 	}
-	logutil.BgLogger().Info("batch split regions complete",
-		zap.Uint64("batch region ID", batch.regionID.id),
-		zap.Stringer("first at", kv.Key(batch.keys[0])),
-		zap.Stringer("first new region left", logutil.Hex(spResp.Regions[0])),
-		zap.Int("new region count", len(spResp.Regions)))

 	if !scatter {
+		if len(spResp.Regions) == 0 {
+			return batchResp
+		}
+		logutil.BgLogger().Info("batch split regions complete",
+			zap.Uint64("batch region ID", batch.regionID.id),
+			zap.Stringer("first at", kv.Key(batch.keys[0])),
+			zap.Stringer("first new region left", logutil.Hex(spResp.Regions[0])),
+			zap.Int("new region count", len(spResp.Regions)))
 		return batchResp
 	}

 	for i, r := range spResp.Regions {
 		if err = s.scatterRegion(r.Id); err == nil {
-			logutil.BgLogger().Info("batch split regions, scatter a region complete",
+			logutil.BgLogger().Info("batch split regions, scatter region complete",
 				zap.Uint64("batch region ID", batch.regionID.id),
 				zap.Stringer("at", kv.Key(batch.keys[i])),
 				zap.Stringer("new region left", logutil.Hex(r)))
 			continue
 		}

-		logutil.BgLogger().Info("batch split regions, scatter a region failed",
+		logutil.BgLogger().Info("batch split regions, scatter region failed",
 			zap.Uint64("batch region ID", batch.regionID.id),
 			zap.Stringer("at", kv.Key(batch.keys[i])),
 			zap.Stringer("new region left", logutil.Hex(r)),
 			zap.Error(err))
 		if batchResp.err == nil {
 			batchResp.err = err
 		}
+		if ErrPDServerTimeout.Equal(err) {
+			break
+		}
 	}
 	return batchResp
 }
@@ -194,12 +198,18 @@ func (s *tikvStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatte
 		for _, r := range spResp.Regions {
 			regionIDs = append(regionIDs, r.Id)
 		}
-		logutil.BgLogger().Info("split regions complete", zap.Uint64s("region IDs", regionIDs))
+		logutil.BgLogger().Info("split regions complete", zap.Int("region count", len(regionIDs)), zap.Uint64s("region IDs", regionIDs))
 	}
 	return regionIDs, errors.Trace(err)
 }

 func (s *tikvStore) scatterRegion(regionID uint64) error {
+	failpoint.Inject("MockScatterRegionTimeout", func(val failpoint.Value) {
+		if val.(bool) {
+			failpoint.Return(ErrPDServerTimeout)
+		}
+	})
+
 	logutil.BgLogger().Info("start scatter region",
 		zap.Uint64("regionID", regionID))
 	bo := NewBackoffer(context.Background(), scatterRegionBackoff)
@@ -208,12 +218,12 @@ func (s *tikvStore) scatterRegion(regionID uint64) error {
 		if err == nil {
 			break
 		}
-		err = bo.Backoff(BoRegionMiss, errors.New(err.Error()))
+		err = bo.Backoff(BoPDRPC, errors.New(err.Error()))
 		if err != nil {
 			return errors.Trace(err)
 		}
 	}
-	logutil.BgLogger().Info("scatter region complete",
+	logutil.BgLogger().Debug("scatter region complete",
 		zap.Uint64("regionID", regionID))
 	return nil
 }
@@ -222,11 +232,12 @@ func (s *tikvStore) scatterRegion(regionID uint64) error {
 // backOff is the back off time of the wait scatter region.(Milliseconds)
 // if backOff <= 0, the default wait scatter back off time will be used.
 func (s *tikvStore) WaitScatterRegionFinish(regionID uint64, backOff int) error {
-	logutil.BgLogger().Info("wait scatter region",
-		zap.Uint64("regionID", regionID))
 	if backOff <= 0 {
 		backOff = waitScatterRegionFinishBackoff
 	}
+	logutil.BgLogger().Info("wait scatter region",
+		zap.Uint64("regionID", regionID), zap.Int("backoff(ms)", backOff))
+
 	bo := NewBackoffer(context.Background(), backOff)
 	logFreq := 0
 	for {
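The scatterRegion hunks above retry the PD request through a Backoffer (now classified as BoPDRPC rather than BoRegionMiss) until the call succeeds or the backoff budget is exhausted. A minimal sketch of that retry-with-backoff shape, not TiDB's Backoffer: all names are invented, and the real implementation adds jitter and context cancellation.

// backoff_sketch.go: retry a flaky call, sleeping longer each attempt, and
// give up once a total sleep budget (milliseconds) is spent.
package main

import (
	"errors"
	"fmt"
	"time"
)

type backoffer struct {
	sleepMs int // next sleep interval
	totalMs int // budget consumed so far
	maxMs   int // total budget, like scatterRegionBackoff in the diff
}

// backoff sleeps and doubles the interval; it errors once the budget is spent.
func (b *backoffer) backoff(cause error) error {
	if b.totalMs >= b.maxMs {
		return fmt.Errorf("backoff budget exhausted: %w", cause)
	}
	time.Sleep(time.Duration(b.sleepMs) * time.Millisecond)
	b.totalMs += b.sleepMs
	b.sleepMs *= 2
	return nil
}

func main() {
	attempts := 0
	flaky := func() error { // stands in for the PD ScatterRegion RPC
		attempts++
		if attempts < 3 {
			return errors.New("pd server busy")
		}
		return nil
	}

	bo := &backoffer{sleepMs: 10, maxMs: 1000}
	for {
		err := flaky()
		if err == nil {
			break
		}
		if err := bo.backoff(err); err != nil {
			panic(err) // budget spent; give up, like returning errors.Trace(err)
		}
	}
	fmt.Println("succeeded after", attempts, "attempts")
}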