Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: lance6716 <lance6716@gmail.com>
  • Loading branch information
lance6716 committed Sep 5, 2024
1 parent e80845b commit a0a25a4
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pkg/ddl/add_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func onAddColumn(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, e
tblInfo,
[]*model.ColumnInfo{columnInfo},
)
asyncNotifyEvent(jobCtx, addColumnEvent)
asyncNotifyEvent(jobCtx, addColumnEvent, job)
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", columnInfo.State)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Meta, job *model
case model.StateWriteReorganization:
// TODO: Support flashback in unistore.
if inFlashbackTest {
asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent())
asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent(), job)
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return ver, nil
Expand All @@ -844,7 +844,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Meta, job *model
}
}

asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent())
asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent(), job)
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return updateSchemaVersion(jobCtx, t, job)
Expand Down
27 changes: 10 additions & 17 deletions pkg/ddl/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/types"
driver "github.com/pingcap/tidb/pkg/types/parser_driver"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/pingcap/tidb/pkg/util/set"
Expand Down Expand Up @@ -183,12 +182,10 @@ func onCreateTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64,

// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
if !tidbutil.IsMemOrSysDB(job.SchemaName) {
createTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewCreateTableEvent(tbInfo),
}
asyncNotifyEvent(jobCtx, createTableEvent)
createTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewCreateTableEvent(tbInfo),
}
asyncNotifyEvent(jobCtx, createTableEvent, job)
return ver, errors.Trace(err)
}

Expand Down Expand Up @@ -216,12 +213,10 @@ func createTableWithForeignKeys(jobCtx *jobContext, t *meta.Meta, job *model.Job
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
if !tidbutil.IsMemOrSysDB(job.SchemaName) {
createTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewCreateTableEvent(tbInfo),
}
asyncNotifyEvent(jobCtx, createTableEvent)
createTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewCreateTableEvent(tbInfo),
}
asyncNotifyEvent(jobCtx, createTableEvent, job)
return ver, nil
default:
return ver, errors.Trace(dbterror.ErrInvalidDDLJob.GenWithStackByArgs("table", tbInfo.State))
Expand Down Expand Up @@ -274,13 +269,11 @@ func onCreateTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (int64, er
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
job.BinlogInfo.SetTableInfos(ver, args)
if !tidbutil.IsMemOrSysDB(job.SchemaName) {
for i := range args {
createTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewCreateTableEvent(args[i]),
}
asyncNotifyEvent(jobCtx, createTableEvent)
for i := range args {
createTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewCreateTableEvent(args[i]),
}
asyncNotifyEvent(jobCtx, createTableEvent, job)
}

return ver, errors.Trace(err)
Expand Down
9 changes: 8 additions & 1 deletion pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,14 @@ func (d *ddl) RegisterStatsHandle(h *handle.Handle) {

// asyncNotifyEvent will notify the ddl event to outside world, say statistic handle. When the channel is full, we may
// give up notify and log it.
func asyncNotifyEvent(jobCtx *jobContext, e *statsutil.DDLEvent) {
func asyncNotifyEvent(jobCtx *jobContext, e *statsutil.DDLEvent, job *model.Job) {
// skip notify for system databases, system databases are expected to change at
// bootstrap and other nodes can also handle the changing in its bootstrap rather
// than be notified.
if tidbutil.IsMemOrSysDB(job.SchemaName) {
return
}

ch := jobCtx.oldDDLCtx.ddlEventCh
if ch != nil {
for i := 0; i < 10; i++ {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/modify_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func (w *worker) doModifyColumnTypeWithData(
tblInfo,
[]*model.ColumnInfo{changingCol},
)
asyncNotifyEvent(jobCtx, modifyColumnEvent)
asyncNotifyEvent(jobCtx, modifyColumnEvent, job)
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", changingCol.State)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, t *meta.Meta, job *mode
tblInfo,
partInfo,
)
asyncNotifyEvent(jobCtx, addPartitionEvent)
asyncNotifyEvent(jobCtx, addPartitionEvent, job)
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("partition", job.SchemaState)
}
Expand Down Expand Up @@ -2338,7 +2338,7 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, t *meta.Meta, job *mod
tblInfo,
&model.PartitionInfo{Definitions: droppedDefs},
)
asyncNotifyEvent(jobCtx, dropPartitionEvent)
asyncNotifyEvent(jobCtx, dropPartitionEvent, job)
// A background job will be created to delete old partition data.
job.Args = []any{physicalTableIDs}
default:
Expand Down Expand Up @@ -2431,7 +2431,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, t *meta.Meta, job
&model.PartitionInfo{Definitions: newPartitions},
&model.PartitionInfo{Definitions: oldPartitions},
)
asyncNotifyEvent(jobCtx, truncatePartitionEvent)
asyncNotifyEvent(jobCtx, truncatePartitionEvent, job)
// A background job will be created to delete old partition data.
job.Args = []any{oldIDs}

Expand Down Expand Up @@ -2570,7 +2570,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, t *meta.Meta, job
&model.PartitionInfo{Definitions: newPartitions},
&model.PartitionInfo{Definitions: oldPartitions},
)
asyncNotifyEvent(jobCtx, truncatePartitionEvent)
asyncNotifyEvent(jobCtx, truncatePartitionEvent, job)
// A background job will be created to delete old partition data.
job.Args = []any{oldIDs}
default:
Expand Down Expand Up @@ -2943,7 +2943,7 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, t *meta.Meta, job
&model.PartitionInfo{Definitions: []model.PartitionDefinition{originalPartitionDef}},
originalNt,
)
asyncNotifyEvent(jobCtx, exchangePartitionEvent)
asyncNotifyEvent(jobCtx, exchangePartitionEvent, job)
return ver, nil
}

Expand Down Expand Up @@ -3481,7 +3481,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo
if err != nil {
return ver, errors.Trace(err)
}
asyncNotifyEvent(jobCtx, event)
asyncNotifyEvent(jobCtx, event, job)
// A background job will be created to delete old partition data.
job.Args = []any{physicalTableIDs}

Expand Down
10 changes: 4 additions & 6 deletions pkg/ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func onDropTableOrView(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver in
job.SchemaID,
tblInfo,
)
asyncNotifyEvent(jobCtx, dropTableEvent)
asyncNotifyEvent(jobCtx, dropTableEvent, job)
}
default:
return ver, errors.Trace(dbterror.ErrInvalidDDLState.GenWithStackByArgs("table", tblInfo.State))
Expand Down Expand Up @@ -570,12 +570,10 @@ func (w *worker) onTruncateTable(jobCtx *jobContext, t *meta.Meta, job *model.Jo
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
if !tidbutil.IsMemOrSysDB(job.SchemaName) {
truncateTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewTruncateTableEvent(tblInfo, oldTblInfo),
}
asyncNotifyEvent(jobCtx, truncateTableEvent)
truncateTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewTruncateTableEvent(tblInfo, oldTblInfo),
}
asyncNotifyEvent(jobCtx, truncateTableEvent, job)
startKey := tablecodec.EncodeTablePrefix(tableID)
job.Args = []any{startKey, oldPartitionIDs}
return ver, nil
Expand Down

0 comments on commit a0a25a4

Please sign in to comment.