diff --git a/ddl/db_test.go b/ddl/db_test.go
index fba087f66f092..e16367f3ab98c 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -403,61 +403,12 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) {
 	var checkErr error
 	var c3IdxInfo *model.IndexInfo
 	hook := &ddl.TestDDLCallback{}
-	first := true
 	oldReorgWaitTimeout := ddl.ReorgWaitTimeout
 	// let hook.OnJobUpdatedExported has chance to cancel the job.
 	// the hook.OnJobUpdatedExported is called when the job is updated, runReorgJob will wait ddl.ReorgWaitTimeout, then return the ddl.runDDLJob.
 	// After that ddl call d.hook.OnJobUpdated(job), so that we can canceled the job in this test case.
 	ddl.ReorgWaitTimeout = 50 * time.Millisecond
-	hook.OnJobUpdatedExported = func(job *model.Job) {
-		addIndexNotFirstReorg := job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0
-		// If the action is adding index and the state is writing reorganization, it want to test the case of cancelling the job when backfilling indexes.
-		// When the job satisfies this case of addIndexNotFirstReorg, the worker will start to backfill indexes.
-		if !addIndexNotFirstReorg {
-			// Get the index's meta.
-			if c3IdxInfo != nil {
-				return
-			}
-			t := s.testGetTable(c, "t1")
-			for _, index := range t.WritableIndices() {
-				if index.Meta().Name.L == "c3_index" {
-					c3IdxInfo = index.Meta()
-				}
-			}
-			return
-		}
-		// The job satisfies the case of addIndexNotFirst for the first time, the worker hasn't finished a batch of backfill indexes.
-		if first {
-			first = false
-			return
-		}
-		if checkErr != nil {
-			return
-		}
-		hookCtx := mock.NewContext()
-		hookCtx.Store = s.store
-		var err error
-		err = hookCtx.NewTxn()
-		if err != nil {
-			checkErr = errors.Trace(err)
-			return
-		}
-		jobIDs := []int64{job.ID}
-		errs, err := admin.CancelJobs(hookCtx.Txn(), jobIDs)
-		if err != nil {
-			checkErr = errors.Trace(err)
-			return
-		}
-		// It only tests cancel one DDL job.
-		if errs[0] != nil {
-			checkErr = errors.Trace(errs[0])
-			return
-		}
-		err = hookCtx.Txn().Commit(context.Background())
-		if err != nil {
-			checkErr = errors.Trace(err)
-		}
-	}
+	hook.OnJobUpdatedExported, c3IdxInfo = backgroundExecOnJobUpdatedExported(c, s, hook, checkErr)
 	s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
 	done := make(chan error, 1)
 	go backgroundExec(s.store, "create unique index c3_index on t1 (c3)", done)
@@ -3221,6 +3172,139 @@ LOOP:
 	s.tk.MustExec("drop table partition_drop_idx;")
 }
 
+func (s *testDBSuite) TestPartitionCancelAddIndex(c *C) {
+	s.tk = testkit.NewTestKit(c, s.store)
+	s.mustExec(c, "use test_db")
+	s.mustExec(c, "set @@session.tidb_enable_table_partition=1;")
+	s.mustExec(c, "drop table if exists t1;")
+	s.mustExec(c, `create table t1 (
+		c1 int, c2 int, c3 int
+	)
+	partition by range( c1 ) (
+		partition p0 values less than (1024),
+		partition p1 values less than (2048),
+		partition p2 values less than (3072),
+		partition p3 values less than (4096),
+		partition p4 values less than (maxvalue)
+	);`)
+	base := defaultBatchSize * 2
+	count := base
+	// add some rows
+	for i := 0; i < count; i++ {
+		s.mustExec(c, "insert into t1 values (?, ?, ?)", i, i, i)
+	}
+
+	var checkErr error
+	var c3IdxInfo *model.IndexInfo
+	hook := &ddl.TestDDLCallback{}
+	oldReorgWaitTimeout := ddl.ReorgWaitTimeout
+	ddl.ReorgWaitTimeout = 10 * time.Millisecond
+	hook.OnJobUpdatedExported, c3IdxInfo = backgroundExecOnJobUpdatedExported(c, s, hook, checkErr)
+	s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
+	done := make(chan error, 1)
+	go backgroundExec(s.store, "create index c3_index on t1 (c3)", done)
+
+	times := 0
+	ticker := time.NewTicker(s.lease / 2)
+	defer ticker.Stop()
+LOOP:
+	for {
+		select {
+		case err := <-done:
+			c.Assert(checkErr, IsNil)
+			c.Assert(err, NotNil)
+			c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
+			break LOOP
+		case <-ticker.C:
+			if times >= 10 {
+				break
+			}
+			step := 10
+			rand.Seed(time.Now().Unix())
+			// delete some rows, and add some data
+			for i := count; i < count+step; i++ {
+				n := rand.Intn(count)
+				s.mustExec(c, "delete from t1 where c1 = ?", n)
+				s.mustExec(c, "insert into t1 values (?, ?, ?)", i+10, i, i)
+			}
+			count += step
+			times++
+		}
+	}
+
+	t := s.testGetTable(c, "t1")
+	// Only one partition ID is tested here.
+	pid := t.Meta().Partition.Definitions[0].ID
+	for _, tidx := range t.Indices() {
+		c.Assert(strings.EqualFold(tidx.Meta().Name.L, "c3_index"), IsFalse)
+	}
+
+	ctx := s.s.(sessionctx.Context)
+	idx := tables.NewIndex(pid, t.Meta(), c3IdxInfo)
+	checkDelRangeDone(c, ctx, idx)
+
+	s.mustExec(c, "drop table t1")
+	ddl.ReorgWaitTimeout = oldReorgWaitTimeout
+	callback := &ddl.TestDDLCallback{}
+	s.dom.DDL().(ddl.DDLForTest).SetHook(callback)
+}
+
+func backgroundExecOnJobUpdatedExported(c *C, s *testDBSuite, hook *ddl.TestDDLCallback, checkErr error) (func(*model.Job), *model.IndexInfo) {
+	first := true
+	ddl.ReorgWaitTimeout = 10 * time.Millisecond
+	c3IdxInfo := &model.IndexInfo{}
+	hook.OnJobUpdatedExported = func(job *model.Job) {
+		addIndexNotFirstReorg := job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0
+		// If the action is adding an index and the state is write reorganization, we want to test the case of cancelling the job when backfilling indexes.
+		// When the job satisfies this case of addIndexNotFirstReorg, the worker will start to backfill indexes.
+		if !addIndexNotFirstReorg {
+			// Get the index's meta.
+			if c3IdxInfo != nil {
+				return
+			}
+			t := s.testGetTable(c, "t1")
+			for _, index := range t.WritableIndices() {
+				if index.Meta().Name.L == "c3_index" {
+					c3IdxInfo = index.Meta()
+				}
+			}
+			return
+		}
+		// The job satisfies the case of addIndexNotFirstReorg for the first time; the worker hasn't finished backfilling a batch of index entries yet.
+		if first {
+			first = false
+			return
+		}
+		if checkErr != nil {
+			return
+		}
+		hookCtx := mock.NewContext()
+		hookCtx.Store = s.store
+		var err error
+		err = hookCtx.NewTxn()
+		if err != nil {
+			checkErr = errors.Trace(err)
+			return
+		}
+		jobIDs := []int64{job.ID}
+		errs, err := admin.CancelJobs(hookCtx.Txn(), jobIDs)
+		if err != nil {
+			checkErr = errors.Trace(err)
+			return
+		}
+		// It only tests cancelling one DDL job.
+		if errs[0] != nil {
+			checkErr = errors.Trace(errs[0])
+			return
+		}
+		err = hookCtx.Txn().Commit(context.Background())
+		if err != nil {
+			checkErr = errors.Trace(err)
+		}
+	}
+	return hook.OnJobUpdatedExported, c3IdxInfo
+}
+
 func (s *testDBSuite) TestPartitionAddIndex(c *C) {
 	tk := testkit.NewTestKit(c, s.store)
 	tk.MustExec("use test")
diff --git a/ddl/delete_range.go b/ddl/delete_range.go
index 2ad93e5928038..2eaabe3d28070 100644
--- a/ddl/delete_range.go
+++ b/ddl/delete_range.go
@@ -268,12 +268,23 @@ func insertJobIntoDeleteRangeTable(ctx sessionctx.Context, job *model.Job) error
 	case model.ActionAddIndex:
 		tableID := job.TableID
 		var indexID int64
-		if err := job.DecodeArgs(&indexID); err != nil {
+		var partitionIDs []int64
+		if err := job.DecodeArgs(&indexID, &partitionIDs); err != nil {
 			return errors.Trace(err)
 		}
-		startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
-		endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
-		return doInsert(s, job.ID, indexID, startKey, endKey, now)
+		if len(partitionIDs) > 0 {
+			for _, pid := range partitionIDs {
+				startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID)
+				endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1)
+				if err := doInsert(s, job.ID, indexID, startKey, endKey, now); err != nil {
+					return errors.Trace(err)
+				}
+			}
+		} else {
+			startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
+			endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
+			return doInsert(s, job.ID, indexID, startKey, endKey, now)
+		}
 	case model.ActionDropIndex:
 		tableID := job.TableID
 		var indexName interface{}
diff --git a/ddl/index.go b/ddl/index.go
index 54a68511c1e54..30b21039123fd 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -357,7 +357,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int
 
 func convert2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, err error) (int64, error) {
 	job.State = model.JobStateRollingback
-	job.Args = []interface{}{indexInfo.Name}
+	job.Args = []interface{}{indexInfo.Name, getPartitionIDs(tblInfo)}
 	// If add index job rollbacks in write reorganization state, its need to delete all keys which has been added.
 	// Its work is the same as drop index job do.
 	// The write reorganization state in add index job that likes write only state in drop index job.
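
Illustrative note (not part of the patch): with a partitioned table, the index entries written by a cancelled ADD INDEX job live under each partition's physical ID rather than the logical table ID, which is why the rollback path records the partition IDs in job.Args and delete_range.go emits one delete-range record per partition. The standalone sketch below shows the span computation in isolation; the partition and index IDs are made-up values, and only tablecodec.EncodeTableIndexPrefix comes from TiDB itself.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/tablecodec"
)

func main() {
	indexID := int64(1)
	// Hypothetical physical IDs, one per partition of the table.
	partitionIDs := []int64{101, 102, 103}
	for _, pid := range partitionIDs {
		// Each partition keeps its index entries under its own physical ID,
		// so each partition needs its own [start, end) range to be cleaned up.
		startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID)
		endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1)
		fmt.Printf("partition %d: delete range [%x, %x)\n", pid, startKey, endKey)
	}
}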