diff --git a/ddl/column.go b/ddl/column.go index c6ba5d08e88f4..a9326647d77e4 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -1009,51 +1009,12 @@ func (w *worker) doModifyColumnTypeWithData( return ver, errors.Trace(err) } - reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs)) - if err != nil || reorgInfo.first { - // If we run reorg firstly, we should update the job snapshot version - // and then run the reorg next time. - return ver, errors.Trace(err) + var done bool + done, ver, err = doReorgWorkForModifyColumn(w, d, t, job, tbl, oldCol, changingCol, changingIdxs) + if !done { + return ver, err } - // Inject a failpoint so that we can pause here and do verification on other components. - // With a failpoint-enabled version of TiDB, you can trigger this failpoint by the following command: - // enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData". - // disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData" - failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {}) - err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) { - defer util.Recover(metrics.LabelDDL, "onModifyColumn", - func() { - addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tblInfo.Name, oldCol.Name) - }, false) - // Use old column name to generate less confusing error messages. - changingColCpy := changingCol.Clone() - changingColCpy.Name = oldCol.Name - return w.updateColumnAndIndexes(tbl, oldCol, changingColCpy, changingIdxs, reorgInfo) - }) - if err != nil { - if dbterror.ErrWaitReorgTimeout.Equal(err) { - // If timeout, we should return, check for the owner and re-wait job done. - return ver, nil - } - if kv.IsTxnRetryableError(err) { - // Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs. - w.reorgCtx.cleanNotifyReorgCancel() - return ver, errors.Trace(err) - } - if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { - logutil.BgLogger().Warn("[ddl] run modify column job failed, RemoveDDLReorgHandle failed, can't convert job to rollback", - zap.String("job", job.String()), zap.Error(err1)) - } - logutil.BgLogger().Warn("[ddl] run modify column job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err)) - job.State = model.JobStateRollingback - // Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs. - w.reorgCtx.cleanNotifyReorgCancel() - return ver, errors.Trace(err) - } - // Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs. - w.reorgCtx.cleanNotifyReorgCancel() - oldIdxIDs := getOldIndexIDs(tblInfo, oldCol) // used by GC delete range. err = adjustTableInfoAfterModifyColumnWithData(tblInfo, pos, oldCol, changingCol, colName, changingIdxs) @@ -1080,6 +1041,55 @@ func (w *worker) doModifyColumnTypeWithData( return ver, errors.Trace(err) } +func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, + oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) { + reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs)) + if err != nil || reorgInfo.first { + // If we run reorg firstly, we should update the job snapshot version + // and then run the reorg next time. + return false, ver, errors.Trace(err) + } + + // Inject a failpoint so that we can pause here and do verification on other components. + // With a failpoint-enabled version of TiDB, you can trigger this failpoint by the following command: + // enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData". + // disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData" + failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {}) + err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) { + defer util.Recover(metrics.LabelDDL, "onModifyColumn", + func() { + addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tbl.Meta().Name, oldCol.Name) + }, false) + // Use old column name to generate less confusing error messages. + changingColCpy := changingCol.Clone() + changingColCpy.Name = oldCol.Name + return w.updateColumnAndIndexes(tbl, oldCol, changingColCpy, changingIdxs, reorgInfo) + }) + if err != nil { + if dbterror.ErrWaitReorgTimeout.Equal(err) { + // If timeout, we should return, check for the owner and re-wait job done. + return false, ver, nil + } + if kv.IsTxnRetryableError(err) { + // Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs. + w.reorgCtx.cleanNotifyReorgCancel() + return false, ver, errors.Trace(err) + } + if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { + logutil.BgLogger().Warn("[ddl] run modify column job failed, RemoveDDLReorgHandle failed, can't convert job to rollback", + zap.String("job", job.String()), zap.Error(err1)) + } + logutil.BgLogger().Warn("[ddl] run modify column job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err)) + job.State = model.JobStateRollingback + // Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs. + w.reorgCtx.cleanNotifyReorgCancel() + return false, ver, errors.Trace(err) + } + // Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs. + w.reorgCtx.cleanNotifyReorgCancel() + return true, ver, nil +} + func adjustTableInfoAfterModifyColumnWithData(tblInfo *model.TableInfo, pos *ast.ColumnPosition, oldCol, changingCol *model.ColumnInfo, newName model.CIStr, changingIdxs []*model.IndexInfo) (err error) { if pos != nil && pos.RelativeColumn != nil && oldCol.Name.L == pos.RelativeColumn.Name.L { diff --git a/ddl/index.go b/ddl/index.go index 109418cb17926..a646c819f145d 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -557,40 +557,12 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo return ver, errors.Trace(err) } - elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}} - reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, elements) - if err != nil || reorgInfo.first { - // If we run reorg firstly, we should update the job snapshot version - // and then run the reorg next time. - return ver, errors.Trace(err) + var done bool + done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo) + if !done { + return ver, err } - err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) { - defer util.Recover(metrics.LabelDDL, "onCreateIndex", - func() { - addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tblInfo.Name, indexInfo.Name) - }, false) - return w.addTableIndex(tbl, indexInfo, reorgInfo) - }) - if err != nil { - if dbterror.ErrWaitReorgTimeout.Equal(err) { - // if timeout, we should return, check for the owner and re-wait job done. - return ver, nil - } - if kv.ErrKeyExists.Equal(err) || dbterror.ErrCancelledDDLJob.Equal(err) || dbterror.ErrCantDecodeRecord.Equal(err) { - logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err)) - ver, err = convertAddIdxJob2RollbackJob(t, job, tblInfo, indexInfo, err) - if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { - logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback, RemoveDDLReorgHandle failed", zap.String("job", job.String()), zap.Error(err1)) - } - } - // Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs. - w.reorgCtx.cleanNotifyReorgCancel() - return ver, errors.Trace(err) - } - // Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs. - w.reorgCtx.cleanNotifyReorgCancel() - indexInfo.State = model.StatePublic // Set column index flag. addIndexColumnFlag(tblInfo, indexInfo) @@ -612,6 +584,44 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo return ver, errors.Trace(err) } +func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, + tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) { + elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}} + reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, elements) + if err != nil || reorgInfo.first { + // If we run reorg firstly, we should update the job snapshot version + // and then run the reorg next time. + return false, ver, errors.Trace(err) + } + + err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) { + defer util.Recover(metrics.LabelDDL, "onCreateIndex", + func() { + addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tbl.Meta().Name, indexInfo.Name) + }, false) + return w.addTableIndex(tbl, indexInfo, reorgInfo) + }) + if err != nil { + if dbterror.ErrWaitReorgTimeout.Equal(err) { + // if timeout, we should return, check for the owner and re-wait job done. + return false, ver, nil + } + if kv.ErrKeyExists.Equal(err) || dbterror.ErrCancelledDDLJob.Equal(err) || dbterror.ErrCantDecodeRecord.Equal(err) { + logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err)) + ver, err = convertAddIdxJob2RollbackJob(t, job, tbl.Meta(), indexInfo, err) + if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { + logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback, RemoveDDLReorgHandle failed", zap.String("job", job.String()), zap.Error(err1)) + } + } + // Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs. + w.reorgCtx.cleanNotifyReorgCancel() + return false, ver, errors.Trace(err) + } + // Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs. + w.reorgCtx.cleanNotifyReorgCancel() + return true, ver, errors.Trace(err) +} + func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) { tblInfo, indexInfo, err := checkDropIndex(t, job) if err != nil {