Skip to content

Commit

Permalink
ddl: fix pre-split region timeout constraint not working (pingcap#17459)
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot authored Jun 11, 2020
1 parent 58ee9da commit 5b7c596
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 34 deletions.
8 changes: 4 additions & 4 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1441,9 +1441,9 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
}
pi := tbInfo.GetPartitionInfo()
if pi != nil {
preSplit = func() { splitPartitionTableRegion(sp, pi, scatterRegion) }
preSplit = func() { splitPartitionTableRegion(ctx, sp, pi, scatterRegion) }
} else {
preSplit = func() { splitTableRegion(sp, tbInfo, scatterRegion) }
preSplit = func() { splitTableRegion(ctx, sp, tbInfo, scatterRegion) }
}
if scatterRegion {
preSplit()
Expand Down Expand Up @@ -1514,9 +1514,9 @@ func (d *ddl) preSplitAndScatter(ctx sessionctx.Context, tbInfo *model.TableInfo
pi = tbInfo.GetPartitionInfo()
}
if pi != nil {
preSplit = func() { splitPartitionTableRegion(sp, pi, scatterRegion) }
preSplit = func() { splitPartitionTableRegion(ctx, sp, pi, scatterRegion) }
} else {
preSplit = func() { splitTableRegion(sp, tbInfo, scatterRegion) }
preSplit = func() { splitTableRegion(ctx, sp, tbInfo, scatterRegion) }
}
if scatterRegion {
preSplit()
Expand Down
34 changes: 20 additions & 14 deletions ddl/split_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,39 @@ import (

"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

func splitPartitionTableRegion(store kv.SplitableStore, pi *model.PartitionInfo, scatter bool) {
func splitPartitionTableRegion(ctx sessionctx.Context, store kv.SplitableStore, pi *model.PartitionInfo, scatter bool) {
// Max partition count is 4096, should we sample and just choose some of the partition to split?
regionIDs := make([]uint64, 0, len(pi.Definitions))
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()
for _, def := range pi.Definitions {
regionIDs = append(regionIDs, splitRecordRegion(store, def.ID, scatter))
regionIDs = append(regionIDs, splitRecordRegion(ctxWithTimeout, store, def.ID, scatter))
}
if scatter {
waitScatterRegionFinish(store, regionIDs...)
waitScatterRegionFinish(ctxWithTimeout, store, regionIDs...)
}
}

func splitTableRegion(store kv.SplitableStore, tbInfo *model.TableInfo, scatter bool) {
func splitTableRegion(ctx sessionctx.Context, store kv.SplitableStore, tbInfo *model.TableInfo, scatter bool) {
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()
if tbInfo.ShardRowIDBits > 0 && tbInfo.PreSplitRegions > 0 {
splitPreSplitedTable(store, tbInfo, scatter)
splitPreSplitedTable(ctxWithTimeout, store, tbInfo, scatter)
} else {
regionID := splitRecordRegion(store, tbInfo.ID, scatter)
regionID := splitRecordRegion(ctxWithTimeout, store, tbInfo.ID, scatter)
if scatter {
waitScatterRegionFinish(store, regionID)
waitScatterRegionFinish(ctxWithTimeout, store, regionID)
}
}
}

func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scatter bool) {
func splitPreSplitedTable(ctx context.Context, store kv.SplitableStore, tbInfo *model.TableInfo, scatter bool) {
// Example:
// ShardRowIDBits = 4
// PreSplitRegions = 2
Expand Down Expand Up @@ -80,20 +85,20 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat
splitTableKeys = append(splitTableKeys, key)
}
var err error
regionIDs, err := store.SplitRegions(context.Background(), splitTableKeys, scatter)
regionIDs, err := store.SplitRegions(ctx, splitTableKeys, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table region failed",
zap.Stringer("table", tbInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err))
}
regionIDs = append(regionIDs, splitIndexRegion(store, tbInfo, scatter)...)
if scatter {
waitScatterRegionFinish(store, regionIDs...)
waitScatterRegionFinish(ctx, store, regionIDs...)
}
}

func splitRecordRegion(store kv.SplitableStore, tableID int64, scatter bool) uint64 {
func splitRecordRegion(ctx context.Context, store kv.SplitableStore, tableID int64, scatter bool) uint64 {
tableStartKey := tablecodec.GenTablePrefix(tableID)
regionIDs, err := store.SplitRegions(context.Background(), [][]byte{tableStartKey}, scatter)
regionIDs, err := store.SplitRegions(ctx, [][]byte{tableStartKey}, scatter)
if err != nil {
// It will be automatically split by TiKV later.
logutil.Logger(context.Background()).Warn("[ddl] split table region failed", zap.Error(err))
Expand All @@ -118,11 +123,12 @@ func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter
return regionIDs
}

func waitScatterRegionFinish(store kv.SplitableStore, regionIDs ...uint64) {
func waitScatterRegionFinish(ctx context.Context, store kv.SplitableStore, regionIDs ...uint64) {
for _, regionID := range regionIDs {
err := store.WaitScatterRegionFinish(regionID, 0)
err := store.WaitScatterRegionFinish(ctx, regionID, 0)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
return
}
}
}
10 changes: 10 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2025,6 +2025,16 @@ func (s *testSuite) TestSplitRegionTimeout(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout", `return(true)`), IsNil)
tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("10 1"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout"), IsNil)

// Test pre-split with timeout.
tk.MustExec("drop table if exists t")
tk.MustExec("set @@global.tidb_scatter_region=1;")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout", `return(true)`), IsNil)
atomic.StoreUint32(&ddl.EnableSplitTableRegion, 1)
start := time.Now()
tk.MustExec("create table t (a int, b int) partition by hash(a) partitions 5;")
c.Assert(time.Since(start).Seconds(), Less, 10.0)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout"), IsNil)
}

func (s *testSuite) TestRow(c *C) {
Expand Down
4 changes: 2 additions & 2 deletions executor/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error {
start := time.Now()
ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()
regionIDs, err := s.SplitRegions(context.Background(), e.splitIdxKeys, true)
regionIDs, err := s.SplitRegions(ctxWithTimeout, e.splitIdxKeys, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table index region failed",
zap.String("table", e.tableInfo.Name.L),
Expand Down Expand Up @@ -401,7 +401,7 @@ func waitScatterRegionFinish(ctxWithTimeout context.Context, sctx sessionctx.Con
remainMillisecond = int((sctx.GetSessionVars().GetSplitRegionTimeout().Seconds() - time.Since(startTime).Seconds()) * 1000)
}

err := store.WaitScatterRegionFinish(regionID, remainMillisecond)
err := store.WaitScatterRegionFinish(ctxWithTimeout, regionID, remainMillisecond)
if err == nil {
finishScatterNum++
} else {
Expand Down
2 changes: 1 addition & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ type Iterator interface {
// SplitableStore is the kv store which supports split regions.
type SplitableStore interface {
	// SplitRegions splits regions at the given keys, optionally asking PD to
	// scatter the new regions; it returns the IDs of the regions created.
	SplitRegions(ctx context.Context, splitKey [][]byte, scatter bool) (regionID []uint64, err error)
	// WaitScatterRegionFinish blocks until the region finishes scattering or
	// ctx is done. backOff is the wait back off time in milliseconds; a value
	// <= 0 selects the implementation default.
	WaitScatterRegionFinish(ctx context.Context, regionID uint64, backOff int) error
	// CheckRegionInScattering reports whether the region is still being scattered.
	CheckRegionInScattering(regionID uint64) (bool, error)
}

Expand Down
27 changes: 14 additions & 13 deletions store/tikv/split_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo
}

for i, r := range spResp.Regions {
if err = s.scatterRegion(r.Id); err == nil {
if err = s.scatterRegion(bo.ctx, r.Id); err == nil {
logutil.Logger(context.Background()).Info("batch split regions, scatter region complete",
zap.Uint64("batch region ID", batch.regionID.id),
zap.Binary("at", batch.keys[i]),
Expand Down Expand Up @@ -209,18 +209,19 @@ func (s *tikvStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatte
return regionIDs, errors.Trace(err)
}

func (s *tikvStore) scatterRegion(regionID uint64) error {
failpoint.Inject("MockScatterRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(ErrPDServerTimeout)
}
})

func (s *tikvStore) scatterRegion(ctx context.Context, regionID uint64) error {
logutil.Logger(context.Background()).Info("start scatter region",
zap.Uint64("regionID", regionID))
bo := NewBackoffer(context.Background(), scatterRegionBackoff)
bo := NewBackoffer(ctx, scatterRegionBackoff)
for {
err := s.pdClient.ScatterRegion(context.Background(), regionID)
err := s.pdClient.ScatterRegion(ctx, regionID)

failpoint.Inject("MockScatterRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
err = ErrPDServerTimeout
}
})

if err == nil {
break
}
Expand All @@ -237,17 +238,17 @@ func (s *tikvStore) scatterRegion(regionID uint64) error {
// WaitScatterRegionFinish implements SplitableStore interface.
// backOff is the back off time of the wait scatter region.(Milliseconds)
// if backOff <= 0, the default wait scatter back off time will be used.
func (s *tikvStore) WaitScatterRegionFinish(regionID uint64, backOff int) error {
func (s *tikvStore) WaitScatterRegionFinish(ctx context.Context, regionID uint64, backOff int) error {
if backOff <= 0 {
backOff = waitScatterRegionFinishBackoff
}
logutil.Logger(context.Background()).Info("wait scatter region",
zap.Uint64("regionID", regionID), zap.Int("backoff(ms)", backOff))

bo := NewBackoffer(context.Background(), backOff)
bo := NewBackoffer(ctx, backOff)
logFreq := 0
for {
resp, err := s.pdClient.GetOperator(context.Background(), regionID)
resp, err := s.pdClient.GetOperator(ctx, regionID)
if err == nil && resp != nil {
if !bytes.Equal(resp.Desc, []byte("scatter-region")) || resp.Status != pdpb.OperatorStatus_RUNNING {
logutil.Logger(context.Background()).Info("wait scatter region finished",
Expand Down

0 comments on commit 5b7c596

Please sign in to comment.