Commit

ddl: dynamic adjust add index worker number. (pingcap#8295)
crazycs520 authored and yu34po committed Jan 2, 2019
1 parent 27201e9 commit 385ddb7
Showing 5 changed files with 201 additions and 80 deletions.
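In short, this commit stops creating a fixed set of add-index workers once at the start of the reorganization. Instead, at the beginning of every backfill round the DDL worker re-reads tidb_ddl_reorg_worker_cnt, caps it by the number of key ranges in that round, and grows or shrinks the worker pool to match, so the worker count can be tuned while an ADD INDEX is running. Below is a minimal, self-contained sketch of that resize pattern; the worker type and helper names are illustrative stand-ins, not the real ddl package types.

// Minimal sketch of the per-round worker resizing introduced in ddl/index.go.
package main

import "fmt"

type worker struct{ id int }

func closeWorkers(ws []*worker) {
	for _, w := range ws {
		fmt.Printf("closing worker %d\n", w.id)
	}
}

// resizeWorkers grows or shrinks the pool to exactly want workers,
// mirroring the enlarge/shrink steps in addPhysicalTableIndex.
func resizeWorkers(workers []*worker, want int) []*worker {
	// Enlarge the pool.
	for i := len(workers); i < want; i++ {
		workers = append(workers, &worker{id: i})
	}
	// Shrink the pool and close the surplus workers.
	if len(workers) > want {
		closeWorkers(workers[want:])
		workers = workers[:want]
	}
	return workers
}

func main() {
	var workers []*worker
	// Each element simulates one backfill round: the configured worker count
	// (tidb_ddl_reorg_worker_cnt) and the number of key ranges in the round.
	rounds := []struct{ configured, ranges int }{{16, 100}, {8, 100}, {16, 3}}
	for _, r := range rounds {
		want := r.configured
		if r.ranges < want { // never start more workers than there are ranges
			want = r.ranges
		}
		workers = resizeWorkers(workers, want)
		fmt.Printf("round with %d ranges runs %d workers\n", r.ranges, len(workers))
	}
	closeWorkers(workers)
}
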
3 changes: 2 additions & 1 deletion ddl/db_partition_test.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/parser/model"
tmysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/testutil"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
@@ -952,7 +953,7 @@ func (s *testIntegrationSuite) TestPartitionDropIndex(c *C) {
}
c.Assert(idx1, NotNil)

sessionExecInGoroutine(c, s.store, "drop index idx1 on partition_drop_idx;", done)
testutil.SessionExecInGoroutine(c, s.store, "drop index idx1 on partition_drop_idx;", done)
ticker := time.NewTicker(s.lease / 2)
defer ticker.Stop()
LOOP:
48 changes: 8 additions & 40 deletions ddl/db_test.go
@@ -32,6 +32,7 @@ import (
tmysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl"
testddlutil "github.com/pingcap/tidb/ddl/testutil"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
@@ -608,7 +609,7 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
s.mustExec(c, sql)
otherKeys = append(otherKeys, v)

sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
testddlutil.SessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)

deletedKeys := make(map[int]struct{})

@@ -726,7 +727,6 @@ LOOP:
delete(handles, h)
}
c.Assert(handles, HasLen, 0)

s.tk.MustExec("drop table test_add_index")
}

@@ -754,7 +754,7 @@ func (s *testDBSuite) TestDropIndex(c *C) {
}
c.Assert(c3idx, NotNil)

sessionExecInGoroutine(c, s.store, "drop index c3_index on test_drop_index", done)
testddlutil.SessionExecInGoroutine(c, s.store, "drop index c3_index on test_drop_index", done)

ticker := time.NewTicker(s.lease / 2)
defer ticker.Stop()
@@ -916,38 +916,6 @@ func sessionExec(c *C, s kv.Storage, sql string) {
se.Close()
}

func sessionExecInGoroutine(c *C, s kv.Storage, sql string, done chan error) {
execMultiSQLInGoroutine(c, s, "test_db", []string{sql}, done)
}

func execMultiSQLInGoroutine(c *C, s kv.Storage, dbName string, multiSQL []string, done chan error) {
go func() {
se, err := session.CreateSession4Test(s)
if err != nil {
done <- errors.Trace(err)
return
}
defer se.Close()
_, err = se.Execute(context.Background(), "use "+dbName)
if err != nil {
done <- errors.Trace(err)
return
}
for _, sql := range multiSQL {
rs, err := se.Execute(context.Background(), sql)
if err != nil {
done <- errors.Trace(err)
return
}
if rs != nil {
done <- errors.Errorf("RecordSet should be empty.")
return
}
done <- nil
}
}()
}

func (s *testDBSuite) testAddColumn(c *C) {
done := make(chan error, 1)

@@ -957,7 +925,7 @@ func (s *testDBSuite) testAddColumn(c *C) {
s.mustExec(c, "insert into t2 values (?, ?, ?)", i, i, i)
}

sessionExecInGoroutine(c, s.store, "alter table t2 add column c4 int default -1", done)
testddlutil.SessionExecInGoroutine(c, s.store, "alter table t2 add column c4 int default -1", done)

ticker := time.NewTicker(s.lease / 2)
defer ticker.Stop()
@@ -1092,7 +1060,7 @@ func (s *testDBSuite) testDropColumn(c *C) {
}

// get c4 column id
sessionExecInGoroutine(c, s.store, "alter table t2 drop column c4", done)
testddlutil.SessionExecInGoroutine(c, s.store, "alter table t2 drop column c4", done)

ticker := time.NewTicker(s.lease / 2)
defer ticker.Stop()
@@ -1151,9 +1119,9 @@ func (s *testDBSuite) TestDropColumn(c *C) {
for i := 0; i < num/2; i++ {
multiDDL = append(multiDDL, "alter table t2 add column c4 int", "alter table t2 drop column c4")
}
execMultiSQLInGoroutine(c, s.store, "drop_col_db", multiDDL, ddlDone)
testddlutil.ExecMultiSQLInGoroutine(c, s.store, "drop_col_db", multiDDL, ddlDone)
for i := 0; i < num; i++ {
execMultiSQLInGoroutine(c, s.store, "drop_col_db", []string{"insert into t2 set c1 = 1, c2 = 1, c3 = 1, c4 = 1"}, dmlDone)
testddlutil.ExecMultiSQLInGoroutine(c, s.store, "drop_col_db", []string{"insert into t2 set c1 = 1, c2 = 1, c3 = 1, c4 = 1"}, dmlDone)
}
for i := 0; i < num; i++ {
select {
@@ -1524,7 +1492,7 @@ func (s *testDBSuite) TestAddNotNullColumn(c *C) {
s.tk.MustExec("create table tnn (c1 int primary key auto_increment, c2 int)")
s.tk.MustExec("insert tnn (c2) values (0)" + strings.Repeat(",(0)", 99))
done := make(chan error, 1)
sessionExecInGoroutine(c, s.store, "alter table tnn add column c3 int not null default 3", done)
testddlutil.SessionExecInGoroutine(c, s.store, "alter table tnn add column c3 int not null default 3", done)
updateCnt := 0
out:
for {
62 changes: 62 additions & 0 deletions ddl/failtest/fail_db_test.go
@@ -18,17 +18,21 @@ import (
"fmt"
"math/rand"
"os"
"sync/atomic"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
gofail "github.com/pingcap/gofail/runtime"
"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/testutil"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/util/logutil"
@@ -311,3 +315,61 @@ func (s *testFailDBSuite) TestGenGlobalIDFail(c *C) {
tk.MustExec("admin check table t1")
tk.MustExec("admin check table t2")
}

func (s *testFailDBSuite) TestAddIndexWorkerNum(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("create database if not exists test_db")
tk.MustExec("use test_db")
tk.MustExec("drop table if exists test_add_index")
tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")

done := make(chan error, 1)
start := -10
num := 4096
// first add some rows
for i := start; i < num; i++ {
sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
tk.MustExec(sql)
}

is := s.dom.InfoSchema()
schemaName := model.NewCIStr("test_db")
tableName := model.NewCIStr("test_add_index")
tbl, err := is.TableByName(schemaName, tableName)
c.Assert(err, IsNil)

splitCount := 100
// Split the table into multiple regions.
s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount)

originDDLAddIndexWorkerCnt := variable.GetDDLReorgWorkerCounter()
lastSetWorkerCnt := originDDLAddIndexWorkerCnt
atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt)
ddl.TestCheckWorkerNumber = lastSetWorkerCnt
defer variable.SetDDLReorgWorkerCounter(originDDLAddIndexWorkerCnt)

gofail.Enable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum", `return(true)`)
defer gofail.Disable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum")

testutil.SessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
checkNum := 0

LOOP:
for {
select {
case err = <-done:
if err == nil {
break LOOP
}
c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err)))
case <-ddl.TestCheckWorkerNumCh:
lastSetWorkerCnt = int32(rand.Intn(8) + 8)
tk.MustExec(fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt))
atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt)
checkNum++
}
}
c.Assert(checkNum, Greater, 5)
tk.MustExec("admin check table test_add_index")
tk.MustExec("drop table test_add_index")
}
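
The test above drives the new behaviour by repeatedly executing set @@tidb_ddl_reorg_worker_cnt while the index backfill is in progress. The same knob can be turned from any ordinary client session. A hedged sketch using database/sql follows; the DSN and the MySQL-protocol driver import are assumptions, only the session-variable statement comes from the test.

// Sketch: changing the reorg worker count from a client while an ADD INDEX runs.
// The DSN below is a placeholder; adjust it for a real TiDB instance.
package main

import (
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql" // assumed MySQL-protocol driver
)

func main() {
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// The next backfill round picks up the new value.
	if _, err := db.Exec("set @@tidb_ddl_reorg_worker_cnt = 16"); err != nil {
		log.Fatal(err)
	}
}
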
111 changes: 72 additions & 39 deletions ddl/index.go
@@ -823,7 +823,7 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad
if task.endIncluded {
rightParenthesis = "]"
}
log.Infof("[ddl-reorg] worker(%v), finish region %v ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v",
log.Infof("[ddl-reorg] worker(%v), finish table %v ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v",
w.id, task.physicalTableID, task.startHandle, task.endHandle, rightParenthesis, result.addedCount, result.scanCount, result.nextHandle, time.Since(startTime).Seconds())

return result
@@ -1046,34 +1046,12 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*addIndexWorker
return nil, nil
}

// buildIndexForReorgInfo build backfilling tasks from [reorgInfo.StartHandle, reorgInfo.EndHandle),
// and send these tasks to add index workers, till we finish adding the indices.
func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addIndexWorker, job *model.Job, reorgInfo *reorgInfo) error {
totalAddedCount := job.GetRowCount()

startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle
for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle)
if err != nil {
return errors.Trace(err)
}

log.Infof("[ddl-reorg] start to reorg index of %v region ranges, handle range:[%v, %v).", len(kvRanges), startHandle, endHandle)
remains, err := w.sendRangeTaskToWorkers(t, workers, reorgInfo, &totalAddedCount, kvRanges)
if err != nil {
return errors.Trace(err)
}

if len(remains) == 0 {
break
}
startHandle, _, err = decodeHandleRange(remains[0])
if err != nil {
return errors.Trace(err)
}
}
return nil
}
var (
// TestCheckWorkerNumCh is used by tests to adjust the add index worker count.
TestCheckWorkerNumCh = make(chan struct{}, 0)
// TestCheckWorkerNumber is used by tests to adjust the add index worker count.
TestCheckWorkerNumber = int32(16)
)

// addPhysicalTableIndex handles the add index reorganization state for a non-partitioned table or a partition.
// For a partitioned table, it should be handled partition by partition.
@@ -1092,6 +1070,9 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addInd
func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo)
totalAddedCount := job.GetRowCount()

startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle
sessCtx := newContext(reorgInfo.d.store)
decodeColMap, err := makeupDecodeColMap(sessCtx, t, indexInfo)
if err != nil {
@@ -1100,16 +1081,68 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.I

// variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt".
workerCnt := variable.GetDDLReorgWorkerCounter()
idxWorkers := make([]*addIndexWorker, workerCnt)
for i := 0; i < int(workerCnt); i++ {
sessCtx := newContext(reorgInfo.d.store)
idxWorkers[i] = newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
idxWorkers[i].priority = job.Priority
go idxWorkers[i].run(reorgInfo.d)
}
defer closeAddIndexWorkers(idxWorkers)
err = w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo)
return errors.Trace(err)
idxWorkers := make([]*addIndexWorker, 0, workerCnt)
defer func() {
closeAddIndexWorkers(idxWorkers)
}()

for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle)
if err != nil {
return errors.Trace(err)
}

// Dynamically adjust the add index worker number.
workerCnt = variable.GetDDLReorgWorkerCounter()
// If there is only 1 range, we can only start 1 worker.
if len(kvRanges) < int(workerCnt) {
workerCnt = int32(len(kvRanges))
}
// Enlarge the worker size.
for i := len(idxWorkers); i < int(workerCnt); i++ {
sessCtx := newContext(reorgInfo.d.store)
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
idxWorker.priority = job.Priority
idxWorkers = append(idxWorkers, idxWorker)
go idxWorkers[i].run(reorgInfo.d)
}
// Shrink the worker size.
if len(idxWorkers) > int(workerCnt) {
workers := idxWorkers[workerCnt:]
idxWorkers = idxWorkers[:workerCnt]
closeAddIndexWorkers(workers)
}

// gofail: var checkIndexWorkerNum bool
// if checkIndexWorkerNum {
// num := int(atomic.LoadInt32(&TestCheckWorkerNumber))
// if num != 0 {
// if num > len(kvRanges) {
// if len(idxWorkers) != len(kvRanges) {
// return errors.Errorf("check index worker num error, len kv ranges is: %v, check index worker num is: %v, actual index num is: %v", len(kvRanges), num, len(idxWorkers))
// }
// } else if num != len(idxWorkers) {
// return errors.Errorf("check index worker num error, len kv ranges is: %v, check index worker num is: %v, actual index num is: %v", len(kvRanges), num, len(idxWorkers))
// }
// TestCheckWorkerNumCh <- struct{}{}
// }
//}

log.Infof("[ddl-reorg] start %d workers to reorg index of %v region ranges, handle range:[%v, %v).", len(idxWorkers), len(kvRanges), startHandle, endHandle)
remains, err := w.sendRangeTaskToWorkers(t, idxWorkers, reorgInfo, &totalAddedCount, kvRanges)
if err != nil {
return errors.Trace(err)
}

if len(remains) == 0 {
break
}
startHandle, _, err = decodeHandleRange(remains[0])
if err != nil {
return errors.Trace(err)
}
}
return nil
}

// addTableIndex handles the add index reorganization state for a table.
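
The gofail comment block inside the new loop ("// gofail: var checkIndexWorkerNum bool") is what lets TestAddIndexWorkerNum observe every round: when the failpoint is enabled, the loop verifies that the live worker count matches the expected value, then signals the test over TestCheckWorkerNumCh, and the test answers by setting a new tidb_ddl_reorg_worker_cnt. A simplified sketch of that handshake, with stand-in names instead of the real ddl package variables:

// Simplified sketch of the handshake between the backfill loop and
// TestAddIndexWorkerNum. checkCh and wantCount stand in for
// ddl.TestCheckWorkerNumCh and ddl.TestCheckWorkerNumber.
package main

import (
	"fmt"
	"math/rand"
	"sync/atomic"
)

var (
	checkCh   = make(chan struct{})
	wantCount = int32(16)
)

// backfillLoop stands in for the batched loop in addPhysicalTableIndex.
func backfillLoop(rounds int, done chan<- error) {
	for i := 0; i < rounds; i++ {
		// Re-read the desired worker count at the start of every round.
		workers := int(atomic.LoadInt32(&wantCount))
		fmt.Printf("round %d runs with %d workers\n", i, workers)
		checkCh <- struct{}{} // "failpoint": tell the test this round was checked
	}
	done <- nil
}

func main() {
	done := make(chan error, 1)
	go backfillLoop(5, done)

	for {
		select {
		case err := <-done:
			fmt.Println("backfill finished, err =", err)
			return
		case <-checkCh:
			// Like the test, pick a new worker count between rounds.
			atomic.StoreInt32(&wantCount, int32(rand.Intn(8)+8))
		}
	}
}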