Skip to content

Commit

Permalink
fix cancel add/drop partitioned table ddl job (#9376)
Browse files Browse the repository at this point in the history
  • Loading branch information
ciscoxll authored and zimulala committed Feb 21, 2019
1 parent 7513f0c commit 29a48ae
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 0 deletions.
88 changes: 88 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,94 @@ func checkDelRangeDone(c *C, ctx sessionctx.Context, idx table.Index) {
c.Assert(handles, HasLen, 0, Commentf("take time %v", time.Since(startTime)))
}

// TestCancelAddTableAndDropTablePartition tests cancel ddl job which type is add/drop table partition.
func (s *testDBSuite) TestCancelAddTableAndDropTablePartition(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "create database if not exists test_partition_table")
s.mustExec(c, "use test_partition_table")
s.mustExec(c, "drop table if exists t_part")
s.mustExec(c, `create table t_part (a int key)
partition by range(a) (
partition p0 values less than (10),
partition p1 values less than (20)
);`)
defer s.mustExec(c, "drop table t_part;")
for i := 0; i < 10; i++ {
s.mustExec(c, "insert into t_part values (?)", i)
}

testCases := []struct {
action model.ActionType
jobState model.JobState
JobSchemaState model.SchemaState
cancelSucc bool
}{
{model.ActionAddTablePartition, model.JobStateNone, model.StateNone, true},
{model.ActionDropTablePartition, model.JobStateNone, model.StateNone, true},
{model.ActionAddTablePartition, model.JobStateRunning, model.StatePublic, false},
{model.ActionDropTablePartition, model.JobStateRunning, model.StatePublic, false},
}
var checkErr error
hook := &ddl.TestDDLCallback{}
testCase := &testCases[0]
var jobID int64
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == testCase.action && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState {
jobIDs := []int64{job.ID}
jobID = job.ID
hookCtx := mock.NewContext()
hookCtx.Store = s.store
err := hookCtx.NewTxn()
if err != nil {
checkErr = errors.Trace(err)
return
}
txn, err := hookCtx.Txn(true)
if err != nil {
checkErr = errors.Trace(err)
return
}
errs, err := admin.CancelJobs(txn, jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}
if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}
checkErr = txn.Commit(context.Background())
}
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
var err error
sql := ""
for i := range testCases {
testCase = &testCases[i]
if testCase.action == model.ActionAddTablePartition {
sql = `alter table t_part add partition (
partition p2 values less than (30)
);`
} else if testCase.action == model.ActionDropTablePartition {
sql = "alter table t_part drop partition p1;"
}
_, err = s.tk.Exec(sql)
if testCase.cancelSucc {
c.Assert(checkErr, IsNil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
s.mustExec(c, "insert into t_part values (?)", i)
} else {
c.Assert(err, IsNil)
c.Assert(checkErr, NotNil)
c.Assert(checkErr.Error(), Equals, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error())
_, err = s.tk.Exec("insert into t_part values (?)", i)
c.Assert(err, NotNil)
}
}
}
s.dom.DDL().(ddl.DDLForTest).SetHook(&ddl.TestDDLCallback{})
}

// TestCancelDropColumn tests cancel ddl job which type is drop column.
func (s *testDBSuite) TestCancelDropColumn(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
Expand Down
32 changes: 32 additions & 0 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ func convertNotStartAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, occuredE
return convertAddIdxJob2RollbackJob(t, job, tblInfo, indexInfo, occuredErr)
}

func cancelOnlyNotHandledJob(job *model.Job) (ver int64, err error) {
// We can only cancel the not handled job.
if job.SchemaState == model.StateNone {
job.State = model.JobStateCancelled
return ver, errCancelledDDLJob
}

job.State = model.JobStateRunning

return ver, nil
}

func rollingbackAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
job.State = model.JobStateRollingback
col := &model.ColumnInfo{}
Expand Down Expand Up @@ -131,6 +143,14 @@ func rollingbackAddindex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return
}

func rollingbackAddTablePartition(t *meta.Meta, job *model.Job) (ver int64, err error) {
_, err = getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
return cancelOnlyNotHandledJob(job)
}

func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
tblInfo, err := getTableInfo(t, job, job.SchemaID)
if err != nil {
Expand Down Expand Up @@ -163,14 +183,26 @@ func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error)
return ver, errors.Trace(errCancelledDDLJob)
}

func rollingbackDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, err error) {
_, err = getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
return cancelOnlyNotHandledJob(job)
}

func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
switch job.Type {
case model.ActionAddColumn:
ver, err = rollingbackAddColumn(t, job)
case model.ActionAddIndex:
ver, err = rollingbackAddindex(w, d, t, job)
case model.ActionAddTablePartition:
ver, err = rollingbackAddTablePartition(t, job)
case model.ActionDropColumn:
ver, err = rollingbackDropColumn(t, job)
case model.ActionDropTablePartition:
ver, err = rollingbackDropTablePartition(t, job)
default:
job.State = model.JobStateCancelled
err = errCancelledDDLJob
Expand Down
39 changes: 39 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,45 @@ func getTable(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) (table
return tbl, errors.Trace(err)
}

func getTableInfoAndCancelFaultJob(t *meta.Meta, job *model.Job, schemaID int64) (*model.TableInfo, error) {
tblInfo, err := checkTableExistAndCancelNonExistJob(t, job, schemaID)
if err != nil {
return nil, errors.Trace(err)
}

if tblInfo.State != model.StatePublic {
job.State = model.JobStateCancelled
return nil, ErrInvalidTableState.GenWithStack("table %s is not in public, but %s", tblInfo.Name, tblInfo.State)
}

return tblInfo, nil
}

func checkTableExistAndCancelNonExistJob(t *meta.Meta, job *model.Job, schemaID int64) (*model.TableInfo, error) {
tableID := job.TableID
// Check this table's database.
tblInfo, err := t.GetTable(schemaID, tableID)
if err != nil {
if meta.ErrDBNotExists.Equal(err) {
job.State = model.JobStateCancelled
return nil, errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(
fmt.Sprintf("(Schema ID %d)", schemaID),
))
}
return nil, errors.Trace(err)
}

// Check the table.
if tblInfo == nil {
job.State = model.JobStateCancelled
return nil, errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(
fmt.Sprintf("(Schema ID %d)", schemaID),
fmt.Sprintf("(Table ID %d)", tableID),
))
}
return tblInfo, nil
}

func getTableInfo(t *meta.Meta, job *model.Job, schemaID int64) (*model.TableInfo, error) {
tableID := job.TableID
tblInfo, err := t.GetTable(schemaID, tableID)
Expand Down
4 changes: 4 additions & 0 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func isJobRollbackable(job *model.Job, id int64) error {
if job.SchemaState != model.StateNone {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
}
case model.ActionDropTablePartition, model.ActionAddTablePartition:
if job.SchemaState != model.StateNone {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
}
}
return nil
}
Expand Down

0 comments on commit 29a48ae

Please sign in to comment.