Skip to content

Commit

Permalink
ddl:support add index rollback of partitioned table
Browse files Browse the repository at this point in the history
  • Loading branch information
ciscoxll committed Aug 30, 2018
1 parent 67d7544 commit 4a0b0f2
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 55 deletions.
184 changes: 134 additions & 50 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,61 +403,12 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) {
var checkErr error
var c3IdxInfo *model.IndexInfo
hook := &ddl.TestDDLCallback{}
first := true
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
// let hook.OnJobUpdatedExported has chance to cancel the job.
// the hook.OnJobUpdatedExported is called when the job is updated, runReorgJob will wait ddl.ReorgWaitTimeout, then return the ddl.runDDLJob.
// After that ddl call d.hook.OnJobUpdated(job), so that we can canceled the job in this test case.
ddl.ReorgWaitTimeout = 50 * time.Millisecond
hook.OnJobUpdatedExported = func(job *model.Job) {
addIndexNotFirstReorg := job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0
// If the action is adding index and the state is writing reorganization, it want to test the case of cancelling the job when backfilling indexes.
// When the job satisfies this case of addIndexNotFirstReorg, the worker will start to backfill indexes.
if !addIndexNotFirstReorg {
// Get the index's meta.
if c3IdxInfo != nil {
return
}
t := s.testGetTable(c, "t1")
for _, index := range t.WritableIndices() {
if index.Meta().Name.L == "c3_index" {
c3IdxInfo = index.Meta()
}
}
return
}
// The job satisfies the case of addIndexNotFirst for the first time, the worker hasn't finished a batch of backfill indexes.
if first {
first = false
return
}
if checkErr != nil {
return
}
hookCtx := mock.NewContext()
hookCtx.Store = s.store
var err error
err = hookCtx.NewTxn()
if err != nil {
checkErr = errors.Trace(err)
return
}
jobIDs := []int64{job.ID}
errs, err := admin.CancelJobs(hookCtx.Txn(), jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}
// It only tests cancel one DDL job.
if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}
err = hookCtx.Txn().Commit(context.Background())
if err != nil {
checkErr = errors.Trace(err)
}
}
hook.OnJobUpdatedExported, c3IdxInfo = backgroundExecOnJobUpdatedExported(c, s, hook, checkErr)
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
done := make(chan error, 1)
go backgroundExec(s.store, "create unique index c3_index on t1 (c3)", done)
Expand Down Expand Up @@ -3212,6 +3163,139 @@ LOOP:
s.tk.MustExec("drop table partition_drop_idx;")
}

func (s *testDBSuite) TestPartitionCancelAddIndex(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "set @@session.tidb_enable_table_partition=1;")
s.mustExec(c, "drop table if exists t1;")
s.mustExec(c, `create table t1 (
c1 int, c2 int, c3 int
)
partition by range( c1 ) (
partition p0 values less than (1024),
partition p1 values less than (2048),
partition p2 values less than (3072),
partition p3 values less than (4096),
partition p4 values less than (maxvalue)
);`)
base := defaultBatchSize * 2
count := base
// add some rows
for i := 0; i < count; i++ {
s.mustExec(c, "insert into t1 values (?, ?, ?)", i, i, i)
}

var checkErr error
var c3IdxInfo *model.IndexInfo
hook := &ddl.TestDDLCallback{}
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
ddl.ReorgWaitTimeout = 10 * time.Millisecond
hook.OnJobUpdatedExported, c3IdxInfo = backgroundExecOnJobUpdatedExported(c, s, hook, checkErr)
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
done := make(chan error, 1)
go backgroundExec(s.store, "create index c3_index on t1 (c3)", done)

times := 0
ticker := time.NewTicker(s.lease / 2)
defer ticker.Stop()
LOOP:
for {
select {
case err := <-done:
c.Assert(checkErr, IsNil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
break LOOP
case <-ticker.C:
if times >= 10 {
break
}
step := 10
rand.Seed(time.Now().Unix())
// delete some rows, and add some data
for i := count; i < count+step; i++ {
n := rand.Intn(count)
s.mustExec(c, "delete from t1 where c1 = ?", n)
s.mustExec(c, "insert into t1 values (?, ?, ?)", i+10, i, i)
}
count += step
times++
}
}

t := s.testGetTable(c, "t1")
// Only one partition id test is taken here.
pid := t.Meta().Partition.Definitions[0].ID
for _, tidx := range t.Indices() {
c.Assert(strings.EqualFold(tidx.Meta().Name.L, "c3_index"), IsFalse)
}

ctx := s.s.(sessionctx.Context)
idx := tables.NewIndex(pid, t.Meta(), c3IdxInfo)
checkDelRangeDone(c, ctx, idx)

s.mustExec(c, "drop table t1")
ddl.ReorgWaitTimeout = oldReorgWaitTimeout
callback := &ddl.TestDDLCallback{}
s.dom.DDL().(ddl.DDLForTest).SetHook(callback)
}

func backgroundExecOnJobUpdatedExported(c *C, s *testDBSuite, hook *ddl.TestDDLCallback, checkErr error) (func(*model.Job), *model.IndexInfo) {
first := true
ddl.ReorgWaitTimeout = 10 * time.Millisecond
c3IdxInfo := &model.IndexInfo{}
hook.OnJobUpdatedExported = func(job *model.Job) {
addIndexNotFirstReorg := job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0
// If the action is adding index and the state is writing reorganization, it want to test the case of cancelling the job when backfilling indexes.
// When the job satisfies this case of addIndexNotFirstReorg, the worker will start to backfill indexes.
if !addIndexNotFirstReorg {
// Get the index's meta.
if c3IdxInfo != nil {
return
}
t := s.testGetTable(c, "t1")
for _, index := range t.WritableIndices() {
if index.Meta().Name.L == "c3_index" {
c3IdxInfo = index.Meta()
}
}
return
}
// The job satisfies the case of addIndexNotFirst for the first time, the worker hasn't finished a batch of backfill indexes.
if first {
first = false
return
}
if checkErr != nil {
return
}
hookCtx := mock.NewContext()
hookCtx.Store = s.store
var err error
err = hookCtx.NewTxn()
if err != nil {
checkErr = errors.Trace(err)
return
}
jobIDs := []int64{job.ID}
errs, err := admin.CancelJobs(hookCtx.Txn(), jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}
// It only tests cancel one DDL job.
if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}
err = hookCtx.Txn().Commit(context.Background())
if err != nil {
checkErr = errors.Trace(err)
}
}
return hook.OnJobUpdatedExported, c3IdxInfo
}

func (s *testDBSuite) TestPartitionAddIndex(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
19 changes: 15 additions & 4 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,23 @@ func insertJobIntoDeleteRangeTable(ctx sessionctx.Context, job *model.Job) error
case model.ActionAddIndex:
tableID := job.TableID
var indexID int64
if err := job.DecodeArgs(&indexID); err != nil {
var partitionIDs []int64
if err := job.DecodeArgs(&indexID, &partitionIDs); err != nil {
return errors.Trace(err)
}
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
return doInsert(s, job.ID, indexID, startKey, endKey, now)
if len(partitionIDs) > 0 {
for _, pid := range partitionIDs {
startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1)
if err := doInsert(s, job.ID, indexID, startKey, endKey, now); err != nil {
return errors.Trace(err)
}
}
} else {
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
return doInsert(s, job.ID, indexID, startKey, endKey, now)
}
case model.ActionDropIndex:
tableID := job.TableID
var indexName interface{}
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int

func convert2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, err error) (int64, error) {
job.State = model.JobStateRollingback
job.Args = []interface{}{indexInfo.Name}
job.Args = []interface{}{indexInfo.Name, getPartitionIDs(tblInfo)}
// If add index job rollbacks in write reorganization state, its need to delete all keys which has been added.
// Its work is the same as drop index job do.
// The write reorganization state in add index job that likes write only state in drop index job.
Expand Down

0 comments on commit 4a0b0f2

Please sign in to comment.