executor/split: return split result when do split region and refine split timeout logic. #11259

Merged · 15 commits · Jul 23, 2019
2 changes: 1 addition & 1 deletion ddl/split_region.go
@@ -118,7 +118,7 @@ func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter

func waitScatterRegionFinish(store kv.SplitableStore, regionIDs ...uint64) {
for _, regionID := range regionIDs {
err := store.WaitScatterRegionFinish(regionID)
err := store.WaitScatterRegionFinish(regionID, 0)
if err != nil {
logutil.BgLogger().Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
}
5 changes: 3 additions & 2 deletions executor/builder.go
@@ -1265,8 +1265,9 @@ func (b *executorBuilder) buildUnionAll(v *plannercore.PhysicalUnionAll) Executo
}

func (b *executorBuilder) buildSplitRegion(v *plannercore.SplitRegion) Executor {
base := newBaseExecutor(b.ctx, nil, v.ExplainID())
base.initCap = chunk.ZeroCapacity
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
base.initCap = 1
base.maxChunkSize = 1
if v.IndexInfo != nil {
return &SplitIndexRegionExec{
baseExecutor: base,
19 changes: 6 additions & 13 deletions executor/executor_test.go
@@ -2074,16 +2074,9 @@ func (s *testSuite4) TestSplitRegionTimeout(c *C) {
tk.MustExec("create table t(a varchar(100),b int, index idx1(b,a))")
tk.MustExec(`split table t index idx1 by (10000,"abcd"),(10000000);`)
tk.MustExec(`set @@tidb_wait_split_region_timeout=1`)
_, err := tk.Exec(`split table t between (0) and (10000) regions 10`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "split region timeout(1s)")
// A result of "0 0" means that 0 regions were split and 0 regions finished scattering before the timeout.
tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout", `return(true)`), IsNil)
_, err = tk.Exec(`split table t between (0) and (10000) regions 10`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "wait split region scatter timeout(1s)")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout"), IsNil)
}

func (s *testSuiteP1) TestRow(c *C) {
@@ -4052,7 +4045,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {

// Test show table regions.
tk.MustExec(`split table t_regions1 by (0)`)
tk.MustExec(`split table t_regions between (-10000) and (10000) regions 4;`)
tk.MustQuery(`split table t_regions between (-10000) and (10000) regions 4;`).Check(testkit.Rows("3 1"))
re := tk.MustQuery("show table t_regions regions")
rows := re.Rows()
// Table t_regions should have 4 regions now.
@@ -4067,7 +4060,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {
c.Assert(rows[3][1], Equals, fmt.Sprintf("t_%d_r_5000", tbl.Meta().ID))

// Test show table index regions.
tk.MustExec(`split table t_regions index idx between (-1000) and (1000) regions 4;`)
tk.MustQuery(`split table t_regions index idx between (-1000) and (1000) regions 4;`).Check(testkit.Rows("4 1"))
re = tk.MustQuery("show table t_regions index idx regions")
rows = re.Rows()
// The index `idx` of table t_regions should have 4 regions now.
@@ -4097,7 +4090,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {

// Test show table regions.
tk.MustExec(`set @@session.tidb_wait_split_region_finish=1;`)
tk.MustExec(`split table t_regions between (0) and (10000) regions 4;`)
tk.MustQuery(`split table t_regions by (2500),(5000),(7500);`).Check(testkit.Rows("3 1"))
re = tk.MustQuery("show table t_regions regions")
rows = re.Rows()
// Table t_regions should have 4 regions now.
@@ -4110,7 +4103,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {
c.Assert(rows[3][1], Equals, fmt.Sprintf("t_%d_r_7500", tbl.Meta().ID))

// Test show table index regions.
tk.MustExec(`split table t_regions index idx between (0) and (1000) regions 4;`)
tk.MustQuery(`split table t_regions index idx by (250),(500),(750);`).Check(testkit.Rows("4 1"))
re = tk.MustQuery("show table t_regions index idx regions")
rows = re.Rows()
// The index `idx` of table t_regions should have 4 regions now.
144 changes: 113 additions & 31 deletions executor/split.go
100644 → 100755
@@ -48,10 +48,37 @@ type SplitIndexRegionExec struct {
upper []types.Datum
num int
valueLists [][]types.Datum

done bool
splitRegionResult
}

type splitRegionResult struct {
splitRegions int
finishScatterNum int
}

// Open implements the Executor Open interface.
func (e *SplitIndexRegionExec) Open(ctx context.Context) error {
return e.splitIndexRegion(ctx)
}

// Next implements the Executor Next interface.
func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
func (e *SplitIndexRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
}
appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
e.done = true
return nil
}
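
For readers unfamiliar with TiDB's volcano-style executors, here is a minimal, framework-free sketch of the pattern used above: do the work in `Open`, then emit a single result row from `Next` and mark the executor as drained. All names below are invented for illustration and are not TiDB's API.

```go
package main

import "fmt"

// oneRowExec mimics SplitIndexRegionExec's result handling: the split work
// happens in Open, and Next emits exactly one row before reporting "done".
type oneRowExec struct {
	done             bool
	splitRegions     int
	finishScatterNum int
}

func (e *oneRowExec) Open() error {
	// The real executor performs the split and scatter-wait here.
	e.splitRegions, e.finishScatterNum = 3, 3
	return nil
}

// Next appends at most one row to out; an empty out means the executor is drained.
func (e *oneRowExec) Next(out *[][2]int) error {
	*out = (*out)[:0] // mirrors chk.Reset()
	if e.done {
		return nil
	}
	*out = append(*out, [2]int{e.splitRegions, e.finishScatterNum})
	e.done = true
	return nil
}

func main() {
	e := &oneRowExec{}
	_ = e.Open()
	var rows [][2]int
	_ = e.Next(&rows)
	fmt.Println(rows) // [[3 3]]
	_ = e.Next(&rows)
	fmt.Println(rows) // []: the second call produces no rows
}
```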

// checkScatterRegionFinishBackOff is the back off time (in milliseconds) used to check
// whether region scattering has finished after the split region operation times out.
const checkScatterRegionFinishBackOff = 50

// splitIndexRegion is used to split index regions.
func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error {
store := e.ctx.GetStore()
s, ok := store.(kv.SplitableStore)
if !ok {
@@ -62,10 +89,15 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
return err
}

start := time.Now()
ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()
regionIDs := make([]uint64, 0, len(splitIdxKeys))
for _, idxKey := range splitIdxKeys {
if isCtxDone(ctxWithTimeout) {
break
}

regionID, err := s.SplitRegion(idxKey, true)
if err != nil {
logutil.BgLogger().Warn("split table index region failed",
@@ -74,28 +106,40 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
zap.Error(err))
continue
}
if regionID == 0 {
continue
}
regionIDs = append(regionIDs, regionID)

if isCtxDone(ctxWithTimeout) {
return errors.Errorf("wait split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
e.splitRegions = len(regionIDs)
if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
remainMillisecond := 0
finishScatterNum := 0
for _, regionID := range regionIDs {
err := s.WaitScatterRegionFinish(regionID)
if err != nil {
if isCtxDone(ctxWithTimeout) {
// Do not break here: use a very short back off time to check whether the
// remaining regions have finished scattering.
// Imagine we split regions 1, 2 and 3, and then time out while waiting for
// region 1 to scatter. Regions 2 and 3 may already have finished scattering,
// so we should report that 2 regions finished scattering, rather than 0.

Member: IMHO the comment is still not good enough... would you please get help from some other colleagues, like i18n team?

Contributor Author: done.
remainMillisecond = checkScatterRegionFinishBackOff
} else {
remainMillisecond = int((e.ctx.GetSessionVars().GetSplitRegionTimeout().Seconds() - time.Since(start).Seconds()) * 1000)
}
err := s.WaitScatterRegionFinish(regionID, remainMillisecond)
if err == nil {
finishScatterNum++
} else {
logutil.BgLogger().Warn("wait scatter region failed",
zap.Uint64("regionID", regionID),
zap.String("table", e.tableInfo.Name.L),
zap.String("index", e.indexInfo.Name.L),
zap.Error(err))
}
if isCtxDone(ctxWithTimeout) {
return errors.Errorf("wait split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
e.finishScatterNum = finishScatterNum
return nil
}
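
The back off arithmetic above is easy to misread, so here is a self-contained sketch of the same idea (the 50 ms constant is copied from this PR; the helper name and the rest are illustrative): before the deadline, each wait gets whatever is left of the overall timeout; once the deadline has passed, each remaining region still gets one very short check so that scatters that already finished are counted.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// checkScatterRegionFinishBackOff mirrors the constant above: a very short
// budget (in milliseconds) used once the overall split timeout has passed.
const checkScatterRegionFinishBackOff = 50

// waitBudgetMs returns how many milliseconds the next wait call may spend:
// the remaining share of the timeout, or the short back off once the
// context's deadline has been exceeded.
func waitBudgetMs(ctx context.Context, start time.Time, timeout time.Duration) int {
	select {
	case <-ctx.Done():
		return checkScatterRegionFinishBackOff
	default:
		return int((timeout.Seconds() - time.Since(start).Seconds()) * 1000)
	}
}

func main() {
	timeout := 100 * time.Millisecond
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	start := time.Now()

	time.Sleep(30 * time.Millisecond)
	fmt.Println(waitBudgetMs(ctx, start, timeout)) // roughly 70: deadline not reached yet

	time.Sleep(90 * time.Millisecond)
	fmt.Println(waitBudgetMs(ctx, start, timeout)) // 50: deadline passed, short back off
}
```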

@@ -225,16 +269,36 @@ type SplitTableRegionExec struct {
upper types.Datum
num int
valueLists [][]types.Datum

done bool
splitRegionResult
}

// Open implements the Executor Open interface.
func (e *SplitTableRegionExec) Open(ctx context.Context) error {
return e.splitTableRegion(ctx)
}

// Next implements the Executor Next interface.
func (e *SplitTableRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
}
appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
e.done = true
return nil
}

// Next implements the Executor Next interface.
func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error {
store := e.ctx.GetStore()
s, ok := store.(kv.SplitableStore)
if !ok {
return nil
}

start := time.Now()
ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()

@@ -244,50 +308,68 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
}
regionIDs := make([]uint64, 0, len(splitKeys))
for _, key := range splitKeys {
failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second*1 + time.Millisecond*10)
}
})
if isCtxDone(ctxWithTimeout) {
break
}
regionID, err := s.SplitRegion(key, true)
if err != nil {
logutil.BgLogger().Warn("split table region failed",
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
continue
}
if regionID == 0 {
continue
}
regionIDs = append(regionIDs, regionID)

failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second * 1)
}
})

if isCtxDone(ctxWithTimeout) {
return errors.Errorf("split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
e.splitRegions = len(regionIDs)
if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}

remainMillisecond := 0
finishScatterNum := 0
for _, regionID := range regionIDs {
err := s.WaitScatterRegionFinish(regionID)
if err != nil {
if isCtxDone(ctxWithTimeout) {
// Do not break here: use a very short back off time to check whether the
// remaining regions have finished scattering.
// Imagine we split regions 1, 2 and 3, and then time out while waiting for
// region 1 to scatter. Regions 2 and 3 may already have finished scattering,
// so we should return 2 as the number of regions that finished scattering,
// rather than 0.
remainMillisecond = checkScatterRegionFinishBackOff
} else {
remainMillisecond = int((e.ctx.GetSessionVars().GetSplitRegionTimeout().Seconds() - time.Since(start).Seconds()) * 1000)
}

err := s.WaitScatterRegionFinish(regionID, remainMillisecond)
if err == nil {
finishScatterNum++
} else {
logutil.BgLogger().Warn("wait scatter region failed",
zap.Uint64("regionID", regionID),
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
}

failpoint.Inject("mockScatterRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second * 1)
}
})

if isCtxDone(ctxWithTimeout) {
return errors.Errorf("wait split region scatter timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
e.finishScatterNum = finishScatterNum
return nil
}

func appendSplitRegionResultToChunk(chk *chunk.Chunk, totalRegions, finishScatterNum int) {
chk.AppendInt64(0, int64(totalRegions))
if finishScatterNum > 0 && totalRegions > 0 {
chk.AppendFloat64(1, float64(finishScatterNum)/float64(totalRegions))
} else {
chk.AppendFloat64(1, float64(0))
}
}
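
As a sanity check on the result row, here is a dependency-free version of the same computation (a sketch, not TiDB code); it reproduces the "3 1" rows asserted in the tests above.

```go
package main

import "fmt"

// splitResultRow mirrors appendSplitRegionResultToChunk without the chunk
// dependency: it returns the TOTAL_SPLIT_REGION and SCATTER_FINISH_RATIO values.
func splitResultRow(totalRegions, finishScatterNum int) (int64, float64) {
	ratio := 0.0
	if finishScatterNum > 0 && totalRegions > 0 {
		ratio = float64(finishScatterNum) / float64(totalRegions)
	}
	return int64(totalRegions), ratio
}

func main() {
	total, ratio := splitResultRow(3, 3)
	fmt.Println(total, ratio) // 3 1: all split regions finished scattering

	total, ratio = splitResultRow(10, 4)
	fmt.Println(total, ratio) // 10 0.4: a timeout left six regions still scattering
}
```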

func isCtxDone(ctx context.Context) bool {
select {
case <-ctx.Done():
		return true
	default:
		return false
	}
}
2 changes: 1 addition & 1 deletion kv/kv.go
@@ -307,6 +307,6 @@ type Iterator interface {
// SplitableStore is the kv store which supports split regions.
type SplitableStore interface {
SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error)
WaitScatterRegionFinish(regionID uint64) error
WaitScatterRegionFinish(regionID uint64, backOff int) error
CheckRegionInScattering(regionID uint64) (bool, error)
}
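
Since this interface change ripples into every implementer (tikvStore, the DDL caller, the mocks), a hypothetical in-memory stub can make the new contract concrete; everything below except the interface itself is invented for illustration and is not part of the PR.

```go
package main

import (
	"fmt"
	"time"
)

// Key stands in for kv.Key.
type Key []byte

// SplitableStore mirrors the widened interface in kv/kv.go.
type SplitableStore interface {
	SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error)
	WaitScatterRegionFinish(regionID uint64, backOff int) error
	CheckRegionInScattering(regionID uint64) (bool, error)
}

// stubStore is an invented test double.
type stubStore struct{ nextID uint64 }

func (s *stubStore) SplitRegion(splitKey Key, scatter bool) (uint64, error) {
	s.nextID++
	return s.nextID, nil
}

func (s *stubStore) WaitScatterRegionFinish(regionID uint64, backOff int) error {
	// backOff <= 0 means "use the store's default", matching the tikvStore change.
	if backOff <= 0 {
		backOff = 100
	}
	time.Sleep(time.Duration(backOff) * time.Millisecond) // pretend to poll PD
	return nil
}

func (s *stubStore) CheckRegionInScattering(regionID uint64) (bool, error) {
	return false, nil
}

func main() {
	var st SplitableStore = &stubStore{}
	id, _ := st.SplitRegion(Key("t_1_r_5000"), true)
	fmt.Println(st.WaitScatterRegionFinish(id, 50)) // <nil>
}
```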
9 changes: 9 additions & 0 deletions planner/core/planbuilder.go
@@ -1002,6 +1002,13 @@ func buildTableRegionsSchema() *expression.Schema {
return schema
}

func buildSplitRegionsSchema() *expression.Schema {
schema := expression.NewSchema(make([]*expression.Column, 0, 10)...)
schema.Append(buildColumn("", "TOTAL_SPLIT_REGION", mysql.TypeLonglong, 4))
schema.Append(buildColumn("", "SCATTER_FINISH_RATIO", mysql.TypeDouble, 8))
return schema
}

func buildShowDDLJobQueriesFields() *expression.Schema {
schema := expression.NewSchema(make([]*expression.Column, 0, 1)...)
schema.Append(buildColumn("", "QUERY", mysql.TypeVarchar, 256))
@@ -1677,6 +1684,7 @@ func (b *PlanBuilder) buildSplitIndexRegion(node *ast.SplitRegionStmt) (Plan, er
TableInfo: tblInfo,
IndexInfo: indexInfo,
}
p.SetSchema(buildSplitRegionsSchema())
// Split index regions by user specified value lists.
if len(node.SplitOpt.ValueLists) > 0 {
indexValues := make([][]types.Datum, 0, len(node.SplitOpt.ValueLists))
@@ -1791,6 +1799,7 @@ func (b *PlanBuilder) buildSplitTableRegion(node *ast.SplitRegionStmt) (Plan, er
p := &SplitRegion{
TableInfo: tblInfo,
}
p.SetSchema(buildSplitRegionsSchema())
if len(node.SplitOpt.ValueLists) > 0 {
values := make([][]types.Datum, 0, len(node.SplitOpt.ValueLists))
for i, valuesItem := range node.SplitOpt.ValueLists {
3 changes: 2 additions & 1 deletion store/mockstore/mocktikv/cluster.go
@@ -363,12 +363,13 @@ func (c *Cluster) Split(regionID, newRegionID uint64, key []byte, peerIDs []uint
}

// SplitRaw splits a Region at the key (not encoded) and creates new Region.
func (c *Cluster) SplitRaw(regionID, newRegionID uint64, rawKey []byte, peerIDs []uint64, leaderPeerID uint64) {
func (c *Cluster) SplitRaw(regionID, newRegionID uint64, rawKey []byte, peerIDs []uint64, leaderPeerID uint64) *Region {
c.Lock()
defer c.Unlock()

newRegion := c.regions[regionID].split(newRegionID, rawKey, peerIDs, leaderPeerID)
c.regions[newRegionID] = newRegion
return newRegion
}

// Merge merges 2 regions, their key ranges should be adjacent.
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/rpc.go
@@ -609,8 +609,8 @@ func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb
return &kvrpcpb.SplitRegionResponse{}
}
newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0])
return &kvrpcpb.SplitRegionResponse{}
newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0])
return &kvrpcpb.SplitRegionResponse{Left: newRegion.Meta}
}

// RPCClient sends kv RPC calls to mock cluster. RPCClient mocks the behavior of
9 changes: 7 additions & 2 deletions store/tikv/split_region.go
@@ -103,10 +103,15 @@ func (s *tikvStore) scatterRegion(regionID uint64) error {
}

// WaitScatterRegionFinish implements SplitableStore interface.
func (s *tikvStore) WaitScatterRegionFinish(regionID uint64) error {
// backOff is the back off time (in milliseconds) for waiting for the region scatter to finish.
// If backOff <= 0, the default wait scatter back off time is used.
func (s *tikvStore) WaitScatterRegionFinish(regionID uint64, backOff int) error {
logutil.BgLogger().Info("wait scatter region",
zap.Uint64("regionID", regionID))
bo := NewBackoffer(context.Background(), waitScatterRegionFinishBackoff)
if backOff <= 0 {
backOff = waitScatterRegionFinishBackoff
}
bo := NewBackoffer(context.Background(), backOff)
logFreq := 0
for {
resp, err := s.pdClient.GetOperator(context.Background(), regionID)