diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 78629e4dbc073..b597b0ce95700 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/parser/model" tmysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/ddl/testutil" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" @@ -952,7 +953,7 @@ func (s *testIntegrationSuite) TestPartitionDropIndex(c *C) { } c.Assert(idx1, NotNil) - sessionExecInGoroutine(c, s.store, "drop index idx1 on partition_drop_idx;", done) + testutil.SessionExecInGoroutine(c, s.store, "drop index idx1 on partition_drop_idx;", done) ticker := time.NewTicker(s.lease / 2) defer ticker.Stop() LOOP: diff --git a/ddl/db_test.go b/ddl/db_test.go index 72f2237d8193c..8ddb68b2a9e27 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -32,6 +32,7 @@ import ( tmysql "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/ddl" + testddlutil "github.com/pingcap/tidb/ddl/testutil" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" @@ -608,7 +609,7 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri s.mustExec(c, sql) otherKeys = append(otherKeys, v) - sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done) + testddlutil.SessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done) deletedKeys := make(map[int]struct{}) @@ -726,7 +727,6 @@ LOOP: delete(handles, h) } c.Assert(handles, HasLen, 0) - s.tk.MustExec("drop table test_add_index") } @@ -754,7 +754,7 @@ func (s *testDBSuite) TestDropIndex(c *C) { } c.Assert(c3idx, NotNil) - sessionExecInGoroutine(c, s.store, "drop index c3_index on test_drop_index", done) + testddlutil.SessionExecInGoroutine(c, s.store, "drop index c3_index on test_drop_index", done) ticker := time.NewTicker(s.lease / 2) defer ticker.Stop() @@ -916,38 +916,6 @@ func sessionExec(c *C, s kv.Storage, sql string) { se.Close() } -func sessionExecInGoroutine(c *C, s kv.Storage, sql string, done chan error) { - execMultiSQLInGoroutine(c, s, "test_db", []string{sql}, done) -} - -func execMultiSQLInGoroutine(c *C, s kv.Storage, dbName string, multiSQL []string, done chan error) { - go func() { - se, err := session.CreateSession4Test(s) - if err != nil { - done <- errors.Trace(err) - return - } - defer se.Close() - _, err = se.Execute(context.Background(), "use "+dbName) - if err != nil { - done <- errors.Trace(err) - return - } - for _, sql := range multiSQL { - rs, err := se.Execute(context.Background(), sql) - if err != nil { - done <- errors.Trace(err) - return - } - if rs != nil { - done <- errors.Errorf("RecordSet should be empty.") - return - } - done <- nil - } - }() -} - func (s *testDBSuite) testAddColumn(c *C) { done := make(chan error, 1) @@ -957,7 +925,7 @@ func (s *testDBSuite) testAddColumn(c *C) { s.mustExec(c, "insert into t2 values (?, ?, ?)", i, i, i) } - sessionExecInGoroutine(c, s.store, "alter table t2 add column c4 int default -1", done) + testddlutil.SessionExecInGoroutine(c, s.store, "alter table t2 add column c4 int default -1", done) ticker := time.NewTicker(s.lease / 2) defer ticker.Stop() @@ -1092,7 +1060,7 @@ func (s *testDBSuite) testDropColumn(c *C) { } // get c4 column id - sessionExecInGoroutine(c, s.store, "alter table t2 drop column c4", done) + testddlutil.SessionExecInGoroutine(c, s.store, "alter table t2 drop column c4", done) ticker := time.NewTicker(s.lease / 2) defer ticker.Stop() @@ -1151,9 +1119,9 @@ func (s *testDBSuite) TestDropColumn(c *C) { for i := 0; i < num/2; i++ { multiDDL = append(multiDDL, "alter table t2 add column c4 int", "alter table t2 drop column c4") } - execMultiSQLInGoroutine(c, s.store, "drop_col_db", multiDDL, ddlDone) + testddlutil.ExecMultiSQLInGoroutine(c, s.store, "drop_col_db", multiDDL, ddlDone) for i := 0; i < num; i++ { - execMultiSQLInGoroutine(c, s.store, "drop_col_db", []string{"insert into t2 set c1 = 1, c2 = 1, c3 = 1, c4 = 1"}, dmlDone) + testddlutil.ExecMultiSQLInGoroutine(c, s.store, "drop_col_db", []string{"insert into t2 set c1 = 1, c2 = 1, c3 = 1, c4 = 1"}, dmlDone) } for i := 0; i < num; i++ { select { @@ -1524,7 +1492,7 @@ func (s *testDBSuite) TestAddNotNullColumn(c *C) { s.tk.MustExec("create table tnn (c1 int primary key auto_increment, c2 int)") s.tk.MustExec("insert tnn (c2) values (0)" + strings.Repeat(",(0)", 99)) done := make(chan error, 1) - sessionExecInGoroutine(c, s.store, "alter table tnn add column c3 int not null default 3", done) + testddlutil.SessionExecInGoroutine(c, s.store, "alter table tnn add column c3 int not null default 3", done) updateCnt := 0 out: for { diff --git a/ddl/failtest/fail_db_test.go b/ddl/failtest/fail_db_test.go index bcb2d21d914e6..491704bca3eb9 100644 --- a/ddl/failtest/fail_db_test.go +++ b/ddl/failtest/fail_db_test.go @@ -18,17 +18,21 @@ import ( "fmt" "math/rand" "os" + "sync/atomic" "testing" "time" . "github.com/pingcap/check" + "github.com/pingcap/errors" gofail "github.com/pingcap/gofail/runtime" "github.com/pingcap/parser" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/ddl/testutil" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/mockstore/mocktikv" "github.com/pingcap/tidb/util/logutil" @@ -311,3 +315,61 @@ func (s *testFailDBSuite) TestGenGlobalIDFail(c *C) { tk.MustExec("admin check table t1") tk.MustExec("admin check table t2") } + +func (s *testFailDBSuite) TestAddIndexWorkerNum(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("create database if not exists test_db") + tk.MustExec("use test_db") + tk.MustExec("drop table if exists test_add_index") + tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))") + + done := make(chan error, 1) + start := -10 + num := 4096 + // first add some rows + for i := start; i < num; i++ { + sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i) + tk.MustExec(sql) + } + + is := s.dom.InfoSchema() + schemaName := model.NewCIStr("test_db") + tableName := model.NewCIStr("test_add_index") + tbl, err := is.TableByName(schemaName, tableName) + c.Assert(err, IsNil) + + splitCount := 100 + // Split table to multi region. + s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount) + + originDDLAddIndexWorkerCnt := variable.GetDDLReorgWorkerCounter() + lastSetWorkerCnt := originDDLAddIndexWorkerCnt + atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt) + ddl.TestCheckWorkerNumber = lastSetWorkerCnt + defer variable.SetDDLReorgWorkerCounter(originDDLAddIndexWorkerCnt) + + gofail.Enable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum", `return(true)`) + defer gofail.Disable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum") + + testutil.SessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done) + checkNum := 0 + +LOOP: + for { + select { + case err = <-done: + if err == nil { + break LOOP + } + c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err))) + case <-ddl.TestCheckWorkerNumCh: + lastSetWorkerCnt = int32(rand.Intn(8) + 8) + tk.MustExec(fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt)) + atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt) + checkNum++ + } + } + c.Assert(checkNum, Greater, 5) + tk.MustExec("admin check table test_add_index") + tk.MustExec("drop table test_add_index") +} diff --git a/ddl/index.go b/ddl/index.go index a35d39feca527..2dafb9b66ef2a 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -823,7 +823,7 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad if task.endIncluded { rightParenthesis = "]" } - log.Infof("[ddl-reorg] worker(%v), finish region %v ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v", + log.Infof("[ddl-reorg] worker(%v), finish table %v ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v", w.id, task.physicalTableID, task.startHandle, task.endHandle, rightParenthesis, result.addedCount, result.scanCount, result.nextHandle, time.Since(startTime).Seconds()) return result @@ -1046,34 +1046,12 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*addIndexWorker return nil, nil } -// buildIndexForReorgInfo build backfilling tasks from [reorgInfo.StartHandle, reorgInfo.EndHandle), -// and send these tasks to add index workers, till we finish adding the indices. -func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addIndexWorker, job *model.Job, reorgInfo *reorgInfo) error { - totalAddedCount := job.GetRowCount() - - startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle - for { - kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle) - if err != nil { - return errors.Trace(err) - } - - log.Infof("[ddl-reorg] start to reorg index of %v region ranges, handle range:[%v, %v).", len(kvRanges), startHandle, endHandle) - remains, err := w.sendRangeTaskToWorkers(t, workers, reorgInfo, &totalAddedCount, kvRanges) - if err != nil { - return errors.Trace(err) - } - - if len(remains) == 0 { - break - } - startHandle, _, err = decodeHandleRange(remains[0]) - if err != nil { - return errors.Trace(err) - } - } - return nil -} +var ( + // TestCheckWorkerNumCh use for test adjust add index worker. + TestCheckWorkerNumCh = make(chan struct{}, 0) + // TestCheckWorkerNumber use for test adjust add index worker. + TestCheckWorkerNumber = int32(16) +) // addPhysicalTableIndex handles the add index reorganization state for a non-partitioned table or a partition. // For a partitioned table, it should be handled partition by partition. @@ -1092,6 +1070,9 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addInd func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error { job := reorgInfo.Job log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo) + totalAddedCount := job.GetRowCount() + + startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle sessCtx := newContext(reorgInfo.d.store) decodeColMap, err := makeupDecodeColMap(sessCtx, t, indexInfo) if err != nil { @@ -1100,16 +1081,68 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.I // variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt". workerCnt := variable.GetDDLReorgWorkerCounter() - idxWorkers := make([]*addIndexWorker, workerCnt) - for i := 0; i < int(workerCnt); i++ { - sessCtx := newContext(reorgInfo.d.store) - idxWorkers[i] = newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap) - idxWorkers[i].priority = job.Priority - go idxWorkers[i].run(reorgInfo.d) - } - defer closeAddIndexWorkers(idxWorkers) - err = w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo) - return errors.Trace(err) + idxWorkers := make([]*addIndexWorker, 0, workerCnt) + defer func() { + closeAddIndexWorkers(idxWorkers) + }() + + for { + kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle) + if err != nil { + return errors.Trace(err) + } + + // For dynamic adjust add index worker number. + workerCnt = variable.GetDDLReorgWorkerCounter() + // If only have 1 range, we can only start 1 worker. + if len(kvRanges) < int(workerCnt) { + workerCnt = int32(len(kvRanges)) + } + // Enlarge the worker size. + for i := len(idxWorkers); i < int(workerCnt); i++ { + sessCtx := newContext(reorgInfo.d.store) + idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap) + idxWorker.priority = job.Priority + idxWorkers = append(idxWorkers, idxWorker) + go idxWorkers[i].run(reorgInfo.d) + } + // Shrink the worker size. + if len(idxWorkers) > int(workerCnt) { + workers := idxWorkers[workerCnt:] + idxWorkers = idxWorkers[:workerCnt] + closeAddIndexWorkers(workers) + } + + // gofail: var checkIndexWorkerNum bool + // if checkIndexWorkerNum { + // num := int(atomic.LoadInt32(&TestCheckWorkerNumber)) + // if num != 0 { + // if num > len(kvRanges) { + // if len(idxWorkers) != len(kvRanges) { + // return errors.Errorf("check index worker num error, len kv ranges is: %v, check index worker num is: %v, actual index num is: %v", len(kvRanges), num, len(idxWorkers)) + // } + // } else if num != len(idxWorkers) { + // return errors.Errorf("check index worker num error, len kv ranges is: %v, check index worker num is: %v, actual index num is: %v", len(kvRanges), num, len(idxWorkers)) + // } + // TestCheckWorkerNumCh <- struct{}{} + // } + //} + + log.Infof("[ddl-reorg] start %d workers to reorg index of %v region ranges, handle range:[%v, %v).", len(idxWorkers), len(kvRanges), startHandle, endHandle) + remains, err := w.sendRangeTaskToWorkers(t, idxWorkers, reorgInfo, &totalAddedCount, kvRanges) + if err != nil { + return errors.Trace(err) + } + + if len(remains) == 0 { + break + } + startHandle, _, err = decodeHandleRange(remains[0]) + if err != nil { + return errors.Trace(err) + } + } + return nil } // addTableIndex handles the add index reorganization state for a table. diff --git a/ddl/testutil/testutil.go b/ddl/testutil/testutil.go new file mode 100644 index 0000000000000..c690b05af3108 --- /dev/null +++ b/ddl/testutil/testutil.go @@ -0,0 +1,57 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package testutil + +import ( + "context" + + "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/session" +) + +// SessionExecInGoroutine export for testing. +func SessionExecInGoroutine(c *check.C, s kv.Storage, sql string, done chan error) { + ExecMultiSQLInGoroutine(c, s, "test_db", []string{sql}, done) +} + +// ExecMultiSQLInGoroutine exports for testing. +func ExecMultiSQLInGoroutine(c *check.C, s kv.Storage, dbName string, multiSQL []string, done chan error) { + go func() { + se, err := session.CreateSession4Test(s) + if err != nil { + done <- errors.Trace(err) + return + } + defer se.Close() + _, err = se.Execute(context.Background(), "use "+dbName) + if err != nil { + done <- errors.Trace(err) + return + } + for _, sql := range multiSQL { + rs, err := se.Execute(context.Background(), sql) + if err != nil { + done <- errors.Trace(err) + return + } + if rs != nil { + done <- errors.Errorf("RecordSet should be empty.") + return + } + done <- nil + } + }() +}