From 77f49bd38b666980c80c79e1e1418fe36ec149e1 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Fri, 21 Dec 2018 11:05:55 +0800
Subject: [PATCH] use gofail test

---
 ddl/db_test.go | 190 +++++++++++++++++++++++++++++++++++++++++++------
 ddl/index.go   |   9 ++-
 2 files changed, 175 insertions(+), 24 deletions(-)

diff --git a/ddl/db_test.go b/ddl/db_test.go
index 1ce4131c20e12..7e70b386a5c99 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -27,6 +27,7 @@ import (
 
 	. "github.com/pingcap/check"
 	"github.com/pingcap/errors"
+	gofail "github.com/pingcap/gofail/runtime"
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
 	tmysql "github.com/pingcap/parser/mysql"
@@ -609,6 +610,162 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
 	s.mustExec(c, sql)
 	otherKeys = append(otherKeys, v)
 
+	sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
+
+	deletedKeys := make(map[int]struct{})
+
+	ticker := time.NewTicker(s.lease / 2)
+	defer ticker.Stop()
+LOOP:
+	for {
+		select {
+		case err := <-done:
+			if err == nil {
+				break LOOP
+			}
+			c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err)))
+		case <-ticker.C:
+			// When the server performance is particularly poor,
+			// the adding-index operation cannot be completed,
+			// so we put a limit on the number of rows inserted.
+			if num > defaultBatchSize*10 {
+				break
+			}
+			step := 10
+			// delete some rows and add some new data
+			for i := num; i < num+step; i++ {
+				n := rand.Intn(num)
+				deletedKeys[n] = struct{}{}
+				sql := fmt.Sprintf("delete from test_add_index where c1 = %d", n)
+				s.mustExec(c, sql)
+				sql = fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
+				s.mustExec(c, sql)
+			}
+			num += step
+		}
+	}
+
+	// get the existing keys
+	keys := make([]int, 0, num)
+	for i := start; i < num; i++ {
+		if _, ok := deletedKeys[i]; ok {
+			continue
+		}
+		keys = append(keys, i)
+	}
+	keys = append(keys, otherKeys...)
+
+	// check the index keys
+	expectedRows := make([][]interface{}, 0, len(keys))
+	for _, key := range keys {
+		expectedRows = append(expectedRows, []interface{}{key})
+	}
+	rows := s.mustQuery(c, fmt.Sprintf("select c1 from test_add_index where c3 >= %d order by c1", start))
+	matchRows(c, rows, expectedRows)
+
+	if testPartition {
+		s.tk.MustExec("admin check table test_add_index")
+		return
+	}
+
+	// test the index range
+	for i := 0; i < 100; i++ {
+		index := rand.Intn(len(keys) - 3)
+		rows := s.mustQuery(c, "select c1 from test_add_index where c3 >= ? limit 3", keys[index])
+		matchRows(c, rows, [][]interface{}{{keys[index]}, {keys[index+1]}, {keys[index+2]}})
+	}
+
+	// TODO: Support explain in future.
+	// rows := s.mustQuery(c, "explain select c1 from test_add_index where c3 >= 100")
+
+	// ay := dumpRows(c, rows)
+	// c.Assert(strings.Contains(fmt.Sprintf("%v", ay), "c3_index"), IsTrue)
+
+	// get all row handles
+	ctx := s.s.(sessionctx.Context)
+	c.Assert(ctx.NewTxn(context.Background()), IsNil)
+	t := s.testGetTable(c, "test_add_index")
+	handles := make(map[int64]struct{})
+	startKey := t.RecordKey(math.MinInt64)
+	err := t.IterRecords(ctx, startKey, t.Cols(),
+		func(h int64, data []types.Datum, cols []*table.Column) (bool, error) {
+			handles[h] = struct{}{}
+			return true, nil
+		})
+	c.Assert(err, IsNil)
+
+	// check the handles in the index
+	var nidx table.Index
+	for _, tidx := range t.Indices() {
+		if tidx.Meta().Name.L == "c3_index" {
+			nidx = tidx
+			break
+		}
+	}
+	// Make sure there is an index with the name c3_index.
+	c.Assert(nidx, NotNil)
+	c.Assert(nidx.Meta().ID, Greater, int64(0))
+	txn, err := ctx.Txn(true)
+	c.Assert(err, IsNil)
+	txn.Rollback()
+
+	c.Assert(ctx.NewTxn(context.Background()), IsNil)
+	defer txn.Rollback()
+
+	it, err := nidx.SeekFirst(txn)
+	c.Assert(err, IsNil)
+	defer it.Close()
+
+	for {
+		_, h, err := it.Next()
+		if terror.ErrorEqual(err, io.EOF) {
+			break
+		}
+
+		c.Assert(err, IsNil)
+		_, ok := handles[h]
+		c.Assert(ok, IsTrue)
+		delete(handles, h)
+	}
+	c.Assert(handles, HasLen, 0)
+	s.tk.MustExec("drop table test_add_index")
+}
+
+func (s *testDBSuite) TestAddIndexWorkerNum(c *C) {
+	s.tk = testkit.NewTestKit(c, s.store)
+	s.tk.MustExec("use " + s.schemaName)
+	s.tk.MustExec("drop table if exists test_add_index")
+	s.tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")
+
+	done := make(chan error, 1)
+	start := -10
+	num := defaultBatchSize
+	// First add some rows.
+	for i := start; i < num; i++ {
+		sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
+		s.mustExec(c, sql)
+	}
+	// Add some discrete rows.
+	maxBatch := 20
+	batchCnt := 100
+	otherKeys := make([]int, 0, batchCnt*maxBatch)
+	// Make sure there are no duplicate keys.
+	base := defaultBatchSize * 20
+	for i := 1; i < batchCnt; i++ {
+		n := base + i*defaultBatchSize + i
+		for j := 0; j < rand.Intn(maxBatch); j++ {
+			n += j
+			sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", n, n, n)
+			s.mustExec(c, sql)
+			otherKeys = append(otherKeys, n)
+		}
+	}
+	// Encounter the value of math.MaxInt64 in the middle of the adding-index operation.
+	v := math.MaxInt64 - defaultBatchSize/2
+	sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", v, v, v)
+	s.mustExec(c, sql)
+	otherKeys = append(otherKeys, v)
+
 	is := s.dom.InfoSchema()
 	schemaName := model.NewCIStr(s.schemaName)
 	tableName := model.NewCIStr("test_add_index")
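Reviewer note on the hunk above: the added tail of testAddIndex runs the CREATE INDEX in a background session and keeps DML traffic flowing on a ticker until the DDL reports back on the done channel. Below is a minimal, self-contained sketch of that pattern under assumed stand-in names; it is not code from this patch (the real test uses sessionExecInGoroutine and s.mustExec).

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// done mirrors the test's buffered error channel; the goroutine stands in
	// for sessionExecInGoroutine running "create index c3_index ...".
	done := make(chan error, 1)
	go func() {
		time.Sleep(200 * time.Millisecond) // pretend the DDL takes a while
		done <- nil
	}()

	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()

	num := 0
LOOP:
	for {
		select {
		case err := <-done:
			if err != nil {
				panic(err) // the test asserts IsNil instead
			}
			break LOOP // DDL finished; stop generating concurrent DML
		case <-ticker.C:
			num += 10 // stand-in for the test's delete/insert batch
			fmt.Println("issued DML while DDL is running, rows:", num)
		}
	}
}
```

The point of the pattern is that the index is built while rows are concurrently inserted and deleted, so the later handle/index consistency check exercises the reorg path rather than a static table.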
@@ -616,20 +773,22 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
 	c.Assert(err, IsNil)
 	splitCount := 100
-	if !testPartition {
-		// Split table to multi region.
-		s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount)
-	}
+	// Split the table into multiple regions.
+	s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount)
 
 	// set the hook to check the add-index worker count
+	gofail.Enable("github.com/pingcap/tidb/ddl/checkIndexWorker", `return(true)`)
+	defer gofail.Disable("github.com/pingcap/tidb/ddl/checkIndexWorker")
+
 	hook := &ddl.TestDDLCallback{}
 	oringDDLAddIndexWorkerCnt := variable.GetDDLReorgWorkerCounter()
 	lastSetWorkerCnt := int(oringDDLAddIndexWorkerCnt)
 	defer variable.SetDDLReorgWorkerCounter(oringDDLAddIndexWorkerCnt)
 	// firstCheck is used to check whether splitting the table into regions takes effect.
-	firstCheck := !testPartition
+	firstCheck := true
 	var checkErr error
-	changeWorkerNumEnable := false
+	var changeWorkerNumEnable = int32(0)
+
 	hook.OnIndexWorkerReorgBeforeExported = func(workerNum, rangesNum int) {
 		if checkErr != nil {
 			return
 		}
@@ -649,7 +808,7 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
 		} else if workerNum != setNum {
 			checkErr = errors.Errorf("rangeNum is %v, expect workerNum is: %v, but got: %v", rangesNum, setNum, workerNum)
 		}
-		changeWorkerNumEnable = true
+		atomic.StoreInt32(&changeWorkerNumEnable, 1)
 	}
 	originHook := s.dom.DDL().GetHook()
 	defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook)
@@ -691,15 +850,16 @@ LOOP:
 			}
 			num += step
 		case <-ticker2.C:
-			if changeWorkerNumEnable {
+			if atomic.LoadInt32(&changeWorkerNumEnable) == 1 {
 				lastSetWorkerCnt = rand.Intn(8) + 8
 				s.mustExec(c, fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt))
 				c.Assert(checkErr, IsNil)
-				changeWorkerNumEnable = false
+				atomic.StoreInt32(&changeWorkerNumEnable, 0)
 			}
 		}
 	}
 	c.Assert(checkErr, IsNil)
+	c.Assert(firstCheck, IsFalse)
 
 	// get the existing keys
 	keys := make([]int, 0, num)
 	for i := start; i < num; i++ {
 		if _, ok := deletedKeys[i]; ok {
 			continue
 		}
 		keys = append(keys, i)
 	}
 	keys = append(keys, otherKeys...)
@@ -719,11 +879,6 @@ LOOP:
 	rows := s.mustQuery(c, fmt.Sprintf("select c1 from test_add_index where c3 >= %d order by c1", start))
 	matchRows(c, rows, expectedRows)
 
-	if testPartition {
-		s.tk.MustExec("admin check table test_add_index")
-		return
-	}
-
 	// test the index range
 	for i := 0; i < 100; i++ {
 		index := rand.Intn(len(keys) - 3)
@@ -731,13 +886,6 @@ LOOP:
 		rows := s.mustQuery(c, "select c1 from test_add_index where c3 >= ? limit 3", keys[index])
 		matchRows(c, rows, [][]interface{}{{keys[index]}, {keys[index+1]}, {keys[index+2]}})
 	}
 
-	// TODO: Support explain in future.
-	// rows := s.mustQuery(c, "explain select c1 from test_add_index where c3 >= 100")
-
-	// ay := dumpRows(c, rows)
-	// c.Assert(strings.Contains(fmt.Sprintf("%v", ay), "c3_index"), IsTrue)
-
-	// get all row handles
 	ctx := s.s.(sessionctx.Context)
 	c.Assert(ctx.NewTxn(context.Background()), IsNil)
 	t := s.testGetTable(c, "test_add_index")
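Reviewer note on the changeWorkerNumEnable change above: the patch replaces a plain bool that is written from the DDL hook goroutine and read from the test goroutine with an int32 accessed through sync/atomic, which removes a data race under the race detector. A minimal sketch of that handoff, with all names illustrative rather than taken from the patch:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var changeWorkerNumEnable int32

	// Stand-in for hook.OnIndexWorkerReorgBeforeExported: another goroutine
	// raises the flag whenever the reorg hook fires.
	go func() {
		for i := 0; i < 5; i++ {
			time.Sleep(30 * time.Millisecond)
			atomic.StoreInt32(&changeWorkerNumEnable, 1)
		}
	}()

	deadline := time.After(300 * time.Millisecond)
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-deadline:
			return
		case <-ticker.C:
			// Consume the flag and reset it, mirroring the test's ticker2 branch
			// that picks a new tidb_ddl_reorg_worker_cnt value.
			if atomic.LoadInt32(&changeWorkerNumEnable) == 1 {
				fmt.Println("hook fired; would now change tidb_ddl_reorg_worker_cnt")
				atomic.StoreInt32(&changeWorkerNumEnable, 0)
			}
		}
	}
}
```

The added c.Assert(firstCheck, IsFalse) also strengthens the test: it fails if the hook never ran at all, instead of silently passing.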
diff --git a/ddl/index.go b/ddl/index.go
index 544c47d5d7d12..271f3d3aab54e 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -1106,9 +1106,12 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.I
 		closeAddIndexWorkers(workers)
 	}
 
-	reorgInfo.d.mu.Lock()
-	reorgInfo.d.mu.hook.OnIndexWorkerReorgBefore(len(idxWorkers), len(kvRanges))
-	reorgInfo.d.mu.Unlock()
+	// gofail: var checkIndexWorker bool
+	//if checkIndexWorker {
+	//	reorgInfo.d.mu.Lock()
+	//	reorgInfo.d.mu.hook.OnIndexWorkerReorgBefore(len(idxWorkers), len(kvRanges))
+	//	reorgInfo.d.mu.Unlock()
+	//}
 	log.Infof("[ddl-reorg] start %d workers to reorg index of %v region ranges, handle range:[%v, %v).",
 		len(idxWorkers), len(kvRanges), startHandle, endHandle)
 	remains, err := w.sendRangeTaskToWorkers(t, idxWorkers, reorgInfo, &totalAddedCount, kvRanges)
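Reviewer note on the failpoint above: as I understand the gofail workflow, the `// gofail: var checkIndexWorker bool` directive plus the commented-out block is rewritten into real code only in failpoint-enabled builds, so normal builds no longer take the mutex or call the hook here at all; tests toggle the behavior with gofail.Enable/Disable as the db_test.go hunk does. A hedged sketch of the same convention with a hypothetical failpoint name (mockSlowOperation and TestWithFailpoint are not part of this patch):

```go
package ddl // illustrative placement only

import (
	"testing"

	gofail "github.com/pingcap/gofail/runtime"
)

// Hypothetical failpoint declaration, following the same convention as
// checkIndexWorker in ddl/index.go. In failpoint-enabled builds the gofail
// generator activates the directive and the commented-out block below it;
// in normal builds it stays a comment with no runtime cost.

// gofail: var mockSlowOperation bool
//if mockSlowOperation {
//	panic("injected failure")
//}

// Hypothetical test-side toggle mirroring this patch's usage in db_test.go:
// the first argument is the declaring package's import path plus the
// failpoint name, and `return(true)` makes the generated variable true.
func TestWithFailpoint(t *testing.T) {
	gofail.Enable("github.com/pingcap/tidb/ddl/mockSlowOperation", `return(true)`)
	defer gofail.Disable("github.com/pingcap/tidb/ddl/mockSlowOperation")
	// ... exercise the code path guarded by the failpoint ...
}
```

This is the design choice the commit title refers to: the always-on test hook call in the hot reorg path is replaced by a failpoint that costs nothing unless a test explicitly enables it.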