Skip to content

Commit

Permalink
model: add schema name, table name to ddl job. (pingcap#11561) (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Mar 19, 2020
1 parent c745300 commit 8750363
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 6 deletions.
27 changes: 27 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (d *ddl) CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetIn

job := &model.Job{
SchemaID: schemaID,
SchemaName: dbInfo.Name.L,
Type: model.ActionCreateSchema,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{dbInfo},
Expand Down Expand Up @@ -144,6 +145,7 @@ func (d *ddl) AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) (
// Do the DDL job.
job := &model.Job{
SchemaID: dbInfo.ID,
SchemaName: dbInfo.Name.L,
Type: model.ActionModifySchemaCharsetAndCollate,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{toCharset, toCollate},
Expand All @@ -161,6 +163,7 @@ func (d *ddl) DropSchema(ctx sessionctx.Context, schema model.CIStr) (err error)
}
job := &model.Job{
SchemaID: old.ID,
SchemaName: old.Name.L,
Type: model.ActionDropSchema,
BinlogInfo: &model.HistoryInfo{},
}
Expand Down Expand Up @@ -1293,6 +1296,7 @@ func (d *ddl) CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.
job := &model.Job{
SchemaID: schema.ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
Type: model.ActionCreateTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tblInfo},
Expand Down Expand Up @@ -1476,6 +1480,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
job := &model.Job{
SchemaID: schema.ID,
TableID: tbInfo.ID,
SchemaName: schema.Name.L,
Type: model.ActionCreateTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tbInfo},
Expand Down Expand Up @@ -1543,6 +1548,7 @@ func (d *ddl) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (er
job := &model.Job{
SchemaID: schemaID,
TableID: tbInfo.ID,
SchemaName: schema.Name.L,
Type: model.ActionRecoverTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tbInfo, recoverInfo.CurAutoIncID, recoverInfo.DropJobID,
Expand Down Expand Up @@ -1610,6 +1616,7 @@ func (d *ddl) CreateView(ctx sessionctx.Context, s *ast.CreateViewStmt) (err err
job := &model.Job{
SchemaID: schema.ID,
TableID: tbInfo.ID,
SchemaName: schema.Name.L,
Type: model.ActionCreateView,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tbInfo, s.OrReplace, oldViewTblID},
Expand Down Expand Up @@ -2079,6 +2086,7 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionRebaseAutoID,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newBase},
Expand Down Expand Up @@ -2109,6 +2117,7 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint
Type: model.ActionShardRowID,
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{uVal},
}
Expand Down Expand Up @@ -2225,6 +2234,7 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionAddColumn,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{col, spec.Position, 0},
Expand Down Expand Up @@ -2270,6 +2280,7 @@ func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *
job := &model.Job{
SchemaID: schema.ID,
TableID: meta.ID,
SchemaName: schema.Name.L,
Type: model.ActionAddTablePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{partInfo},
Expand Down Expand Up @@ -2342,6 +2353,7 @@ func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, sp
job := &model.Job{
SchemaID: schema.ID,
TableID: meta.ID,
SchemaName: schema.Name.L,
Type: model.ActionTruncateTablePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{pid},
Expand Down Expand Up @@ -2384,6 +2396,7 @@ func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *
job := &model.Job{
SchemaID: schema.ID,
TableID: meta.ID,
SchemaName: schema.Name.L,
Type: model.ActionDropTablePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{partName},
Expand Down Expand Up @@ -2422,6 +2435,7 @@ func (d *ddl) DropColumn(ctx sessionctx.Context, ti ast.Ident, colName model.CIS
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionDropColumn,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{colName},
Expand Down Expand Up @@ -2765,6 +2779,7 @@ func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, or
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionModifyColumn,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{&newCol, originalColName, spec.Position, modifyColumnTp},
Expand Down Expand Up @@ -2929,6 +2944,7 @@ func (d *ddl) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionSetDefaultValue,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{col},
Expand All @@ -2955,6 +2971,7 @@ func (d *ddl) AlterTableComment(ctx sessionctx.Context, ident ast.Ident, spec *a
job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionModifyTableComment,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{spec.Comment},
Expand Down Expand Up @@ -3006,6 +3023,7 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden
job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionModifyTableCharsetAndCollate,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{toCharset, toCollate},
Expand Down Expand Up @@ -3166,6 +3184,7 @@ func (d *ddl) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt
job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionRenameIndex,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{spec.FromKey, spec.ToKey},
Expand All @@ -3190,6 +3209,7 @@ func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) {
job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionDropTable,
BinlogInfo: &model.HistoryInfo{},
}
Expand Down Expand Up @@ -3222,6 +3242,7 @@ func (d *ddl) DropView(ctx sessionctx.Context, ti ast.Ident) (err error) {
job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionDropView,
BinlogInfo: &model.HistoryInfo{},
}
Expand All @@ -3244,6 +3265,7 @@ func (d *ddl) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {
job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionTruncateTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newTableID},
Expand Down Expand Up @@ -3312,6 +3334,7 @@ func (d *ddl) RenameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident,
job := &model.Job{
SchemaID: newSchema.ID,
TableID: oldTbl.Meta().ID,
SchemaName: newSchema.Name.L,
Type: model.ActionRenameTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{oldSchema.ID, newIdent.Name},
Expand Down Expand Up @@ -3444,6 +3467,7 @@ func (d *ddl) CreateIndex(ctx sessionctx.Context, ti ast.Ident, unique bool, ind
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionAddIndex,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{unique, indexName, idxColNames, indexOption},
Expand Down Expand Up @@ -3500,6 +3524,7 @@ func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName mode
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionAddForeignKey,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{fkInfo},
Expand All @@ -3526,6 +3551,7 @@ func (d *ddl) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model.
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionDropForeignKey,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{fkName},
Expand Down Expand Up @@ -3586,6 +3612,7 @@ func (d *ddl) DropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CI
SchemaID: schema.ID,
TableID: t.Meta().ID,
Type: jobTp,
SchemaName: schema.Name.L,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{indexName},
}
Expand Down
26 changes: 23 additions & 3 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,15 +412,35 @@ func (e *ShowDDLJobsExec) Next(ctx context.Context, req *chunk.Chunk) error {
numCurBatch := mathutil.Min(req.Capacity(), len(e.jobs)-e.cursor)
for i := e.cursor; i < e.cursor+numCurBatch; i++ {
req.AppendInt64(0, e.jobs[i].ID)
req.AppendString(1, getSchemaName(e.is, e.jobs[i].SchemaID))
req.AppendString(2, getTableName(e.is, e.jobs[i].TableID))
schemaName := e.jobs[i].SchemaName
tableName := ""
finishTS := uint64(0)
if e.jobs[i].BinlogInfo != nil {
finishTS = e.jobs[i].BinlogInfo.FinishedTS
if e.jobs[i].BinlogInfo.TableInfo != nil {
tableName = e.jobs[i].BinlogInfo.TableInfo.Name.L
}
if len(schemaName) == 0 && e.jobs[i].BinlogInfo.DBInfo != nil {
schemaName = e.jobs[i].BinlogInfo.DBInfo.Name.L
}
}
// For compatibility, the old version of DDL Job wasn't store the schema name and table name.
if len(schemaName) == 0 {
schemaName = getSchemaName(e.is, e.jobs[i].SchemaID)
}
if len(tableName) == 0 {
tableName = getTableName(e.is, e.jobs[i].TableID)
}
req.AppendString(1, schemaName)
req.AppendString(2, tableName)
req.AppendString(3, e.jobs[i].Type.String())
req.AppendString(4, e.jobs[i].SchemaState.String())
req.AppendInt64(5, e.jobs[i].SchemaID)
req.AppendInt64(6, e.jobs[i].TableID)
req.AppendInt64(7, e.jobs[i].RowCount)
req.AppendString(8, model.TSConvert2Time(e.jobs[i].StartTS).String())
req.AppendString(9, e.jobs[i].State.String())
req.AppendString(9, model.TSConvert2Time(finishTS).String())
req.AppendString(10, e.jobs[i].State.String())
}
e.cursor += numCurBatch
return nil
Expand Down
44 changes: 42 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (s *testSuite) TestAdmin(c *C) {
err = r.Next(ctx, req)
c.Assert(err, IsNil)
row = req.GetRow(0)
c.Assert(row.Len(), Equals, 10)
c.Assert(row.Len(), Equals, 11)
txn, err = s.store.Begin()
c.Assert(err, IsNil)
historyJobs, err := admin.GetHistoryDDLJobs(txn, admin.DefNumHistoryJobs)
Expand All @@ -233,7 +233,7 @@ func (s *testSuite) TestAdmin(c *C) {
err = r.Next(ctx, req)
c.Assert(err, IsNil)
row = req.GetRow(0)
c.Assert(row.Len(), Equals, 10)
c.Assert(row.Len(), Equals, 11)
c.Assert(row.GetInt64(0), Equals, historyJobs[0].ID)
c.Assert(err, IsNil)

Expand Down Expand Up @@ -302,6 +302,13 @@ func (s *testSuite) TestAdmin(c *C) {
tk.MustExec("ALTER TABLE t1 ADD INDEX idx3 (c4);")
tk.MustExec("admin check table t1;")

// Test admin show ddl jobs table name after table has been droped.
tk.MustExec("drop table if exists t1;")
re := tk.MustQuery("admin show ddl jobs 1")
rows := re.Rows()
c.Assert(len(rows), Equals, 1)
c.Assert(rows[0][2], Equals, "t1")

// Test for reverse scan get history ddl jobs when ddl history jobs queue has multiple regions.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
Expand All @@ -319,6 +326,39 @@ func (s *testSuite) TestAdmin(c *C) {
c.Assert(historyJobs, DeepEquals, historyJobs2)
}

func (s *testSuite) TestAdminShowDDLJobs(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("create database if not exists test_admin_show_ddl_jobs")
tk.MustExec("use test_admin_show_ddl_jobs")
tk.MustExec("create table t (a int);")

re := tk.MustQuery("admin show ddl jobs 1")
row := re.Rows()[0]
c.Assert(row[1], Equals, "test_admin_show_ddl_jobs")
jobID, err := strconv.Atoi(row[0].(string))
c.Assert(err, IsNil)

c.Assert(tk.Se.NewTxn(context.Background()), IsNil)
txn, err := tk.Se.Txn(true)
c.Assert(err, IsNil)
t := meta.NewMeta(txn)
job, err := t.GetHistoryDDLJob(int64(jobID))
c.Assert(err, IsNil)
c.Assert(job, NotNil)
// Test for compatibility. Old TiDB version doesn't have SchemaName field, and the BinlogInfo maybe nil.
// See PR: 11561.
job.BinlogInfo = nil
job.SchemaName = ""
err = t.AddHistoryDDLJob(job, true)
c.Assert(err, IsNil)
err = tk.Se.CommitTxn(context.Background())
c.Assert(err, IsNil)

re = tk.MustQuery("admin show ddl jobs 1")
row = re.Rows()[0]
c.Assert(row[1], Equals, "test_admin_show_ddl_jobs")
}

func (s *testSuite) TestAdminChecksumOfPartitionedTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("USE test;")
Expand Down
3 changes: 2 additions & 1 deletion planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,7 +1256,7 @@ func buildCleanupIndexFields() *expression.Schema {
}

func buildShowDDLJobsFields() *expression.Schema {
schema := expression.NewSchema(make([]*expression.Column, 0, 10)...)
schema := expression.NewSchema(make([]*expression.Column, 0, 11)...)
schema.Append(buildColumn("", "JOB_ID", mysql.TypeLonglong, 4))
schema.Append(buildColumn("", "DB_NAME", mysql.TypeVarchar, 64))
schema.Append(buildColumn("", "TABLE_NAME", mysql.TypeVarchar, 64))
Expand All @@ -1266,6 +1266,7 @@ func buildShowDDLJobsFields() *expression.Schema {
schema.Append(buildColumn("", "TABLE_ID", mysql.TypeLonglong, 4))
schema.Append(buildColumn("", "ROW_COUNT", mysql.TypeLonglong, 4))
schema.Append(buildColumn("", "START_TIME", mysql.TypeVarchar, 64))
schema.Append(buildColumn("", "END_TIME", mysql.TypeVarchar, 64))
schema.Append(buildColumn("", "STATE", mysql.TypeVarchar, 64))
return schema
}
Expand Down

0 comments on commit 8750363

Please sign in to comment.