Skip to content

Commit

Permalink
ddl: refactor V2 args for RebaseAutoID/Modify-table/Index-visibility/…
Browse files Browse the repository at this point in the history
…FK related DDL. (#56164)

ref #53930
  • Loading branch information
joccau authored Sep 24, 2024
1 parent ad68216 commit 58bece3
Show file tree
Hide file tree
Showing 10 changed files with 337 additions and 45 deletions.
57 changes: 41 additions & 16 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1988,6 +1988,8 @@ func (e *executor) multiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info
logFn = logutil.DDLLogger().Fatal
}

// to do:(joccau)
// we need refactor this part to support V2 job version after refactor all of ddl types.
var involvingSchemaInfo []model.InvolvingSchemaInfo
for _, j := range subJobs {
switch j.Type {
Expand Down Expand Up @@ -2132,17 +2134,23 @@ func (e *executor) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase
newBase = newBaseTemp
}
job := &model.Job{
Version: model.JobVersion1,
SchemaID: schema.ID,
TableID: tbInfo.ID,
SchemaName: schema.Name.L,
TableName: tbInfo.Name.L,
Type: actionType,
BinlogInfo: &model.HistoryInfo{},
Args: []any{newBase, force},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = e.DoDDLJob(ctx, job)
args := &model.RebaseAutoIDArgs{
NewBase: newBase,
Force: force,
}
// need fill args, the job will be pushed subjob.
job.FillArgs(args)
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -3525,18 +3533,19 @@ func (e *executor) AlterTableComment(ctx sessionctx.Context, ident ast.Ident, sp
}

job := &model.Job{
Version: model.JobVersion1,
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
TableName: tb.Meta().Name.L,
Type: model.ActionModifyTableComment,
BinlogInfo: &model.HistoryInfo{},
Args: []any{spec.Comment},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}

err = e.DoDDLJob(ctx, job)
args := &model.ModifyTableCommentArgs{Comment: spec.Comment}
job.FillArgs(args)
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -3607,17 +3616,24 @@ func (e *executor) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast
}

job := &model.Job{
Version: model.JobVersion1,
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
TableName: tb.Meta().Name.L,
Type: model.ActionModifyTableCharsetAndCollate,
BinlogInfo: &model.HistoryInfo{},
Args: []any{toCharset, toCollate, needsOverwriteCols},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = e.DoDDLJob(ctx, job)

args := &model.ModifyTableCharsetAndCollateArgs{
ToCharset: toCharset,
ToCollate: toCollate,
NeedsOverwriteCols: needsOverwriteCols,
}
job.FillArgs(args)
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -4955,13 +4971,13 @@ func (e *executor) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName
}

job := &model.Job{
Version: model.JobVersion1,
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
Type: model.ActionAddForeignKey,
BinlogInfo: &model.HistoryInfo{},
Args: []any{fkInfo, fkCheck},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{
{
Expand All @@ -4976,8 +4992,12 @@ func (e *executor) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName
},
SQLMode: ctx.GetSessionVars().SQLMode,
}

err = e.DoDDLJob(ctx, job)
args := &model.AddForeignKeyArgs{
FkInfo: fkInfo,
FkCheck: fkCheck,
}
job.FillArgs(args)
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand All @@ -4994,19 +5014,20 @@ func (e *executor) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName p
}

job := &model.Job{
Version: model.JobVersion1,
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: model.StatePublic,
TableName: t.Meta().Name.L,
Type: model.ActionDropForeignKey,
BinlogInfo: &model.HistoryInfo{},
Args: []any{fkName},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}

err = e.DoDDLJob(ctx, job)
args := &model.DropForeignKeyArgs{FkName: fkName}
job.FillArgs(args)
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -5614,18 +5635,22 @@ func (e *executor) AlterIndexVisibility(ctx sessionctx.Context, ident ast.Ident,
}

job := &model.Job{
Version: model.JobVersion1,
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
TableName: tb.Meta().Name.L,
Type: model.ActionAlterIndexVisibility,
BinlogInfo: &model.HistoryInfo{},
Args: []any{indexName, invisible},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}

err = e.DoDDLJob(ctx, job)
args := &model.AlterIndexVisibilityArgs{
IndexName: indexName,
Invisible: invisible,
}
job.FillArgs(args)
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down
17 changes: 8 additions & 9 deletions pkg/ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,34 +39,34 @@ func (w *worker) onCreateForeignKey(jobCtx *jobContext, t *meta.Meta, job *model
return ver, errors.Trace(err)
}

var fkInfo model.FKInfo
var fkCheck bool
err = job.DecodeArgs(&fkInfo, &fkCheck)
args, err := model.GetAddForeignKeyArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
fkInfo, fkCheck := args.FkInfo, args.FkCheck

if job.IsRollingback() {
return dropForeignKey(jobCtx, t, job, tblInfo, fkInfo.Name)
}
switch job.SchemaState {
case model.StateNone:
err = checkAddForeignKeyValidInOwner(jobCtx.infoCache, job.SchemaName, tblInfo, &fkInfo, fkCheck)
err = checkAddForeignKeyValidInOwner(jobCtx.infoCache, job.SchemaName, tblInfo, fkInfo, fkCheck)
if err != nil {
job.State = model.JobStateCancelled
return ver, err
}
fkInfo.State = model.StateWriteOnly
fkInfo.ID = allocateFKIndexID(tblInfo)
tblInfo.ForeignKeys = append(tblInfo.ForeignKeys, &fkInfo)
tblInfo.ForeignKeys = append(tblInfo.ForeignKeys, fkInfo)
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteOnly
return ver, nil
case model.StateWriteOnly:
err = checkForeignKeyConstrain(w, job.SchemaName, tblInfo.Name.L, &fkInfo, fkCheck)
err = checkForeignKeyConstrain(w, job.SchemaName, tblInfo.Name.L, fkInfo, fkCheck)
if err != nil {
job.State = model.JobStateRollingback
return ver, err
Expand Down Expand Up @@ -99,13 +99,12 @@ func onDropForeignKey(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int
return ver, errors.Trace(err)
}

var fkName pmodel.CIStr
err = job.DecodeArgs(&fkName)
args, err := model.GetDropForeignKeyArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
return dropForeignKey(jobCtx, t, job, tblInfo, fkName)
return dropForeignKey(jobCtx, t, job, tblInfo, args.FkName)
}

func dropForeignKey(jobCtx *jobContext, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, fkName pmodel.CIStr) (ver int64, err error) {
Expand Down
13 changes: 9 additions & 4 deletions pkg/ddl/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,34 +58,39 @@ func testCreateForeignKey(t *testing.T, d ddl.ExecutorForTest, ctx sessionctx.Co
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: dbInfo.ID,
SchemaName: dbInfo.Name.L,
TableID: tblInfo.ID,
TableName: tblInfo.Name.L,
Type: model.ActionAddForeignKey,
BinlogInfo: &model.HistoryInfo{},
Args: []any{fkInfo},
}
err := sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)
ctx.SetValue(sessionctx.QueryString, "skip")
err = d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true))

args := &model.AddForeignKeyArgs{FkInfo: fkInfo}
job.FillArgs(args)
err = d.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true))
require.NoError(t, err)
return job
}

func testDropForeignKey(t *testing.T, ctx sessionctx.Context, d ddl.ExecutorForTest, dbInfo *model.DBInfo, tblInfo *model.TableInfo, foreignKeyName string) *model.Job {
job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: dbInfo.ID,
SchemaName: dbInfo.Name.L,
TableID: tblInfo.ID,
TableName: tblInfo.Name.L,
Type: model.ActionDropForeignKey,
BinlogInfo: &model.HistoryInfo{},
Args: []any{pmodel.NewCIStr(foreignKeyName)},
}
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true))
args := &model.DropForeignKeyArgs{FkName: pmodel.NewCIStr(foreignKeyName)}
job.FillArgs(args)
err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true))
require.NoError(t, err)
v := getSchemaVer(t, ctx)
checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
Expand Down
4 changes: 3 additions & 1 deletion pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1327,10 +1327,12 @@ func checkAlterIndexVisibility(t *meta.Meta, job *model.Job) (*model.TableInfo,
return nil, indexName, invisible, errors.Trace(err)
}

if err := job.DecodeArgs(&indexName, &invisible); err != nil {
args, err := model.GetAlterIndexVisibilityArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return nil, indexName, invisible, errors.Trace(err)
}
indexName, invisible = args.IndexName, args.Invisible

skip, err := validateAlterIndexVisibility(nil, indexName, invisible, tblInfo)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,11 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *JobWrapper) error {
col := job.Args[0].(*table.Column)
info.ModifyColumns = append(info.ModifyColumns, col.Name)
case model.ActionAlterIndexVisibility:
idxName := job.Args[0].(pmodel.CIStr)
idxName := job.JobArgs.(*model.AlterIndexVisibilityArgs).IndexName
info.AlterIndexes = append(info.AlterIndexes, idxName)
case model.ActionRebaseAutoID, model.ActionModifyTableComment, model.ActionModifyTableCharsetAndCollate:
case model.ActionAddForeignKey:
fkInfo := job.Args[0].(*model.FKInfo)
fkInfo := job.JobArgs.(*model.AddForeignKeyArgs).FkInfo
info.AddForeignKeys = append(info.AddForeignKeys, model.AddForeignKeyInfo{
Name: fkInfo.Name,
Cols: fkInfo.Cols,
Expand Down
22 changes: 10 additions & 12 deletions pkg/ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,17 +592,15 @@ func onRebaseAutoRandomType(jobCtx *jobContext, t *meta.Meta, job *model.Job) (v
}

func onRebaseAutoID(jobCtx *jobContext, t *meta.Meta, job *model.Job, tp autoid.AllocatorType) (ver int64, _ error) {
schemaID := job.SchemaID
var (
newBase int64
force bool
)
err := job.DecodeArgs(&newBase, &force)
args, err := model.GetRebaseAutoIDArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

schemaID := job.SchemaID
newBase, force := args.NewBase, args.Force

if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
job.MarkNonRevertible()
return ver, nil
Expand Down Expand Up @@ -987,8 +985,8 @@ func finishJobRenameTables(jobCtx *jobContext, t *meta.Meta, job *model.Job, arg
}

func onModifyTableComment(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
var comment string
if err := job.DecodeArgs(&comment); err != nil {
args, err := model.GetModifyTableCommentArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
Expand All @@ -1003,7 +1001,7 @@ func onModifyTableComment(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver
return ver, nil
}

tblInfo.Comment = comment
tblInfo.Comment = args.Comment
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -1013,12 +1011,12 @@ func onModifyTableComment(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver
}

func onModifyTableCharsetAndCollate(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
var toCharset, toCollate string
var needsOverwriteCols bool
if err := job.DecodeArgs(&toCharset, &toCollate, &needsOverwriteCols); err != nil {
args, err := model.GetModifyTableCharsetAndCollateArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
toCharset, toCollate, needsOverwriteCols := args.ToCharset, args.ToCollate, args.NeedsOverwriteCols

dbInfo, err := checkSchemaExistAndCancelNotExistJob(t, job)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/model/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ go_test(
],
embed = [":model"],
flaky = True,
shard_count = 41,
shard_count = 46,
deps = [
"//pkg/parser/ast",
"//pkg/parser/charset",
Expand Down
1 change: 1 addition & 0 deletions pkg/meta/model/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ func (job *Job) GetWarnings() (map[errors.ErrorID]*terror.Error, map[errors.Erro

// FillArgs fills args for new job.
func (job *Job) FillArgs(args JobArgs) {
intest.Assert(job.Version == JobVersion1 || job.Version == JobVersion2, "job version is invalid")
args.fillJob(job)
}

Expand Down
Loading

0 comments on commit 58bece3

Please sign in to comment.