Commit 77f49bd: use gofail test

crazycs520 committed Dec 21, 2018
1 parent 451a9da commit 77f49bd
Showing 2 changed files with 175 additions and 24 deletions.
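In short: ddl/index.go used to call the test hook OnIndexWorkerReorgBefore unconditionally before dispatching index-backfill tasks; this commit hides that call behind a gofail failpoint named checkIndexWorker, and adds a dedicated test, TestAddIndexWorkerNum, that enables the failpoint and verifies the number of add-index workers. A minimal sketch of the gofail pattern, assuming pingcap/gofail's comment-driven failpoints (everything except the checkIndexWorker name and the Enable/Disable calls is illustrative):

// In library code the failpoint is just a comment, so it costs nothing unless
// the gofail preprocessor has rewritten the package:
//
//	// gofail: var checkIndexWorker bool
//	//if checkIndexWorker {
//	//	doTestOnlyWork() // hypothetical test-only code
//	//}
//
// A test then toggles it through the runtime package:

package ddl_test

import gofail "github.com/pingcap/gofail/runtime"

func exampleEnableFailpoint() {
	// `return(true)` makes the generated checkIndexWorker variable evaluate to true.
	gofail.Enable("github.com/pingcap/tidb/ddl/checkIndexWorker", `return(true)`)
	defer gofail.Disable("github.com/pingcap/tidb/ddl/checkIndexWorker")
	// ... run the DDL statement that should hit the failpoint ...
}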
190 changes: 169 additions & 21 deletions ddl/db_test.go
@@ -27,6 +27,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/errors"
gofail "github.com/pingcap/gofail/runtime"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
tmysql "github.com/pingcap/parser/mysql"
@@ -609,27 +610,185 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
s.mustExec(c, sql)
otherKeys = append(otherKeys, v)

sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)

deletedKeys := make(map[int]struct{})

ticker := time.NewTicker(s.lease / 2)
defer ticker.Stop()
LOOP:
for {
select {
case err := <-done:
if err == nil {
break LOOP
}
c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err)))
case <-ticker.C:
// When the server is particularly slow, the add-index operation
// may not be able to finish.
// So limit the number of rows inserted here.
if num > defaultBatchSize*10 {
break
}
step := 10
// delete some rows, and add some data
for i := num; i < num+step; i++ {
n := rand.Intn(num)
deletedKeys[n] = struct{}{}
sql := fmt.Sprintf("delete from test_add_index where c1 = %d", n)
s.mustExec(c, sql)
sql = fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
s.mustExec(c, sql)
}
num += step
}
}

// Collect the keys that still exist.
keys := make([]int, 0, num)
for i := start; i < num; i++ {
if _, ok := deletedKeys[i]; ok {
continue
}
keys = append(keys, i)
}
keys = append(keys, otherKeys...)

// test index key
expectedRows := make([][]interface{}, 0, len(keys))
for _, key := range keys {
expectedRows = append(expectedRows, []interface{}{key})
}
rows := s.mustQuery(c, fmt.Sprintf("select c1 from test_add_index where c3 >= %d order by c1", start))
matchRows(c, rows, expectedRows)

if testPartition {
s.tk.MustExec("admin check table test_add_index")
return
}

// test index range
for i := 0; i < 100; i++ {
index := rand.Intn(len(keys) - 3)
rows := s.mustQuery(c, "select c1 from test_add_index where c3 >= ? limit 3", keys[index])
matchRows(c, rows, [][]interface{}{{keys[index]}, {keys[index+1]}, {keys[index+2]}})
}

// TODO: Support explain in future.
// rows := s.mustQuery(c, "explain select c1 from test_add_index where c3 >= 100")

// ay := dumpRows(c, rows)
// c.Assert(strings.Contains(fmt.Sprintf("%v", ay), "c3_index"), IsTrue)

// get all row handles
ctx := s.s.(sessionctx.Context)
c.Assert(ctx.NewTxn(context.Background()), IsNil)
t := s.testGetTable(c, "test_add_index")
handles := make(map[int64]struct{})
startKey := t.RecordKey(math.MinInt64)
err := t.IterRecords(ctx, startKey, t.Cols(),
func(h int64, data []types.Datum, cols []*table.Column) (bool, error) {
handles[h] = struct{}{}
return true, nil
})
c.Assert(err, IsNil)

// check in index
var nidx table.Index
for _, tidx := range t.Indices() {
if tidx.Meta().Name.L == "c3_index" {
nidx = tidx
break
}
}
// Make sure there is an index named c3_index.
c.Assert(nidx, NotNil)
c.Assert(nidx.Meta().ID, Greater, int64(0))
txn, err := ctx.Txn(true)
c.Assert(err, IsNil)
txn.Rollback()

c.Assert(ctx.NewTxn(context.Background()), IsNil)
defer txn.Rollback()

it, err := nidx.SeekFirst(txn)
c.Assert(err, IsNil)
defer it.Close()

for {
_, h, err := it.Next()
if terror.ErrorEqual(err, io.EOF) {
break
}

c.Assert(err, IsNil)
_, ok := handles[h]
c.Assert(ok, IsTrue)
delete(handles, h)
}
c.Assert(handles, HasLen, 0)
s.tk.MustExec("drop table test_add_index")
}

func (s *testDBSuite) TestAddIndexWorkerNum(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
s.tk.MustExec("drop table if exists test_add_index")
s.tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")

done := make(chan error, 1)
start := -10
num := defaultBatchSize
// first add some rows
for i := start; i < num; i++ {
sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
s.mustExec(c, sql)
}
// Add some discrete rows.
maxBatch := 20
batchCnt := 100
otherKeys := make([]int, 0, batchCnt*maxBatch)
// Make sure there are no duplicate keys.
base := defaultBatchSize * 20
for i := 1; i < batchCnt; i++ {
n := base + i*defaultBatchSize + i
for j := 0; j < rand.Intn(maxBatch); j++ {
n += j
sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", n, n, n)
s.mustExec(c, sql)
otherKeys = append(otherKeys, n)
}
}
// Insert a value near math.MaxInt64 so the add-index backfill has to handle keys at the upper end of the int64 range.
v := math.MaxInt64 - defaultBatchSize/2
sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", v, v, v)
s.mustExec(c, sql)
otherKeys = append(otherKeys, v)

is := s.dom.InfoSchema()
schemaName := model.NewCIStr(s.schemaName)
tableName := model.NewCIStr("test_add_index")
tbl, err := is.TableByName(schemaName, tableName)
c.Assert(err, IsNil)

splitCount := 100
if !testPartition {
// Split table to multi region.
s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount)
}
// Split table to multi region.
s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount)

// Enable the failpoint and set the hook to check the add-index worker count.
gofail.Enable("github.com/pingcap/tidb/ddl/checkIndexWorker", `return(true)`)
defer gofail.Disable("github.com/pingcap/tidb/ddl/checkIndexWorker")

hook := &ddl.TestDDLCallback{}
origDDLAddIndexWorkerCnt := variable.GetDDLReorgWorkerCounter()
lastSetWorkerCnt := int(origDDLAddIndexWorkerCnt)
defer variable.SetDDLReorgWorkerCounter(origDDLAddIndexWorkerCnt)
// firstCheck is used to check that splitting the table into regions has taken effect.
firstCheck := !testPartition
firstCheck := true
var checkErr error
changeWorkerNumEnable := false
var changeWorkerNumEnable = int32(0)

hook.OnIndexWorkerReorgBeforeExported = func(workerNum, rangesNum int) {
if checkErr != nil {
return
@@ -649,7 +808,7 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
} else if workerNum != setNum {
checkErr = errors.Errorf("rangesNum is %v, expected workerNum %v, but got %v", rangesNum, setNum, workerNum)
}
changeWorkerNumEnable = true
atomic.StoreInt32(&changeWorkerNumEnable, 1)
}
originHook := s.dom.DDL().GetHook()
defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook)
@@ -691,15 +850,16 @@ LOOP:
}
num += step
case <-ticker2.C:
if changeWorkerNumEnable {
if atomic.LoadInt32(&changeWorkerNumEnable) == 1 {
lastSetWorkerCnt = rand.Intn(8) + 8
s.mustExec(c, fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt))
c.Assert(checkErr, IsNil)
changeWorkerNumEnable = false
atomic.StoreInt32(&changeWorkerNumEnable, 0)
}
}
}
c.Assert(checkErr, IsNil)
c.Assert(firstCheck, IsFalse)

// Collect the keys that still exist.
keys := make([]int, 0, num)
Expand All @@ -719,25 +879,13 @@ LOOP:
rows := s.mustQuery(c, fmt.Sprintf("select c1 from test_add_index where c3 >= %d order by c1", start))
matchRows(c, rows, expectedRows)

if testPartition {
s.tk.MustExec("admin check table test_add_index")
return
}

// test index range
for i := 0; i < 100; i++ {
index := rand.Intn(len(keys) - 3)
rows := s.mustQuery(c, "select c1 from test_add_index where c3 >= ? limit 3", keys[index])
matchRows(c, rows, [][]interface{}{{keys[index]}, {keys[index+1]}, {keys[index+2]}})
}

// TODO: Support explain in future.
// rows := s.mustQuery(c, "explain select c1 from test_add_index where c3 >= 100")

// ay := dumpRows(c, rows)
// c.Assert(strings.Contains(fmt.Sprintf("%v", ay), "c3_index"), IsTrue)

// get all row handles
ctx := s.s.(sessionctx.Context)
c.Assert(ctx.NewTxn(context.Background()), IsNil)
t := s.testGetTable(c, "test_add_index")
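One detail worth noting in the test above: the changeWorkerNumEnable flag changes from a plain bool to an int32 driven by sync/atomic. The hook body runs on the DDL worker goroutine while the test goroutine polls and resets the flag on ticker2, so a plain bool would be a data race under the race detector. A minimal, self-contained sketch of that handshake (names are illustrative, not TiDB APIs):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var hookFired int32 // set by the "hook" goroutine, consumed by the "test" goroutine

	go func() {
		// Stands in for OnIndexWorkerReorgBeforeExported running inside the DDL worker.
		time.Sleep(10 * time.Millisecond)
		atomic.StoreInt32(&hookFired, 1)
	}()

	// Stands in for the ticker2 branch of the test loop.
	for i := 0; i < 100; i++ {
		if atomic.LoadInt32(&hookFired) == 1 {
			fmt.Println("hook observed the worker count; pick a new tidb_ddl_reorg_worker_cnt")
			atomic.StoreInt32(&hookFired, 0)
			break
		}
		time.Sleep(time.Millisecond)
	}
}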
9 changes: 6 additions & 3 deletions ddl/index.go
@@ -1106,9 +1106,12 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.I
closeAddIndexWorkers(workers)
}

reorgInfo.d.mu.Lock()
reorgInfo.d.mu.hook.OnIndexWorkerReorgBefore(len(idxWorkers), len(kvRanges))
reorgInfo.d.mu.Unlock()
// gofail: var checkIndexWorker bool
//if checkIndexWorker {
// reorgInfo.d.mu.Lock()
// reorgInfo.d.mu.hook.OnIndexWorkerReorgBefore(len(idxWorkers), len(kvRanges))
// reorgInfo.d.mu.Unlock()
//}

log.Infof("[ddl-reorg] start %d workers to reorg index of %v region ranges, handle range:[%v, %v).", len(idxWorkers), len(kvRanges), startHandle, endHandle)
remains, err := w.sendRangeTaskToWorkers(t, idxWorkers, reorgInfo, &totalAddedCount, kvRanges)
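For reference, once the failpoint is enabled the commented-out block above behaves roughly like the guard in the sketch below. failpointValue is a hypothetical stand-in for the lookup that the gofail preprocessor generates; it is not a real gofail or TiDB function, and the sketch only illustrates the control flow:

package main

import "fmt"

// failpointValue is a hypothetical stand-in for the generated failpoint lookup.
func failpointValue(name string) (interface{}, bool) {
	// Pretend the failpoint was enabled with `return(true)`.
	return true, true
}

func main() {
	// Conceptually, the rewritten block in addPhysicalTableIndex does this:
	if val, ok := failpointValue("github.com/pingcap/tidb/ddl/checkIndexWorker"); ok {
		if checkIndexWorker, isBool := val.(bool); isBool && checkIndexWorker {
			fmt.Println("invoke reorgInfo.d.mu.hook.OnIndexWorkerReorgBefore(len(idxWorkers), len(kvRanges)) here")
		}
	}
}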
