Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix cancel drop column ddl error #8545

Merged
merged 23 commits into from
Dec 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,94 @@ LOOP:
s.tk.MustExec("drop table test_drop_index")
}

// TestCancelDropColumn tests cancel ddl job which type is drop column.
func (s *testDBSuite) TestCancelDropColumn(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
s.mustExec(c, "drop table if exists test_drop_column")
s.mustExec(c, "create table test_drop_column(c1 int, c2 int)")
defer s.mustExec(c, "drop table test_drop_column;")
testCases := []struct {
needAddColumn bool
jobState model.JobState
JobSchemaState model.SchemaState
cancelSucc bool
}{
{true, model.JobStateNone, model.StateNone, true},
{false, model.JobStateRunning, model.StateWriteOnly, false},
{true, model.JobStateRunning, model.StateDeleteOnly, false},
{true, model.JobStateRunning, model.StateDeleteReorganization, false},
}
var checkErr error
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
ddl.ReorgWaitTimeout = 50 * time.Millisecond
defer func() { ddl.ReorgWaitTimeout = oldReorgWaitTimeout }()
hook := &ddl.TestDDLCallback{}
var jobID int64
testCase := &testCases[0]
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionDropColumn && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState {
jobIDs := []int64{job.ID}
jobID = job.ID
hookCtx := mock.NewContext()
hookCtx.Store = s.store
err := hookCtx.NewTxn(context.TODO())
if err != nil {
checkErr = errors.Trace(err)
return
}
txn, err := hookCtx.Txn(true)
if err != nil {
checkErr = errors.Trace(err)
return
}
errs, err := admin.CancelJobs(txn, jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}
if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}
checkErr = txn.Commit(context.Background())
}
}

originalHook := s.dom.DDL().GetHook()
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
var err1 error
for i := range testCases {
testCase = &testCases[i]
if testCase.needAddColumn {
s.mustExec(c, "alter table test_drop_column add column c3 int")
}
_, err1 = s.tk.Exec("alter table test_drop_column drop column c3")
var col1 *table.Column
t := s.testGetTable(c, "test_drop_column")
for _, col := range t.Cols() {
if strings.EqualFold(col.Name.L, "c3") {
col1 = col
break
}
}
if testCase.cancelSucc {
c.Assert(checkErr, IsNil)
c.Assert(col1, NotNil)
c.Assert(col1.Name.L, Equals, "c3")
c.Assert(err1.Error(), Equals, "[ddl:12]cancelled DDL job")
} else {
c.Assert(col1, IsNil)
c.Assert(err1, IsNil)
c.Assert(checkErr, NotNil)
c.Assert(checkErr.Error(), Equals, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error())
}
}
s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook)
s.mustExec(c, "alter table test_drop_column add column c3 int")
s.mustExec(c, "alter table test_drop_column drop column c3")
}

func checkDelRangeDone(c *C, ctx sessionctx.Context, idx table.Index) {
startTime := time.Now()
f := func() map[int64]struct{} {
Expand Down
33 changes: 33 additions & 0 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,10 @@ func buildCancelJobTests(firstID int64) []testCancelJob {
{act: model.ActionCreateTable, jobIDs: []int64{firstID + 10}, cancelRetErrs: noErrs, cancelState: model.StateNone, ddlRetErr: err},
// Test create database, watch out, database id will alloc a globalID.
{act: model.ActionCreateSchema, jobIDs: []int64{firstID + 12}, cancelRetErrs: noErrs, cancelState: model.StateNone, ddlRetErr: err},

{act: model.ActionDropColumn, jobIDs: []int64{firstID + 13}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 13)}, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 14}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 14)}, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 15}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 15)}, cancelState: model.StateWriteReorganization, ddlRetErr: err},
}

return tests
Expand Down Expand Up @@ -403,6 +407,18 @@ func (s *testDDLSuite) checkAddColumn(c *C, d *ddl, schemaID int64, tableID int6
c.Assert(found, Equals, success)
}

func (s *testDDLSuite) checkCancelDropColumn(c *C, d *ddl, schemaID int64, tableID int64, colName string, success bool) {
changedTable := testGetTable(c, d, schemaID, tableID)
notFound := true
for _, colInfo := range changedTable.Meta().Columns {
if colInfo.Name.O == colName {
notFound = false
break
}
}
c.Assert(notFound, Equals, success)
}

func (s *testDDLSuite) TestCancelJob(c *C) {
store := testCreateStore(c, "test_cancel_job")
defer store.Close()
Expand Down Expand Up @@ -541,6 +557,23 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
c.Check(checkErr, IsNil)
testCheckSchemaState(c, d, dbInfo1, model.StateNone)

// for drop column.
test = &tests[10]
dropColName := "c2"
dropColumnArgs := []interface{}{model.NewCIStr(dropColName)}
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionDropColumn, dropColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)

test = &tests[11]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionDropColumn, dropColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)

ciscoxll marked this conversation as resolved.
Show resolved Hide resolved
test = &tests[12]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionDropColumn, dropColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)
}

func (s *testDDLSuite) TestIgnorableSpec(c *C) {
Expand Down
34 changes: 34 additions & 0 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,38 @@ func rollingbackAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
return ver, errCancelledDDLJob
}

func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
ciscoxll marked this conversation as resolved.
Show resolved Hide resolved
tblInfo, err := getTableInfo(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

var colName model.CIStr
err = job.DecodeArgs(&colName)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
if colInfo == nil {
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName)
}

// StatePublic means when the job is not running yet.
if colInfo.State == model.StatePublic {
job.State = model.JobStateCancelled
} else {
// In the state of drop column `write only -> delete only -> reorganization`,
// We can not rollback now, so just continue to drop column.
job.State = model.JobStateRunning
return ver, errors.Trace(nil)
}
job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo)
return ver, errors.Trace(errCancelledDDLJob)
}

func rollingbackDropIndex(t *meta.Meta, job *model.Job) (ver int64, err error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
Expand Down Expand Up @@ -182,6 +214,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
ver, err = rollingbackAddColumn(t, job)
case model.ActionAddIndex:
ver, err = rollingbackAddindex(w, d, t, job)
case model.ActionDropColumn:
ver, err = rollingbackDropColumn(t, job)
case model.ActionDropIndex:
ver, err = rollingbackDropIndex(t, job)
case model.ActionDropTable, model.ActionDropSchema:
Expand Down
4 changes: 4 additions & 0 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ func isJobRollbackable(job *model.Job, id int64) error {
job.SchemaState == model.StateDeleteReorganization {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
}
case model.ActionDropColumn:
if job.SchemaState != model.StateNone {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
}
case model.ActionDropSchema, model.ActionDropTable:
// To simplify the rollback logic, cannot be canceled in the following states.
if job.SchemaState == model.StateWriteOnly ||
Expand Down