Skip to content

Commit

Permalink
ddl_notifier: publish schema change event to store by `asyncNotifyEve…
Browse files Browse the repository at this point in the history
…nt` (#56425)

ref #55722
  • Loading branch information
fzzf678 authored Oct 17, 2024
1 parent 5e1423c commit 1e76e6e
Show file tree
Hide file tree
Showing 13 changed files with 264 additions and 88 deletions.
9 changes: 6 additions & 3 deletions pkg/ddl/add_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import (
"go.uber.org/zap"
)

func onAddColumn(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
func (w *worker) onAddColumn(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = onDropColumn(jobCtx, job)
Expand Down Expand Up @@ -130,10 +130,13 @@ func onAddColumn(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
return ver, errors.Trace(err)
}

addColumnEvent := notifier.NewAddColumnEvent(tblInfo, []*model.ColumnInfo{columnInfo})
err = asyncNotifyEvent(jobCtx, addColumnEvent, job, w.sess)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
addColumnEvent := notifier.NewAddColumnEvent(tblInfo, []*model.ColumnInfo{columnInfo})
asyncNotifyEvent(jobCtx, addColumnEvent, job)
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", columnInfo.State)
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,10 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, job *model.Job) (ver int
case model.StateWriteReorganization:
// TODO: Support flashback in unistore.
if inFlashbackTest {
asyncNotifyEvent(jobCtx, notifier.NewFlashbackClusterEvent(), job)
err = asyncNotifyEvent(jobCtx, notifier.NewFlashbackClusterEvent(), job, w.sess)
if err != nil {
return ver, errors.Trace(err)
}
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return ver, nil
Expand All @@ -829,7 +832,10 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, job *model.Job) (ver int
}
}

asyncNotifyEvent(jobCtx, notifier.NewFlashbackClusterEvent(), job)
err = asyncNotifyEvent(jobCtx, notifier.NewFlashbackClusterEvent(), job, w.sess)
if err != nil {
return ver, errors.Trace(err)
}
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return updateSchemaVersion(jobCtx, job)
Expand Down
33 changes: 20 additions & 13 deletions pkg/ddl/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func createTable(jobCtx *jobContext, job *model.Job, args *model.CreateTableArgs
}
}

func onCreateTable(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
func (w *worker) onCreateTable(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(ver, errors.New("mock do job error"))
Expand All @@ -170,7 +170,7 @@ func onCreateTable(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
tbInfo := args.TableInfo

if len(tbInfo.ForeignKeys) > 0 {
return createTableWithForeignKeys(jobCtx, job, args)
return w.createTableWithForeignKeys(jobCtx, job, args)
}

tbInfo, err = createTable(jobCtx, job, args)
Expand All @@ -182,15 +182,18 @@ func onCreateTable(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
if err != nil {
return ver, errors.Trace(err)
}
createTableEvent := notifier.NewCreateTablesEvent([]*model.TableInfo{tbInfo})
err = asyncNotifyEvent(jobCtx, createTableEvent, job, w.sess)
if err != nil {
return ver, errors.Trace(err)
}

// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
createTableEvent := notifier.NewCreateTableEvent(tbInfo)
asyncNotifyEvent(jobCtx, createTableEvent, job)
return ver, errors.Trace(err)
}

func createTableWithForeignKeys(jobCtx *jobContext, job *model.Job, args *model.CreateTableArgs) (ver int64, err error) {
func (w *worker) createTableWithForeignKeys(jobCtx *jobContext, job *model.Job, args *model.CreateTableArgs) (ver int64, err error) {
tbInfo := args.TableInfo
switch tbInfo.State {
case model.StateNone, model.StatePublic:
Expand All @@ -214,17 +217,21 @@ func createTableWithForeignKeys(jobCtx *jobContext, job *model.Job, args *model.
if err != nil {
return ver, errors.Trace(err)
}
createTableEvent := notifier.NewCreateTablesEvent([]*model.TableInfo{tbInfo})
err = asyncNotifyEvent(jobCtx, createTableEvent, job, w.sess)
if err != nil {
return ver, errors.Trace(err)
}

job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
createTableEvent := notifier.NewCreateTableEvent(tbInfo)
asyncNotifyEvent(jobCtx, createTableEvent, job)
return ver, nil
default:
return ver, errors.Trace(dbterror.ErrInvalidDDLJob.GenWithStackByArgs("table", tbInfo.State))
}
return ver, errors.Trace(err)
}

func onCreateTables(jobCtx *jobContext, job *model.Job) (int64, error) {
func (w *worker) onCreateTables(jobCtx *jobContext, job *model.Job) (int64, error) {
var ver int64

args, err := model.GetBatchCreateTableArgs(job)
Expand Down Expand Up @@ -266,15 +273,15 @@ func onCreateTables(jobCtx *jobContext, job *model.Job) (int64, error) {
if err != nil {
return ver, errors.Trace(err)
}
createTablesEvent := notifier.NewCreateTablesEvent(tableInfos)
err = asyncNotifyEvent(jobCtx, createTablesEvent, job, w.sess)
if err != nil {
return ver, errors.Trace(err)
}

job.State = model.JobStateDone
job.SchemaState = model.StatePublic
job.BinlogInfo.SetTableInfos(ver, tableInfos)
for i := range tableInfos {
createTableEvent := notifier.NewCreateTableEvent(tableInfos[i])
asyncNotifyEvent(jobCtx, createTableEvent, job)
}

return ver, errors.Trace(err)
}

Expand Down
27 changes: 24 additions & 3 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/gcutil"
"github.com/pingcap/tidb/pkg/util/generic"
"github.com/pingcap/tidb/pkg/util/intest"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -565,26 +566,46 @@ func (d *ddl) RegisterStatsHandle(h *handle.Handle) {

// asyncNotifyEvent will notify the ddl event to outside world, say statistic handle. When the channel is full, we may
// give up notify and log it.
func asyncNotifyEvent(jobCtx *jobContext, e *notifier.SchemaChangeEvent, job *model.Job) {
func asyncNotifyEvent(jobCtx *jobContext, e *notifier.SchemaChangeEvent, job *model.Job, sctx *sess.Session) error {
// skip notify for system databases, system databases are expected to change at
// bootstrap and other nodes can also handle the changing in its bootstrap rather
// than be notified.
if tidbutil.IsMemOrSysDB(job.SchemaName) {
return
return nil
}

if intest.InTest && notifier.DefaultStore != nil {
failpoint.Inject("asyncNotifyEventError", func() {
failpoint.Return(errors.New("mock publish event error"))
})
var multiSchemaChangeSeq int64 = -1
if job.MultiSchemaInfo != nil {
multiSchemaChangeSeq = int64(job.MultiSchemaInfo.Seq)
}
err := notifier.PubSchemaChange(jobCtx.ctx, sctx, job.ID, multiSchemaChangeSeq, e)
if err != nil {
logutil.DDLLogger().Error("Error publish schema change event",
zap.Int64("jobID", job.ID),
zap.Int64("multiSchemaChangeSeq", multiSchemaChangeSeq),
zap.String("event", e.String()), zap.Error(err))
return err
}
return nil
}

ch := jobCtx.oldDDLCtx.ddlEventCh
if ch != nil {
for i := 0; i < 10; i++ {
select {
case ch <- e:
return
return nil
default:
time.Sleep(time.Microsecond * 10)
}
}
logutil.DDLLogger().Warn("fail to notify DDL event", zap.Stringer("event", e))
}
return nil
}

// NewDDL creates a new DDL.
Expand Down
8 changes: 4 additions & 4 deletions pkg/ddl/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,23 +887,23 @@ func (w *worker) runOneJobStep(
case model.ActionModifySchemaDefaultPlacement:
ver, err = onModifySchemaDefaultPlacement(jobCtx, job)
case model.ActionCreateTable:
ver, err = onCreateTable(jobCtx, job)
ver, err = w.onCreateTable(jobCtx, job)
case model.ActionCreateTables:
ver, err = onCreateTables(jobCtx, job)
ver, err = w.onCreateTables(jobCtx, job)
case model.ActionRepairTable:
ver, err = onRepairTable(jobCtx, job)
case model.ActionCreateView:
ver, err = onCreateView(jobCtx, job)
case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
ver, err = onDropTableOrView(jobCtx, job)
ver, err = w.onDropTableOrView(jobCtx, job)
case model.ActionDropTablePartition:
ver, err = w.onDropTablePartition(jobCtx, job)
case model.ActionTruncateTablePartition:
ver, err = w.onTruncateTablePartition(jobCtx, job)
case model.ActionExchangeTablePartition:
ver, err = w.onExchangeTablePartition(jobCtx, job)
case model.ActionAddColumn:
ver, err = onAddColumn(jobCtx, job)
ver, err = w.onAddColumn(jobCtx, job)
case model.ActionDropColumn:
ver, err = onDropColumn(jobCtx, job)
case model.ActionModifyColumn:
Expand Down
7 changes: 5 additions & 2 deletions pkg/ddl/modify_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,15 +574,18 @@ func (w *worker) doModifyColumnTypeWithData(
if err != nil {
return ver, errors.Trace(err)
}
modifyColumnEvent := notifier.NewModifyColumnEvent(tblInfo, []*model.ColumnInfo{changingCol})
err = asyncNotifyEvent(jobCtx, modifyColumnEvent, job, w.sess)
if err != nil {
return ver, errors.Trace(err)
}

// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
// Refactor the job args to add the old index ids into delete range table.
args.IndexIDs = rmIdxs
args.PartitionIDs = getPartitionIDs(tblInfo)
job.FillFinishedArgs(args)
modifyColumnEvent := notifier.NewModifyColumnEvent(tblInfo, []*model.ColumnInfo{changingCol})
asyncNotifyEvent(jobCtx, modifyColumnEvent, job)
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", changingCol.State)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/notifier/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ go_test(
],
embed = [":notifier"],
flaky = True,
shard_count = 4,
shard_count = 6,
deps = [
"//pkg/ddl/session",
"//pkg/meta/model",
"//pkg/parser/model",
"//pkg/sessionctx",
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"@com_github_stretchr_testify//require",
],
)
25 changes: 15 additions & 10 deletions pkg/ddl/notifier/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func (s *SchemaChangeEvent) String() string {
if s.inner.TableInfo != nil {
_, _ = fmt.Fprintf(&sb, ", Table ID: %d, Table Name: %s", s.inner.TableInfo.ID, s.inner.TableInfo.Name)
}
for _, tableInfo := range s.inner.TableInfos {
_, _ = fmt.Fprintf(&sb, ", Table ID: %d, Table Name: %s", tableInfo.ID, tableInfo.Name)
}
if s.inner.OldTableInfo != nil {
_, _ = fmt.Fprintf(&sb, ", Old Table ID: %d, Old Table Name: %s", s.inner.OldTableInfo.ID, s.inner.OldTableInfo.Name)
}
Expand Down Expand Up @@ -83,24 +86,25 @@ func (s *SchemaChangeEvent) GetType() model.ActionType {
return s.inner.Tp
}

// NewCreateTableEvent creates a SchemaChangeEvent whose type is
// ActionCreateTable.
func NewCreateTableEvent(
newTableInfo *model.TableInfo,
// NewCreateTablesEvent creates a SchemaChangeEvent whose type is
// ActionCreateTables.
// The type of the ActionCreateTable should also be included in ActionCreateTables.
func NewCreateTablesEvent(
newTableInfos []*model.TableInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
inner: &jsonSchemaChangeEvent{
Tp: model.ActionCreateTable,
TableInfo: newTableInfo,
Tp: model.ActionCreateTables,
TableInfos: newTableInfos,
},
}
}

// GetCreateTableInfo returns the table info of the SchemaChangeEvent whose type
// GetCreateTablesInfo returns the table info of the SchemaChangeEvent whose type
// is ActionCreateTable.
func (s *SchemaChangeEvent) GetCreateTableInfo() *model.TableInfo {
intest.Assert(s.inner.Tp == model.ActionCreateTable)
return s.inner.TableInfo
func (s *SchemaChangeEvent) GetCreateTablesInfo() []*model.TableInfo {
intest.Assert(s.inner.Tp == model.ActionCreateTables)
return s.inner.TableInfos
}

// NewTruncateTableEvent creates a SchemaChangeEvent whose type is
Expand Down Expand Up @@ -430,6 +434,7 @@ func NewFlashbackClusterEvent() *SchemaChangeEvent {
// we want to hide the details to subscribers, so SchemaChangeEvent contain this struct.
type jsonSchemaChangeEvent struct {
TableInfo *model.TableInfo `json:"table_info,omitempty"`
TableInfos []*model.TableInfo `json:"table_infos,omitempty"`
OldTableInfo *model.TableInfo `json:"old_table_info,omitempty"`
AddedPartInfo *model.PartitionInfo `json:"added_partition_info,omitempty"`
DroppedPartInfo *model.PartitionInfo `json:"dropped_partition_info,omitempty"`
Expand Down
Loading

0 comments on commit 1e76e6e

Please sign in to comment.