Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: dynamic adjust add index worker number. #8295

Merged
merged 28 commits into from
Dec 24, 2018
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a5c7024
ddl: dynamic adjust add index worker number init
crazycs520 Nov 13, 2018
00ad984
shrink worker num
crazycs520 Nov 14, 2018
f936c59
add index
crazycs520 Nov 14, 2018
c7fd5ee
refine test
crazycs520 Nov 14, 2018
86d6b99
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Nov 14, 2018
b1cbd7c
refine test
crazycs520 Nov 14, 2018
fe27282
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Nov 15, 2018
4545ebc
shrink worker num if regions is less then worker num
crazycs520 Nov 19, 2018
e05f9b7
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Nov 19, 2018
3033df1
add comment
crazycs520 Nov 20, 2018
d11e016
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 5, 2018
fb85c9b
add test to check change add index worker num take effect.
crazycs520 Dec 5, 2018
a2bdc3e
refine test
crazycs520 Dec 5, 2018
f2aa994
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 6, 2018
73cb88b
add log
crazycs520 Dec 7, 2018
cd97603
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 7, 2018
d6c57b0
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 12, 2018
c594dda
address comment
crazycs520 Dec 17, 2018
8d90b29
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 17, 2018
216c58f
fix test
crazycs520 Dec 17, 2018
451a9da
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 21, 2018
77f49bd
use gofail test
crazycs520 Dec 21, 2018
dfa3e4d
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 21, 2018
d60f065
refine gofail test and remove hook
crazycs520 Dec 21, 2018
50470ad
refine test
crazycs520 Dec 21, 2018
fc6674e
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 21, 2018
8b2ab52
add comment
crazycs520 Dec 22, 2018
34eafec
Merge branch 'master' into adjust-add-index-worker
crazycs520 Dec 24, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,16 +588,31 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
s.mustExec(c, sql)
otherKeys = append(otherKeys, v)

is := s.dom.InfoSchema()
schemaName := model.NewCIStr(s.schemaName)
tableName := model.NewCIStr("test_add_index")
tbl, err := is.TableByName(schemaName, tableName)
c.Assert(err, IsNil)

if !testPartition {
// Split table to multi region.
s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, 100)
}

sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)

deletedKeys := make(map[int]struct{})

ticker := time.NewTicker(s.lease / 2)
defer ticker.Stop()
ticker2 := time.NewTicker(10 * time.Millisecond)
defer ticker2.Stop()
startReorganization := false

LOOP:
for {
select {
case err := <-done:
case err = <-done:
if err == nil {
break LOOP
}
Expand All @@ -620,6 +635,21 @@ LOOP:
s.mustExec(c, sql)
}
num += step
case <-ticker2.C:
if !startReorganization {
is = s.dom.InfoSchema()
tbl, err = is.TableByName(schemaName, tableName)
c.Assert(err, IsNil)
for _, idx := range tbl.Meta().Indices {
if idx.Name.L == "c3_index" && idx.State == model.StateWriteReorganization {
startReorganization = true
}
}
}
if startReorganization {
workerCnt := rand.Intn(8) + 8
s.mustExec(c, fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", workerCnt))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we ensure the worker count is changed to workerCnt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

em... I check the log when do manual test to know the worker count is changed... 😂

}
}
}

Expand Down Expand Up @@ -665,7 +695,7 @@ LOOP:
t := s.testGetTable(c, "test_add_index")
handles := make(map[int64]struct{})
startKey := t.RecordKey(math.MinInt64)
err := t.IterRecords(ctx, startKey, t.Cols(),
err = t.IterRecords(ctx, startKey, t.Cols(),
func(h int64, data []types.Datum, cols []*table.Column) (bool, error) {
handles[h] = struct{}{}
return true, nil
Expand Down Expand Up @@ -704,7 +734,6 @@ LOOP:
delete(handles, h)
}
c.Assert(handles, HasLen, 0)

s.tk.MustExec("drop table test_add_index")
}

Expand Down
50 changes: 31 additions & 19 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,18 +1053,46 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*addIndexWorker

// buildIndexForReorgInfo build backfilling tasks from [reorgInfo.StartHandle, reorgInfo.EndHandle),
// and send these tasks to add index workers, till we finish adding the indices.
func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addIndexWorker, job *model.Job, reorgInfo *reorgInfo) error {
func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, indexInfo *model.IndexInfo, job *model.Job, reorgInfo *reorgInfo) error {
totalAddedCount := job.GetRowCount()

startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle
sessCtx := newContext(reorgInfo.d.store)
decodeColMap, err := makeupDecodeColMap(sessCtx, t, indexInfo)
if err != nil {
return errors.Trace(err)
}

// variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt".
workerCnt := variable.GetDDLReorgWorkerCounter()
idxWorkers := make([]*addIndexWorker, 0, workerCnt)
defer func() {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
closeAddIndexWorkers(idxWorkers)
}()

for {
// For dynamic adjust add index worker number.
workerCnt = variable.GetDDLReorgWorkerCounter()
for i := len(idxWorkers); i < int(workerCnt); i++ {
sessCtx := newContext(reorgInfo.d.store)
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
idxWorker.priority = job.Priority
idxWorkers = append(idxWorkers, idxWorker)
go idxWorkers[i].run(reorgInfo.d)
}
if len(idxWorkers) > int(workerCnt) {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
workers := idxWorkers[workerCnt:]
idxWorkers = idxWorkers[:workerCnt]
closeAddIndexWorkers(workers)
}

kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle)
if err != nil {
return errors.Trace(err)
}

log.Infof("[ddl-reorg] start to reorg index of %v region ranges, handle range:[%v, %v).", len(kvRanges), startHandle, endHandle)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
remains, err := w.sendRangeTaskToWorkers(t, workers, reorgInfo, &totalAddedCount, kvRanges)
remains, err := w.sendRangeTaskToWorkers(t, idxWorkers, reorgInfo, &totalAddedCount, kvRanges)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1097,23 +1125,7 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addInd
func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo)
sessCtx := newContext(reorgInfo.d.store)
decodeColMap, err := makeupDecodeColMap(sessCtx, t, indexInfo)
if err != nil {
return errors.Trace(err)
}

// variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt".
workerCnt := variable.GetDDLReorgWorkerCounter()
idxWorkers := make([]*addIndexWorker, workerCnt)
for i := 0; i < int(workerCnt); i++ {
sessCtx := newContext(reorgInfo.d.store)
idxWorkers[i] = newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
idxWorkers[i].priority = job.Priority
go idxWorkers[i].run(reorgInfo.d)
}
defer closeAddIndexWorkers(idxWorkers)
err = w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo)
err := w.buildIndexForReorgInfo(t, indexInfo, job, reorgInfo)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
return errors.Trace(err)
}

Expand Down