From 91681e733897ac49fe6e04a036163d90a06b9197 Mon Sep 17 00:00:00 2001 From: joccau Date: Thu, 19 Sep 2024 18:23:13 +0800 Subject: [PATCH 01/14] refactor the V2 args for ActionRebaseAutoID, ActionRebaseAutoRandomBase ddl Signed-off-by: joccau --- pkg/ddl/executor.go | 5 +++++ pkg/ddl/table.go | 10 ++++----- pkg/meta/model/job_args.go | 36 +++++++++++++++++++++++++++++++++ pkg/meta/model/job_args_test.go | 16 +++++++++++++++ 4 files changed, 61 insertions(+), 6 deletions(-) diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index a26e5b3cb46c9..0c0919a43f0d1 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -2132,6 +2132,7 @@ func (e *executor) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase newBase = newBaseTemp } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: schema.ID, TableID: tbInfo.ID, SchemaName: schema.Name.L, @@ -2142,6 +2143,10 @@ func (e *executor) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, } + job.FillArgs(&model.RebaseAutoIDArgs{ + NewBase: newBase, + Force: force, + }) err = e.DoDDLJob(ctx, job) return errors.Trace(err) } diff --git a/pkg/ddl/table.go b/pkg/ddl/table.go index 90a97822a46de..b1f6193210dfe 100644 --- a/pkg/ddl/table.go +++ b/pkg/ddl/table.go @@ -588,17 +588,15 @@ func onRebaseAutoRandomType(jobCtx *jobContext, t *meta.Meta, job *model.Job) (v } func onRebaseAutoID(jobCtx *jobContext, t *meta.Meta, job *model.Job, tp autoid.AllocatorType) (ver int64, _ error) { - schemaID := job.SchemaID - var ( - newBase int64 - force bool - ) - err := job.DecodeArgs(&newBase, &force) + args, err := model.GetRebaseAutoIDArgs(job) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } + schemaID := job.SchemaID + newBase, force := args.NewBase, args.Force + if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible { job.MarkNonRevertible() return ver, nil diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index d763971a0649d..f9b0b34560052 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -467,3 +467,39 @@ func GetResourceGroupArgs(job *Job) (*ResourceGroupArgs, error) { } return getOrDecodeArgsV2[*ResourceGroupArgs](job) } + +// RebaseAutoIDArgs is the arguments for ActionRebaseAutoID DDL. +// It is also for ActionRebaseAutoRandomBase. +type RebaseAutoIDArgs struct { + NewBase int64 `json:"new_base,omitempty"` + Force bool `json:"force,omitempty"` +} + +func (a *RebaseAutoIDArgs) fillJob(job *Job) { + if job.Version == JobVersion1 { + job.Args = []any{a.NewBase, a.Force} + } else { + job.Args = []any{a} + } +} + +// GetRebaseAutoIDArgs the args for ActionRebaseAutoID/ActionRebaseAutoRandomBase ddl. +func GetRebaseAutoIDArgs(job *Job) (*RebaseAutoIDArgs, error) { + var ( + newBase int64 + force bool + ) + + if job.Version == JobVersion1 { + if err := job.DecodeArgs(&newBase, &force); err != nil { + return nil, errors.Trace(err) + } + return &RebaseAutoIDArgs{ + NewBase: newBase, + Force: force, + }, nil + } + + // for version V2 + return getOrDecodeArgsV2[*RebaseAutoIDArgs](job) +} diff --git a/pkg/meta/model/job_args_test.go b/pkg/meta/model/job_args_test.go index 0ac1520920e36..9db881ee9c884 100644 --- a/pkg/meta/model/job_args_test.go +++ b/pkg/meta/model/job_args_test.go @@ -294,3 +294,19 @@ func TestResourceGroupArgs(t *testing.T) { } } } + +func TestGetRebaseAutoIDArgs(t *testing.T) { + inArgs := &RebaseAutoIDArgs{ + NewBase: 9527, + Force: true, + } + for _, tp := range []ActionType{ActionRebaseAutoID, ActionRebaseAutoRandomBase} { + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, tp))) + args, err := GetRebaseAutoIDArgs(j2) + require.NoError(t, err) + require.Equal(t, inArgs, args) + } + } +} From 9602c8701c3e2c53132ae0b83099ff982c2bea19 Mon Sep 17 00:00:00 2001 From: joccau Date: Thu, 19 Sep 2024 19:38:46 +0800 Subject: [PATCH 02/14] refactor v2 job args for ActionModifyTableComment ddl Signed-off-by: joccau --- pkg/ddl/executor.go | 5 ++++- pkg/ddl/table.go | 6 +++--- pkg/meta/model/job_args.go | 30 ++++++++++++++++++++++++++++++ pkg/meta/model/job_args_test.go | 14 ++++++++++++++ 4 files changed, 51 insertions(+), 4 deletions(-) diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index 0c0919a43f0d1..f47c8da0dd168 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -3505,17 +3505,20 @@ func (e *executor) AlterTableComment(ctx sessionctx.Context, ident ast.Ident, sp } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: schema.ID, TableID: tb.Meta().ID, SchemaName: schema.Name.L, TableName: tb.Meta().Name.L, Type: model.ActionModifyTableComment, BinlogInfo: &model.HistoryInfo{}, - Args: []any{spec.Comment}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, } + job.FillArgs(&model.ModifyTableCommentArgs{ + Comment: spec.Comment, + }) err = e.DoDDLJob(ctx, job) return errors.Trace(err) } diff --git a/pkg/ddl/table.go b/pkg/ddl/table.go index b1f6193210dfe..428530d630787 100644 --- a/pkg/ddl/table.go +++ b/pkg/ddl/table.go @@ -978,8 +978,8 @@ func finishJobRenameTables(jobCtx *jobContext, t *meta.Meta, job *model.Job, } func onModifyTableComment(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) { - var comment string - if err := job.DecodeArgs(&comment); err != nil { + args, err := model.GetModifyTableCommentArgs(job) + if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -994,7 +994,7 @@ func onModifyTableComment(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver return ver, nil } - tblInfo.Comment = comment + tblInfo.Comment = args.Comment ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index f9b0b34560052..eaee4b00fc957 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -503,3 +503,33 @@ func GetRebaseAutoIDArgs(job *Job) (*RebaseAutoIDArgs, error) { // for version V2 return getOrDecodeArgsV2[*RebaseAutoIDArgs](job) } + +// ModifyTableCommentArgs is the arguments for ActionModifyTableComment ddl. +type ModifyTableCommentArgs struct { + Comment string +} + +func (a *ModifyTableCommentArgs) fillJob(job *Job) { + intest.Assert(job.Version == JobVersion1 || job.Version == JobVersion2, "job version is invalid") + + if job.Version == JobVersion1 { + job.Args = []any{a.Comment} + } else { + job.Args = []any{a} + } +} + +// GetModifyTableCommentArgs gets the args for ActionModifyTableComment. +func GetModifyTableCommentArgs(job *Job) (*ModifyTableCommentArgs, error) { + if job.Version == JobVersion1 { + var comment string + if err := job.DecodeArgs(&comment); err != nil { + return nil, errors.Trace(err) + } + return &ModifyTableCommentArgs{ + Comment: comment, + }, nil + } + + return getOrDecodeArgsV2[*ModifyTableCommentArgs](job) +} diff --git a/pkg/meta/model/job_args_test.go b/pkg/meta/model/job_args_test.go index 9db881ee9c884..600b12785bc52 100644 --- a/pkg/meta/model/job_args_test.go +++ b/pkg/meta/model/job_args_test.go @@ -310,3 +310,17 @@ func TestGetRebaseAutoIDArgs(t *testing.T) { } } } + +func TestGetModifyTableCommentArgs(t *testing.T) { + inArgs := &ModifyTableCommentArgs{ + Comment: "TiDb is great", + } + + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionModifyTableComment))) + args, err := GetModifyTableCommentArgs(j2) + require.NoError(t, err) + require.Equal(t, inArgs, args) + } +} From 33d41548e4e1fe652dcc587d8817a5105cb1f6e3 Mon Sep 17 00:00:00 2001 From: joccau Date: Thu, 19 Sep 2024 19:55:03 +0800 Subject: [PATCH 03/14] refactor job args for ActionModifyTableCharsetAndCollate ddl Signed-off-by: joccau --- pkg/ddl/executor.go | 9 +++++++-- pkg/ddl/table.go | 6 +++--- pkg/meta/model/job_args.go | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 42 insertions(+), 5 deletions(-) diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index f47c8da0dd168..5a9fd14ca1235 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -2139,7 +2139,6 @@ func (e *executor) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase TableName: tbInfo.Name.L, Type: actionType, BinlogInfo: &model.HistoryInfo{}, - Args: []any{newBase, force}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, } @@ -3590,16 +3589,22 @@ func (e *executor) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: schema.ID, TableID: tb.Meta().ID, SchemaName: schema.Name.L, TableName: tb.Meta().Name.L, Type: model.ActionModifyTableCharsetAndCollate, BinlogInfo: &model.HistoryInfo{}, - Args: []any{toCharset, toCollate, needsOverwriteCols}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, } + + job.FillArgs(&model.ModifyTableCharsetAndCollateArgs{ + ToCharset: toCharset, + ToCollate: toCollate, + NeedsOverwriteCols: needsOverwriteCols, + }) err = e.DoDDLJob(ctx, job) return errors.Trace(err) } diff --git a/pkg/ddl/table.go b/pkg/ddl/table.go index 428530d630787..ed01080732859 100644 --- a/pkg/ddl/table.go +++ b/pkg/ddl/table.go @@ -1004,12 +1004,12 @@ func onModifyTableComment(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver } func onModifyTableCharsetAndCollate(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) { - var toCharset, toCollate string - var needsOverwriteCols bool - if err := job.DecodeArgs(&toCharset, &toCollate, &needsOverwriteCols); err != nil { + args, err := model.GetModifyTableCharsetAndCollateArgs(job) + if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } + toCharset, toCollate, needsOverwriteCols := args.ToCharset, args.ToCollate, args.NeedsOverwriteCols dbInfo, err := checkSchemaExistAndCancelNotExistJob(t, job) if err != nil { diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index eaee4b00fc957..55dc013e9a1f4 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -533,3 +533,35 @@ func GetModifyTableCommentArgs(job *Job) (*ModifyTableCommentArgs, error) { return getOrDecodeArgsV2[*ModifyTableCommentArgs](job) } + +// ModifyTableCharsetAndCollateArgs is the arguments for ActionModifyTableCharsetAndCollate ddl. +type ModifyTableCharsetAndCollateArgs struct { + ToCharset string + ToCollate string + NeedsOverwriteCols bool +} + +func (a *ModifyTableCharsetAndCollateArgs) fillJob(job *Job) { + intest.Assert(job.Version == JobVersion1 || job.Version == JobVersion2, "job version is invalid") + + if job.Version == JobVersion1 { + job.Args = []any{a.ToCharset, a.ToCollate, a.NeedsOverwriteCols} + + } else { + job.Args = []any{a} + } +} + +// GetModifyTableCharsetAndCollateArgs gets the args for ActionModifyTableCharsetAndCollate ddl. +func GetModifyTableCharsetAndCollateArgs(job *Job) (*ModifyTableCharsetAndCollateArgs, error) { + if job.Version == JobVersion1 { + args := &ModifyTableCharsetAndCollateArgs{} + err := job.DecodeArgs(&args.ToCharset, &args.ToCollate, &args.NeedsOverwriteCols) + if err != nil { + return nil, errors.Trace(err) + } + return args, nil + } + + return getOrDecodeArgsV2[*ModifyTableCharsetAndCollateArgs](job) +} From 6171a436fd0095c7f175e8fea4cfa307f7e47ee5 Mon Sep 17 00:00:00 2001 From: joccau Date: Thu, 19 Sep 2024 20:12:11 +0800 Subject: [PATCH 04/14] refactor job args for AlterIndexVisibility ddl. Signed-off-by: joccau --- pkg/ddl/executor.go | 7 +++++-- pkg/ddl/index.go | 4 +++- pkg/ddl/multi_schema_change.go | 7 ++++++- pkg/meta/model/job_args.go | 35 +++++++++++++++++++++++++++++++++ pkg/meta/model/job_args_test.go | 15 ++++++++++++++ 5 files changed, 64 insertions(+), 4 deletions(-) diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index 5a9fd14ca1235..a0eb4b0745397 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -5598,17 +5598,20 @@ func (e *executor) AlterIndexVisibility(ctx sessionctx.Context, ident ast.Ident, } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: schema.ID, TableID: tb.Meta().ID, SchemaName: schema.Name.L, TableName: tb.Meta().Name.L, Type: model.ActionAlterIndexVisibility, BinlogInfo: &model.HistoryInfo{}, - Args: []any{indexName, invisible}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, } - + job.FillArgs(&model.AlterIndexVisibilityArgs{ + IndexName: indexName, + Invisible: invisible, + }) err = e.DoDDLJob(ctx, job) return errors.Trace(err) } diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 2f292703bff7d..44d13b4254305 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -1327,10 +1327,12 @@ func checkAlterIndexVisibility(t *meta.Meta, job *model.Job) (*model.TableInfo, return nil, indexName, invisible, errors.Trace(err) } - if err := job.DecodeArgs(&indexName, &invisible); err != nil { + args, err := model.GetAlterIndexVisibilityArgs(job) + if err != nil { job.State = model.JobStateCancelled return nil, indexName, invisible, errors.Trace(err) } + indexName, invisible = args.IndexName, args.Invisible skip, err := validateAlterIndexVisibility(nil, indexName, invisible, tblInfo) if err != nil { diff --git a/pkg/ddl/multi_schema_change.go b/pkg/ddl/multi_schema_change.go index db675ac8bda26..e5f3f4c1ccb5b 100644 --- a/pkg/ddl/multi_schema_change.go +++ b/pkg/ddl/multi_schema_change.go @@ -251,7 +251,12 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error col := job.Args[0].(*table.Column) info.ModifyColumns = append(info.ModifyColumns, col.Name) case model.ActionAlterIndexVisibility: - idxName := job.Args[0].(pmodel.CIStr) + var idxName pmodel.CIStr + if job.Version == model.JobVersion1 { + idxName = job.Args[0].(pmodel.CIStr) + } else { + idxName = job.Args[0].(*model.AlterIndexVisibilityArgs).IndexName + } info.AlterIndexes = append(info.AlterIndexes, idxName) case model.ActionRebaseAutoID, model.ActionModifyTableComment, model.ActionModifyTableCharsetAndCollate: case model.ActionAddForeignKey: diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index 55dc013e9a1f4..a4413556b9f48 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -18,6 +18,7 @@ import ( "encoding/json" "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/parser/model" pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/util/intest" ) @@ -565,3 +566,37 @@ func GetModifyTableCharsetAndCollateArgs(job *Job) (*ModifyTableCharsetAndCollat return getOrDecodeArgsV2[*ModifyTableCharsetAndCollateArgs](job) } + +// AlterIndexVisibilityArgs is the arguments for ActionAlterIndexVisibility ddl. +type AlterIndexVisibilityArgs struct { + IndexName model.CIStr + Invisible bool +} + +func (a *AlterIndexVisibilityArgs) fillJob(job *Job) { + intest.Assert(job.Version == JobVersion1 || job.Version == JobVersion2, "job version is invalid") + + if job.Version == JobVersion1 { + job.Args = []any{a.IndexName, a.Invisible} + } else { + job.Args = []any{a} + } +} + +func GetAlterIndexVisibilityArgs(job *Job) (*AlterIndexVisibilityArgs, error) { + if job.Version == JobVersion1 { + var ( + indexName model.CIStr + invisible bool + ) + if err := job.DecodeArgs(&indexName, &invisible); err != nil { + return nil, errors.Trace(err) + } + return &AlterIndexVisibilityArgs{ + IndexName: indexName, + Invisible: invisible, + }, nil + } + + return getOrDecodeArgsV2[*AlterIndexVisibilityArgs](job) +} diff --git a/pkg/meta/model/job_args_test.go b/pkg/meta/model/job_args_test.go index 600b12785bc52..25e416b6a1e47 100644 --- a/pkg/meta/model/job_args_test.go +++ b/pkg/meta/model/job_args_test.go @@ -324,3 +324,18 @@ func TestGetModifyTableCommentArgs(t *testing.T) { require.Equal(t, inArgs, args) } } + +func TestGetAlterIndexVisibilityArgs(t *testing.T) { + inArgs := &AlterIndexVisibilityArgs{ + IndexName: model.NewCIStr("index-name"), + Invisible: true, + } + + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionAlterIndexVisibility))) + args, err := GetAlterIndexVisibilityArgs(j2) + require.NoError(t, err) + require.Equal(t, inArgs, args) + } +} From 5460ef69b71e75819be4e02822d548d78751e529 Mon Sep 17 00:00:00 2001 From: joccau Date: Fri, 20 Sep 2024 09:41:29 +0800 Subject: [PATCH 05/14] refactor job args for AddForeignKey ddl. Signed-off-by: joccau --- pkg/ddl/executor.go | 33 +++++++++++++++++++++++------- pkg/ddl/foreign_key.go | 12 +++++------ pkg/ddl/foreign_key_test.go | 6 ++++-- pkg/ddl/multi_schema_change.go | 9 ++++++++- pkg/meta/model/job.go | 1 + pkg/meta/model/job_args.go | 36 +++++++++++++++++++++++++++++++++ pkg/meta/model/job_args_test.go | 18 +++++++++++++++++ 7 files changed, 99 insertions(+), 16 deletions(-) diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index a0eb4b0745397..944feeb0d93e6 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -2007,12 +2007,27 @@ func (e *executor) multiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info Mode: model.SharedInvolving, }) case model.ActionAddForeignKey: - ref, ok := j.Args[0].(*model.FKInfo) - if !ok { - logFn("unexpected type of foreign key info", - zap.Any("args[0]", j.Args[0]), - zap.String("type", fmt.Sprintf("%T", j.Args[0]))) - continue + var ( + ref *model.FKInfo + ok bool + ) + if j.Version == model.JobVersion1 { + ref, ok = j.Args[0].(*model.FKInfo) + if !ok { + logFn("unexpected type of foreign key info", + zap.Any("args[0]", j.Args[0]), + zap.String("type", fmt.Sprintf("%T", j.Args[0]))) + continue + } + } else { + args, ok := j.Args[0].(*model.AddForeignKeyArgs) + if !ok { + logFn("unexpected type of foreign key info", + zap.Any("args[0]", j.Args[0]), + zap.String("type", fmt.Sprintf("%T", j.Args[0]))) + continue + } + ref = args.FkInfo } involvingSchemaInfo = append(involvingSchemaInfo, model.InvolvingSchemaInfo{ Database: ref.RefSchema.L, @@ -4939,13 +4954,13 @@ func (e *executor) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: schema.ID, TableID: t.Meta().ID, SchemaName: schema.Name.L, TableName: t.Meta().Name.L, Type: model.ActionAddForeignKey, BinlogInfo: &model.HistoryInfo{}, - Args: []any{fkInfo, fkCheck}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{ { @@ -4961,6 +4976,10 @@ func (e *executor) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName SQLMode: ctx.GetSessionVars().SQLMode, } + job.FillArgs(&model.AddForeignKeyArgs{ + FkInfo: fkInfo, + FkCheck: fkCheck, + }) err = e.DoDDLJob(ctx, job) return errors.Trace(err) } diff --git a/pkg/ddl/foreign_key.go b/pkg/ddl/foreign_key.go index 244919a5c7496..dac044d987357 100644 --- a/pkg/ddl/foreign_key.go +++ b/pkg/ddl/foreign_key.go @@ -39,26 +39,26 @@ func (w *worker) onCreateForeignKey(jobCtx *jobContext, t *meta.Meta, job *model return ver, errors.Trace(err) } - var fkInfo model.FKInfo - var fkCheck bool - err = job.DecodeArgs(&fkInfo, &fkCheck) + args, err := model.GetAddForeignKeyArgs(job) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } + fkInfo, fkCheck := args.FkInfo, args.FkCheck + if job.IsRollingback() { return dropForeignKey(jobCtx, t, job, tblInfo, fkInfo.Name) } switch job.SchemaState { case model.StateNone: - err = checkAddForeignKeyValidInOwner(jobCtx.infoCache, job.SchemaName, tblInfo, &fkInfo, fkCheck) + err = checkAddForeignKeyValidInOwner(jobCtx.infoCache, job.SchemaName, tblInfo, fkInfo, fkCheck) if err != nil { job.State = model.JobStateCancelled return ver, err } fkInfo.State = model.StateWriteOnly fkInfo.ID = allocateFKIndexID(tblInfo) - tblInfo.ForeignKeys = append(tblInfo.ForeignKeys, &fkInfo) + tblInfo.ForeignKeys = append(tblInfo.ForeignKeys, fkInfo) ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) @@ -66,7 +66,7 @@ func (w *worker) onCreateForeignKey(jobCtx *jobContext, t *meta.Meta, job *model job.SchemaState = model.StateWriteOnly return ver, nil case model.StateWriteOnly: - err = checkForeignKeyConstrain(w, job.SchemaName, tblInfo.Name.L, &fkInfo, fkCheck) + err = checkForeignKeyConstrain(w, job.SchemaName, tblInfo.Name.L, fkInfo, fkCheck) if err != nil { job.State = model.JobStateRollingback return ver, err diff --git a/pkg/ddl/foreign_key_test.go b/pkg/ddl/foreign_key_test.go index 2a7d2227e6df2..e8c873b032dc4 100644 --- a/pkg/ddl/foreign_key_test.go +++ b/pkg/ddl/foreign_key_test.go @@ -64,12 +64,14 @@ func testCreateForeignKey(t *testing.T, d ddl.ExecutorForTest, ctx sessionctx.Co TableName: tblInfo.Name.L, Type: model.ActionAddForeignKey, BinlogInfo: &model.HistoryInfo{}, - Args: []any{fkInfo}, } err := sessiontxn.NewTxn(context.Background(), ctx) require.NoError(t, err) ctx.SetValue(sessionctx.QueryString, "skip") - err = d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) + + args := &model.AddForeignKeyArgs{FkInfo: fkInfo} + job.FillArgs(args) + err = d.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true)) require.NoError(t, err) return job } diff --git a/pkg/ddl/multi_schema_change.go b/pkg/ddl/multi_schema_change.go index e5f3f4c1ccb5b..c21f7292bc7dd 100644 --- a/pkg/ddl/multi_schema_change.go +++ b/pkg/ddl/multi_schema_change.go @@ -184,6 +184,7 @@ func appendToSubJobs(m *model.MultiSchemaInfo, job *model.Job) error { reorgTp = job.ReorgMeta.ReorgTp } m.SubJobs = append(m.SubJobs, &model.SubJob{ + Version: job.Version, Type: job.Type, Args: job.Args, RawArgs: job.RawArgs, @@ -260,7 +261,13 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error info.AlterIndexes = append(info.AlterIndexes, idxName) case model.ActionRebaseAutoID, model.ActionModifyTableComment, model.ActionModifyTableCharsetAndCollate: case model.ActionAddForeignKey: - fkInfo := job.Args[0].(*model.FKInfo) + var fkInfo *model.FKInfo + if job.Version == model.JobVersion1 { + fkInfo = job.Args[0].(*model.FKInfo) + } else { + args := job.Args[0].(*model.AddForeignKeyArgs) + fkInfo = args.FkInfo + } info.AddForeignKeys = append(info.AddForeignKeys, model.AddForeignKeyInfo{ Name: fkInfo.Name, Cols: fkInfo.Cols, diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index bec4012419941..d903010e8cf7d 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -862,6 +862,7 @@ func (job *Job) GetInvolvingSchemaInfo() []InvolvingSchemaInfo { // SubJob is a representation of one DDL schema change. A Job may contain zero // (when multi-schema change is not applicable) or more SubJobs. type SubJob struct { + Version JobVersion `json:"-"` Type ActionType `json:"type"` Args []any `json:"-"` RawArgs json.RawMessage `json:"raw_args"` diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index a4413556b9f48..3e4387f2c8a10 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -583,6 +583,7 @@ func (a *AlterIndexVisibilityArgs) fillJob(job *Job) { } } +// GetAlterIndexVisibilityArgs gets the args for AlterIndexVisibility ddl. func GetAlterIndexVisibilityArgs(job *Job) (*AlterIndexVisibilityArgs, error) { if job.Version == JobVersion1 { var ( @@ -600,3 +601,38 @@ func GetAlterIndexVisibilityArgs(job *Job) (*AlterIndexVisibilityArgs, error) { return getOrDecodeArgsV2[*AlterIndexVisibilityArgs](job) } + +// AddForeignKeyArgs is the arguments for ActionAddForeignKey ddl. +type AddForeignKeyArgs struct { + FkInfo *FKInfo + FkCheck bool +} + +func (a *AddForeignKeyArgs) fillJob(job *Job) { + intest.Assert(job.Version == JobVersion1 || job.Version == JobVersion2, "job version is invalid") + + if job.Version == JobVersion1 { + job.Args = []any{a.FkInfo, a.FkCheck} + } else { + job.Args = []any{a} + } +} + +// GetAddForeignKeyArgs get the args for ActionAddForeignKey ddl. +func GetAddForeignKeyArgs(job *Job) (*AddForeignKeyArgs, error) { + if job.Version == JobVersion1 { + var ( + fkInfo *FKInfo + fkCheck bool + ) + if err := job.DecodeArgs(&fkInfo, &fkCheck); err != nil { + return nil, errors.Trace(err) + } + return &AddForeignKeyArgs{ + FkInfo: fkInfo, + FkCheck: fkCheck, + }, nil + } + + return getOrDecodeArgsV2[*AddForeignKeyArgs](job) +} diff --git a/pkg/meta/model/job_args_test.go b/pkg/meta/model/job_args_test.go index 25e416b6a1e47..040e3a1a86423 100644 --- a/pkg/meta/model/job_args_test.go +++ b/pkg/meta/model/job_args_test.go @@ -339,3 +339,21 @@ func TestGetAlterIndexVisibilityArgs(t *testing.T) { require.Equal(t, inArgs, args) } } + +func TestGetAddForeignKeyArgs(t *testing.T) { + inArgs := &AddForeignKeyArgs{ + FkInfo: &FKInfo{ + ID: 7527, + Name: model.NewCIStr("fk-name"), + }, + FkCheck: true, + } + + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionAddForeignKey))) + args, err := GetAddForeignKeyArgs(j2) + require.NoError(t, err) + require.Equal(t, inArgs, args) + } +} From 9e80db116cd2739dcda03dd078590997fef99f64 Mon Sep 17 00:00:00 2001 From: joccau Date: Mon, 16 Sep 2024 14:00:34 +0800 Subject: [PATCH 06/14] marsher args in job and subjob Signed-off-by: joccau --- pkg/meta/model/job.go | 64 +++++++++++++++++++++++++------------------ 1 file changed, 38 insertions(+), 26 deletions(-) diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index 24c773e50d43c..3a174d8eeb2a2 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -490,39 +490,51 @@ func (job *Job) FillFinishedArgs(args FinishedJobArgs) { args.fillFinishedJob(job) } +func marshalArgs(jobVer JobVersion, args []any) (json.RawMessage, error) { + if jobVer <= JobVersion1 { + rawArgs, err := json.Marshal(args) + if err != nil { + return nil, errors.Trace(err) + } + return rawArgs, nil + } + + intest.Assert(jobVer == JobVersion2, "job version is not v2") + var arg any + if len(args) > 0 { + intest.Assert(len(args) == 1, "Job.Args should have only one element") + arg = args[0] + } + + rawArgs, err := json.Marshal(arg) + if err != nil { + return nil, errors.Trace(err) + } + return rawArgs, nil +} + // Encode encodes job with json format. // updateRawArgs is used to determine whether to update the raw args. func (job *Job) Encode(updateRawArgs bool) ([]byte, error) { var err error if updateRawArgs { - if job.Version == JobVersion1 { - job.RawArgs, err = json.Marshal(job.Args) - if err != nil { - return nil, errors.Trace(err) - } - if job.MultiSchemaInfo != nil { - for _, sub := range job.MultiSchemaInfo.SubJobs { - // Only update the args of executing sub-jobs. - if sub.Args == nil { - continue - } - sub.RawArgs, err = json.Marshal(sub.Args) - if err != nil { - return nil, errors.Trace(err) - } + job.RawArgs, err = marshalArgs(job.Version, job.Args) + if err != nil { + return nil, errors.Trace(err) + } + + if job.MultiSchemaInfo != nil { + for _, sub := range job.MultiSchemaInfo.SubJobs { + // Only update the args of executing sub-jobs. + if sub.Args == nil { + continue + } + + sub.RawArgs, err = marshalArgs(job.Version, sub.Args) + if err != nil { + return nil, errors.Trace(err) } } - } else { - var arg any - if len(job.Args) > 0 { - intest.Assert(len(job.Args) == 1, "Job.Args should have only one element") - arg = job.Args[0] - } - job.RawArgs, err = json.Marshal(arg) - if err != nil { - return nil, errors.Trace(err) - } - // TODO remember update sub-jobs' RawArgs when we do it. } } From 9cb17237e785424f3b2554d569e73586704f7594 Mon Sep 17 00:00:00 2001 From: joccau Date: Thu, 19 Sep 2024 18:42:43 +0800 Subject: [PATCH 07/14] add field version into subjob Signed-off-by: joccau --- pkg/meta/model/job.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index 3a174d8eeb2a2..5f386e2dcbe75 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -530,7 +530,7 @@ func (job *Job) Encode(updateRawArgs bool) ([]byte, error) { continue } - sub.RawArgs, err = marshalArgs(job.Version, sub.Args) + sub.RawArgs, err = marshalArgs(sub.Version, sub.Args) if err != nil { return nil, errors.Trace(err) } @@ -874,7 +874,7 @@ func (job *Job) GetInvolvingSchemaInfo() []InvolvingSchemaInfo { // SubJob is a representation of one DDL schema change. A Job may contain zero // (when multi-schema change is not applicable) or more SubJobs. type SubJob struct { - Version JobVersion `json:"-"` + Version JobVersion `json:"version"` Type ActionType `json:"type"` Args []any `json:"-"` RawArgs json.RawMessage `json:"raw_args"` @@ -911,6 +911,17 @@ func (sub *SubJob) IsFinished() bool { // ToProxyJob converts a sub-job to a proxy job. func (sub *SubJob) ToProxyJob(parentJob *Job, seq int) Job { + var jobVer JobVersion + // because in mock test case. the version = V1 in ActionMultiSchemaChange, but maybe the version = v2 in subjob. + // we should retain the version from subjob. + // to do: + // we should set Version = sub.Version, after refactor all of DDL type. + if sub.Version == JobVersion2 { + jobVer = JobVersion2 + } else { + jobVer = parentJob.Version + } + return Job{ ID: parentJob.ID, Type: sub.Type, @@ -933,7 +944,7 @@ func (sub *SubJob) ToProxyJob(parentJob *Job, seq int) Job { DependencyID: parentJob.DependencyID, Query: parentJob.Query, BinlogInfo: parentJob.BinlogInfo, - Version: parentJob.Version, + Version: jobVer, ReorgMeta: parentJob.ReorgMeta, MultiSchemaInfo: &MultiSchemaInfo{Revertible: sub.Revertible, Seq: int32(seq)}, Priority: parentJob.Priority, From b6d59814d359031d300159bb612b6b07203981f5 Mon Sep 17 00:00:00 2001 From: joccau Date: Fri, 20 Sep 2024 15:25:46 +0800 Subject: [PATCH 08/14] make build Signed-off-by: joccau --- pkg/ddl/executor.go | 38 +++++++++++++++++++------------------ pkg/ddl/foreign_key_test.go | 1 + pkg/meta/model/BUILD.bazel | 2 +- pkg/meta/model/job_args.go | 7 ++----- 4 files changed, 24 insertions(+), 24 deletions(-) diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index 7b21032e45126..ead4f2dc85281 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -2157,11 +2157,13 @@ func (e *executor) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, } - job.FillArgs(&model.RebaseAutoIDArgs{ + args := &model.RebaseAutoIDArgs{ NewBase: newBase, Force: force, - }) - err = e.DoDDLJob(ctx, job) + } + // need fill args, the job will be pushed subjob. + job.FillArgs(args) + err = e.doDDLJob2(ctx, job, args) return errors.Trace(err) } @@ -3529,11 +3531,9 @@ func (e *executor) AlterTableComment(ctx sessionctx.Context, ident ast.Ident, sp CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, } - - job.FillArgs(&model.ModifyTableCommentArgs{ - Comment: spec.Comment, - }) - err = e.DoDDLJob(ctx, job) + args := &model.ModifyTableCommentArgs{Comment: spec.Comment} + job.FillArgs(args) + err = e.doDDLJob2(ctx, job, args) return errors.Trace(err) } @@ -3615,12 +3615,13 @@ func (e *executor) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast SQLMode: ctx.GetSessionVars().SQLMode, } - job.FillArgs(&model.ModifyTableCharsetAndCollateArgs{ + args := &model.ModifyTableCharsetAndCollateArgs{ ToCharset: toCharset, ToCollate: toCollate, NeedsOverwriteCols: needsOverwriteCols, - }) - err = e.DoDDLJob(ctx, job) + } + job.FillArgs(args) + err = e.doDDLJob2(ctx, job, args) return errors.Trace(err) } @@ -4973,12 +4974,12 @@ func (e *executor) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName }, SQLMode: ctx.GetSessionVars().SQLMode, } - - job.FillArgs(&model.AddForeignKeyArgs{ + args := &model.AddForeignKeyArgs{ FkInfo: fkInfo, FkCheck: fkCheck, - }) - err = e.DoDDLJob(ctx, job) + } + job.FillArgs(args) + err = e.doDDLJob2(ctx, job, args) return errors.Trace(err) } @@ -5625,11 +5626,12 @@ func (e *executor) AlterIndexVisibility(ctx sessionctx.Context, ident ast.Ident, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, } - job.FillArgs(&model.AlterIndexVisibilityArgs{ + args := &model.AlterIndexVisibilityArgs{ IndexName: indexName, Invisible: invisible, - }) - err = e.DoDDLJob(ctx, job) + } + job.FillArgs(args) + err = e.doDDLJob2(ctx, job, args) return errors.Trace(err) } diff --git a/pkg/ddl/foreign_key_test.go b/pkg/ddl/foreign_key_test.go index e8c873b032dc4..9e93343cb67b8 100644 --- a/pkg/ddl/foreign_key_test.go +++ b/pkg/ddl/foreign_key_test.go @@ -58,6 +58,7 @@ func testCreateForeignKey(t *testing.T, d ddl.ExecutorForTest, ctx sessionctx.Co } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: dbInfo.ID, SchemaName: dbInfo.Name.L, TableID: tblInfo.ID, diff --git a/pkg/meta/model/BUILD.bazel b/pkg/meta/model/BUILD.bazel index 265e73cf30c8a..02c8f7f728309 100644 --- a/pkg/meta/model/BUILD.bazel +++ b/pkg/meta/model/BUILD.bazel @@ -44,7 +44,7 @@ go_test( ], embed = [":model"], flaky = True, - shard_count = 34, + shard_count = 38, deps = [ "//pkg/parser/charset", "//pkg/parser/model", diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index 0cdd9fa78c4f6..505215d9e36fb 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -18,7 +18,6 @@ import ( "encoding/json" "github.com/pingcap/errors" - "github.com/pingcap/tidb/pkg/parser/model" pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/util/intest" ) @@ -558,10 +557,8 @@ type ModifyTableCharsetAndCollateArgs struct { func (a *ModifyTableCharsetAndCollateArgs) fillJob(job *Job) { intest.Assert(job.Version == JobVersion1 || job.Version == JobVersion2, "job version is invalid") - if job.Version == JobVersion1 { job.Args = []any{a.ToCharset, a.ToCollate, a.NeedsOverwriteCols} - } else { job.Args = []any{a} } @@ -583,7 +580,7 @@ func GetModifyTableCharsetAndCollateArgs(job *Job) (*ModifyTableCharsetAndCollat // AlterIndexVisibilityArgs is the arguments for ActionAlterIndexVisibility ddl. type AlterIndexVisibilityArgs struct { - IndexName model.CIStr + IndexName pmodel.CIStr Invisible bool } @@ -601,7 +598,7 @@ func (a *AlterIndexVisibilityArgs) fillJob(job *Job) { func GetAlterIndexVisibilityArgs(job *Job) (*AlterIndexVisibilityArgs, error) { if job.Version == JobVersion1 { var ( - indexName model.CIStr + indexName pmodel.CIStr invisible bool ) if err := job.DecodeArgs(&indexName, &invisible); err != nil { From 8aa895b81482920cfedf725a0de3b659d1766295 Mon Sep 17 00:00:00 2001 From: joccau Date: Fri, 20 Sep 2024 16:56:39 +0800 Subject: [PATCH 09/14] refactor job args for dropForeignKey ddl. Signed-off-by: joccau --- pkg/ddl/executor.go | 7 ++++--- pkg/ddl/foreign_key.go | 5 ++--- pkg/ddl/foreign_key_test.go | 7 +++++-- pkg/meta/model/job_args.go | 29 ++++++++++++++++++++++++++++- pkg/meta/model/job_args_test.go | 14 ++++++++++++++ 5 files changed, 53 insertions(+), 9 deletions(-) diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index ead4f2dc85281..0e77671320e4d 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -4996,6 +4996,7 @@ func (e *executor) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName p } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: schema.ID, TableID: t.Meta().ID, SchemaName: schema.Name.L, @@ -5003,12 +5004,12 @@ func (e *executor) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName p TableName: t.Meta().Name.L, Type: model.ActionDropForeignKey, BinlogInfo: &model.HistoryInfo{}, - Args: []any{fkName}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, } - - err = e.DoDDLJob(ctx, job) + args := &model.DropForeignKeyArgs{FkName: fkName} + job.FillArgs(args) + err = e.doDDLJob2(ctx, job, args) return errors.Trace(err) } diff --git a/pkg/ddl/foreign_key.go b/pkg/ddl/foreign_key.go index dac044d987357..6896197d27e7f 100644 --- a/pkg/ddl/foreign_key.go +++ b/pkg/ddl/foreign_key.go @@ -99,13 +99,12 @@ func onDropForeignKey(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int return ver, errors.Trace(err) } - var fkName pmodel.CIStr - err = job.DecodeArgs(&fkName) + args, err := model.GetDropForeignKeyArgs(job) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } - return dropForeignKey(jobCtx, t, job, tblInfo, fkName) + return dropForeignKey(jobCtx, t, job, tblInfo, args.FkName) } func dropForeignKey(jobCtx *jobContext, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, fkName pmodel.CIStr) (ver int64, err error) { diff --git a/pkg/ddl/foreign_key_test.go b/pkg/ddl/foreign_key_test.go index 9e93343cb67b8..752148a14c133 100644 --- a/pkg/ddl/foreign_key_test.go +++ b/pkg/ddl/foreign_key_test.go @@ -79,16 +79,19 @@ func testCreateForeignKey(t *testing.T, d ddl.ExecutorForTest, ctx sessionctx.Co func testDropForeignKey(t *testing.T, ctx sessionctx.Context, d ddl.ExecutorForTest, dbInfo *model.DBInfo, tblInfo *model.TableInfo, foreignKeyName string) *model.Job { job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: dbInfo.ID, SchemaName: dbInfo.Name.L, TableID: tblInfo.ID, TableName: tblInfo.Name.L, Type: model.ActionDropForeignKey, BinlogInfo: &model.HistoryInfo{}, - Args: []any{pmodel.NewCIStr(foreignKeyName)}, + Args: []any{}, } ctx.SetValue(sessionctx.QueryString, "skip") - err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) + args := &model.DropForeignKeyArgs{FkName: pmodel.NewCIStr(foreignKeyName)} + job.FillArgs(args) + err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true)) require.NoError(t, err) v := getSchemaVer(t, ctx) checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index 505215d9e36fb..d301396e255d6 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -629,7 +629,7 @@ func (a *AddForeignKeyArgs) fillJob(job *Job) { } } -// GetAddForeignKeyArgs get the args for ActionAddForeignKey ddl. +// GetAddForeignKeyArgs get the args for AddForeignKey ddl. func GetAddForeignKeyArgs(job *Job) (*AddForeignKeyArgs, error) { if job.Version == JobVersion1 { var ( @@ -648,6 +648,33 @@ func GetAddForeignKeyArgs(job *Job) (*AddForeignKeyArgs, error) { return getOrDecodeArgsV2[*AddForeignKeyArgs](job) } +// DropForeignKeyArgs is the arguments for DropForeignKey ddl. +type DropForeignKeyArgs struct { + FkName pmodel.CIStr +} + +func (a *DropForeignKeyArgs) fillJob(job *Job) { + intest.Assert(job.Version == JobVersion1 || job.Version == JobVersion2, "job version is invalid") + if job.Version == JobVersion1 { + job.Args = []any{a.FkName} + } else { + job.Args = []any{a} + } +} + +// GetDropForeignKeyArgs gets the args for DropForeignKey ddl. +func GetDropForeignKeyArgs(job *Job) (*DropForeignKeyArgs, error) { + if job.Version == JobVersion1 { + var fkName pmodel.CIStr + if err := job.DecodeArgs(&fkName); err != nil { + return nil, errors.Trace(err) + } + return &DropForeignKeyArgs{FkName: fkName}, nil + } + + return getOrDecodeArgsV2[*DropForeignKeyArgs](job) +} + // RenameTablesArgs is the arguments for rename tables job. type RenameTablesArgs struct { RenameTableInfos []*RenameTableArgs `json:"rename_table_infos,omitempty"` diff --git a/pkg/meta/model/job_args_test.go b/pkg/meta/model/job_args_test.go index c8dec55389a14..d732a0a44d833 100644 --- a/pkg/meta/model/job_args_test.go +++ b/pkg/meta/model/job_args_test.go @@ -376,3 +376,17 @@ func TestGetAddForeignKeyArgs(t *testing.T) { require.Equal(t, inArgs, args) } } + +func TestGetDropForeignKeyArgs(t *testing.T) { + inArgs := &DropForeignKeyArgs{ + FkName: model.NewCIStr("fk-name"), + } + + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionDropForeignKey))) + args, err := GetDropForeignKeyArgs(j2) + require.NoError(t, err) + require.Equal(t, inArgs, args) + } +} From 7386de42d043b5db32fa7fbccd23efd07e3bb005 Mon Sep 17 00:00:00 2001 From: joccau Date: Mon, 23 Sep 2024 11:12:15 +0800 Subject: [PATCH 10/14] remove useless code and add json comments for struct. Signed-off-by: joccau --- pkg/ddl/foreign_key_test.go | 1 - pkg/meta/model/job.go | 1 + pkg/meta/model/job_args.go | 26 +++++++++----------------- 3 files changed, 10 insertions(+), 18 deletions(-) diff --git a/pkg/ddl/foreign_key_test.go b/pkg/ddl/foreign_key_test.go index 752148a14c133..a481525bdd0bd 100644 --- a/pkg/ddl/foreign_key_test.go +++ b/pkg/ddl/foreign_key_test.go @@ -86,7 +86,6 @@ func testDropForeignKey(t *testing.T, ctx sessionctx.Context, d ddl.ExecutorForT TableName: tblInfo.Name.L, Type: model.ActionDropForeignKey, BinlogInfo: &model.HistoryInfo{}, - Args: []any{}, } ctx.SetValue(sessionctx.QueryString, "skip") args := &model.DropForeignKeyArgs{FkName: pmodel.NewCIStr(foreignKeyName)} diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index 769e904a1bc29..24b0bd14b73b1 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -482,6 +482,7 @@ func (job *Job) GetWarnings() (map[errors.ErrorID]*terror.Error, map[errors.Erro // FillArgs fills args for new job. func (job *Job) FillArgs(args JobArgs) { + intest.Assert(job.Version == JobVersion1 || job.Version == JobVersion2, "job version is invalid") args.fillJob(job) } diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index 4a70288dd1af1..332096863b41e 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -645,12 +645,10 @@ func GetRebaseAutoIDArgs(job *Job) (*RebaseAutoIDArgs, error) { // ModifyTableCommentArgs is the arguments for ActionModifyTableComment ddl. type ModifyTableCommentArgs struct { - Comment string + Comment string `json:"comment,omitempty"` } func (a *ModifyTableCommentArgs) fillJob(job *Job) { - intest.Assert(job.Version == JobVersion1 || job.Version == JobVersion2, "job version is invalid") - if job.Version == JobVersion1 { job.Args = []any{a.Comment} } else { @@ -675,13 +673,12 @@ func GetModifyTableCommentArgs(job *Job) (*ModifyTableCommentArgs, error) { // ModifyTableCharsetAndCollateArgs is the arguments for ActionModifyTableCharsetAndCollate ddl. type ModifyTableCharsetAndCollateArgs struct { - ToCharset string - ToCollate string - NeedsOverwriteCols bool + ToCharset string `json:"to_charset,omitempty"` + ToCollate string `json:"to_collate,omitempty"` + NeedsOverwriteCols bool `json:"needs_overwrite_cols,omitempty"` } func (a *ModifyTableCharsetAndCollateArgs) fillJob(job *Job) { - intest.Assert(job.Version == JobVersion1 || job.Version == JobVersion2, "job version is invalid") if job.Version == JobVersion1 { job.Args = []any{a.ToCharset, a.ToCollate, a.NeedsOverwriteCols} } else { @@ -705,13 +702,11 @@ func GetModifyTableCharsetAndCollateArgs(job *Job) (*ModifyTableCharsetAndCollat // AlterIndexVisibilityArgs is the arguments for ActionAlterIndexVisibility ddl. type AlterIndexVisibilityArgs struct { - IndexName pmodel.CIStr - Invisible bool + IndexName pmodel.CIStr `json:"index_name,omitempty"` + Invisible bool `json:"invisible,omitempty"` } func (a *AlterIndexVisibilityArgs) fillJob(job *Job) { - intest.Assert(job.Version == JobVersion1 || job.Version == JobVersion2, "job version is invalid") - if job.Version == JobVersion1 { job.Args = []any{a.IndexName, a.Invisible} } else { @@ -740,13 +735,11 @@ func GetAlterIndexVisibilityArgs(job *Job) (*AlterIndexVisibilityArgs, error) { // AddForeignKeyArgs is the arguments for ActionAddForeignKey ddl. type AddForeignKeyArgs struct { - FkInfo *FKInfo - FkCheck bool + FkInfo *FKInfo `json:"fk_info,omitempty"` + FkCheck bool `json:"fk_check,omitempty"` } func (a *AddForeignKeyArgs) fillJob(job *Job) { - intest.Assert(job.Version == JobVersion1 || job.Version == JobVersion2, "job version is invalid") - if job.Version == JobVersion1 { job.Args = []any{a.FkInfo, a.FkCheck} } else { @@ -775,11 +768,10 @@ func GetAddForeignKeyArgs(job *Job) (*AddForeignKeyArgs, error) { // DropForeignKeyArgs is the arguments for DropForeignKey ddl. type DropForeignKeyArgs struct { - FkName pmodel.CIStr + FkName pmodel.CIStr `json:"fk_name,omitempty"` } func (a *DropForeignKeyArgs) fillJob(job *Job) { - intest.Assert(job.Version == JobVersion1 || job.Version == JobVersion2, "job version is invalid") if job.Version == JobVersion1 { job.Args = []any{a.FkName} } else { From 2bea1a35eccf7a3be86b15a6d647185bbf822f30 Mon Sep 17 00:00:00 2001 From: joccau Date: Mon, 23 Sep 2024 11:23:06 +0800 Subject: [PATCH 11/14] remove version in subjob. Signed-off-by: joccau --- pkg/ddl/multi_schema_change.go | 1 - pkg/meta/model/job.go | 16 ++-------------- 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/pkg/ddl/multi_schema_change.go b/pkg/ddl/multi_schema_change.go index 798ed56fb8d38..a6bf2346640b5 100644 --- a/pkg/ddl/multi_schema_change.go +++ b/pkg/ddl/multi_schema_change.go @@ -184,7 +184,6 @@ func appendToSubJobs(m *model.MultiSchemaInfo, jobW *JobWrapper) error { reorgTp = jobW.ReorgMeta.ReorgTp } m.SubJobs = append(m.SubJobs, &model.SubJob{ - Version: jobW.Version, Type: jobW.Type, Args: jobW.Args, RawArgs: jobW.RawArgs, diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index 24b0bd14b73b1..2f244542b24ca 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -525,7 +525,7 @@ func (job *Job) Encode(updateRawArgs bool) ([]byte, error) { continue } - sub.RawArgs, err = marshalArgs(sub.Version, sub.Args) + sub.RawArgs, err = marshalArgs(job.Version, sub.Args) if err != nil { return nil, errors.Trace(err) } @@ -869,7 +869,6 @@ func (job *Job) GetInvolvingSchemaInfo() []InvolvingSchemaInfo { // SubJob is a representation of one DDL schema change. A Job may contain zero // (when multi-schema change is not applicable) or more SubJobs. type SubJob struct { - Version JobVersion `json:"version"` Type ActionType `json:"type"` Args []any `json:"-"` RawArgs json.RawMessage `json:"raw_args"` @@ -906,17 +905,6 @@ func (sub *SubJob) IsFinished() bool { // ToProxyJob converts a sub-job to a proxy job. func (sub *SubJob) ToProxyJob(parentJob *Job, seq int) Job { - var jobVer JobVersion - // because in mock test case. the version = V1 in ActionMultiSchemaChange, but maybe the version = v2 in subjob. - // we should retain the version from subjob. - // to do: - // we should set Version = sub.Version, after refactor all of DDL type. - if sub.Version == JobVersion2 { - jobVer = JobVersion2 - } else { - jobVer = parentJob.Version - } - return Job{ ID: parentJob.ID, Type: sub.Type, @@ -939,7 +927,7 @@ func (sub *SubJob) ToProxyJob(parentJob *Job, seq int) Job { DependencyID: parentJob.DependencyID, Query: parentJob.Query, BinlogInfo: parentJob.BinlogInfo, - Version: jobVer, + Version: parentJob.Version, ReorgMeta: parentJob.ReorgMeta, MultiSchemaInfo: &MultiSchemaInfo{Revertible: sub.Revertible, Seq: int32(seq)}, Priority: parentJob.Priority, From 444b07ecb6ac4b93fa2e564bd8120f4f2e528cbf Mon Sep 17 00:00:00 2001 From: joccau Date: Mon, 23 Sep 2024 11:40:50 +0800 Subject: [PATCH 12/14] support jobversion1 currently Signed-off-by: joccau --- pkg/ddl/executor.go | 41 ++++++++++++++--------------------------- 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index f155aaa9bb061..01830e91b701e 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -1988,6 +1988,8 @@ func (e *executor) multiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info logFn = logutil.DDLLogger().Fatal } + // to do:(joccau) + // we need refactor this part to support V2 job version after refactor all of ddl types. var involvingSchemaInfo []model.InvolvingSchemaInfo for _, j := range subJobs { switch j.Type { @@ -2007,27 +2009,12 @@ func (e *executor) multiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info Mode: model.SharedInvolving, }) case model.ActionAddForeignKey: - var ( - ref *model.FKInfo - ok bool - ) - if j.Version == model.JobVersion1 { - ref, ok = j.Args[0].(*model.FKInfo) - if !ok { - logFn("unexpected type of foreign key info", - zap.Any("args[0]", j.Args[0]), - zap.String("type", fmt.Sprintf("%T", j.Args[0]))) - continue - } - } else { - args, ok := j.Args[0].(*model.AddForeignKeyArgs) - if !ok { - logFn("unexpected type of foreign key info", - zap.Any("args[0]", j.Args[0]), - zap.String("type", fmt.Sprintf("%T", j.Args[0]))) - continue - } - ref = args.FkInfo + ref, ok := j.Args[0].(*model.FKInfo) + if !ok { + logFn("unexpected type of foreign key info", + zap.Any("args[0]", j.Args[0]), + zap.String("type", fmt.Sprintf("%T", j.Args[0]))) + continue } involvingSchemaInfo = append(involvingSchemaInfo, model.InvolvingSchemaInfo{ Database: ref.RefSchema.L, @@ -2147,7 +2134,7 @@ func (e *executor) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase newBase = newBaseTemp } job := &model.Job{ - Version: model.GetJobVerInUse(), + Version: model.JobVersion1, SchemaID: schema.ID, TableID: tbInfo.ID, SchemaName: schema.Name.L, @@ -3546,7 +3533,7 @@ func (e *executor) AlterTableComment(ctx sessionctx.Context, ident ast.Ident, sp } job := &model.Job{ - Version: model.GetJobVerInUse(), + Version: model.JobVersion1, SchemaID: schema.ID, TableID: tb.Meta().ID, SchemaName: schema.Name.L, @@ -3629,7 +3616,7 @@ func (e *executor) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast } job := &model.Job{ - Version: model.GetJobVerInUse(), + Version: model.JobVersion1, SchemaID: schema.ID, TableID: tb.Meta().ID, SchemaName: schema.Name.L, @@ -4978,7 +4965,7 @@ func (e *executor) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName } job := &model.Job{ - Version: model.GetJobVerInUse(), + Version: model.JobVersion1, SchemaID: schema.ID, TableID: t.Meta().ID, SchemaName: schema.Name.L, @@ -5021,7 +5008,7 @@ func (e *executor) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName p } job := &model.Job{ - Version: model.GetJobVerInUse(), + Version: model.JobVersion1, SchemaID: schema.ID, TableID: t.Meta().ID, SchemaName: schema.Name.L, @@ -5642,7 +5629,7 @@ func (e *executor) AlterIndexVisibility(ctx sessionctx.Context, ident ast.Ident, } job := &model.Job{ - Version: model.GetJobVerInUse(), + Version: model.JobVersion1, SchemaID: schema.ID, TableID: tb.Meta().ID, SchemaName: schema.Name.L, From e5dff0c2d2403a999cabf1056d7fed37a5e13586 Mon Sep 17 00:00:00 2001 From: joccau Date: Mon, 23 Sep 2024 14:16:06 +0800 Subject: [PATCH 13/14] deal comments in pr Signed-off-by: joccau --- pkg/ddl/multi_schema_change.go | 8 +------- pkg/meta/model/job_args_test.go | 2 +- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/pkg/ddl/multi_schema_change.go b/pkg/ddl/multi_schema_change.go index a6bf2346640b5..99c8331838326 100644 --- a/pkg/ddl/multi_schema_change.go +++ b/pkg/ddl/multi_schema_change.go @@ -260,13 +260,7 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *JobWrapper) error { info.AlterIndexes = append(info.AlterIndexes, idxName) case model.ActionRebaseAutoID, model.ActionModifyTableComment, model.ActionModifyTableCharsetAndCollate: case model.ActionAddForeignKey: - var fkInfo *model.FKInfo - if job.Version == model.JobVersion1 { - fkInfo = job.Args[0].(*model.FKInfo) - } else { - args := job.Args[0].(*model.AddForeignKeyArgs) - fkInfo = args.FkInfo - } + fkInfo := job.JobArgs.(*model.AddForeignKeyArgs).FkInfo info.AddForeignKeys = append(info.AddForeignKeys, model.AddForeignKeyInfo{ Name: fkInfo.Name, Cols: fkInfo.Cols, diff --git a/pkg/meta/model/job_args_test.go b/pkg/meta/model/job_args_test.go index 5041ab3c3222a..e8fb4a8e08674 100644 --- a/pkg/meta/model/job_args_test.go +++ b/pkg/meta/model/job_args_test.go @@ -459,7 +459,7 @@ func TestGetRebaseAutoIDArgs(t *testing.T) { func TestGetModifyTableCommentArgs(t *testing.T) { inArgs := &ModifyTableCommentArgs{ - Comment: "TiDb is great", + Comment: "TiDB is great", } for _, v := range []JobVersion{JobVersion1, JobVersion2} { From 11b29b2de702d9e57e19ec097667134c3da071ec Mon Sep 17 00:00:00 2001 From: joccau Date: Mon, 23 Sep 2024 15:08:50 +0800 Subject: [PATCH 14/14] use job.JobArgs replace job.Args Signed-off-by: joccau --- pkg/ddl/multi_schema_change.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/ddl/multi_schema_change.go b/pkg/ddl/multi_schema_change.go index 99c8331838326..0d8bb4e79f3cb 100644 --- a/pkg/ddl/multi_schema_change.go +++ b/pkg/ddl/multi_schema_change.go @@ -251,12 +251,7 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *JobWrapper) error { col := job.Args[0].(*table.Column) info.ModifyColumns = append(info.ModifyColumns, col.Name) case model.ActionAlterIndexVisibility: - var idxName pmodel.CIStr - if job.Version == model.JobVersion1 { - idxName = job.Args[0].(pmodel.CIStr) - } else { - idxName = job.Args[0].(*model.AlterIndexVisibilityArgs).IndexName - } + idxName := job.JobArgs.(*model.AlterIndexVisibilityArgs).IndexName info.AlterIndexes = append(info.AlterIndexes, idxName) case model.ActionRebaseAutoID, model.ActionModifyTableComment, model.ActionModifyTableCharsetAndCollate: case model.ActionAddForeignKey: