Skip to content

Commit

Permalink
ddl_notifier: create system table in test and escape the string liter…
Browse files Browse the repository at this point in the history
…al (#56734)

ref #55722
  • Loading branch information
lance6716 authored Oct 21, 2024
1 parent dfd6cf2 commit c82ba4e
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 13 deletions.
11 changes: 11 additions & 0 deletions pkg/ddl/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
BackgroundSubtaskTableID = meta.MaxInt48 - 5
// BackgroundSubtaskHistoryTableID is the table ID of `tidb_background_subtask_history`.
BackgroundSubtaskHistoryTableID = meta.MaxInt48 - 6
// NotifierTableID is the table ID of `tidb_ddl_notifier`.
NotifierTableID = meta.MaxInt48 - 7

// JobTableSQL is the CREATE TABLE SQL of `tidb_ddl_job`.
JobTableSQL = "create table " + JobTable + `(
Expand Down Expand Up @@ -117,4 +119,13 @@ const (
summary json,
key idx_task_key(task_key),
key idx_state_update_time(state_update_time))`

// NotifierTableSQL is the CREATE TABLE SQL of `tidb_ddl_notifier`.
// TODO(lance6716): update the column name multi_schema_change_seq
NotifierTableSQL = `CREATE TABLE tidb_ddl_notifier (
ddl_job_id BIGINT,
multi_schema_change_seq BIGINT COMMENT '-1 if the schema change does not belong to a multi-schema change DDL. 0 or positive numbers representing the sub-job index of a multi-schema change DDL',
schema_change LONGBLOB COMMENT 'SchemaChangeEvent at rest',
processed_by_flag BIGINT UNSIGNED DEFAULT 0 COMMENT 'flag to mark which subscriber has processed the event',
PRIMARY KEY(ddl_job_id, multi_schema_change_seq))`
)
6 changes: 4 additions & 2 deletions pkg/ddl/notifier/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ func (t *tableStore) Insert(ctx context.Context, s *sess.Session, change *schema
multi_schema_change_seq,
schema_change,
processed_by_flag
) VALUES (%d, %d, '%s', 0)`,
) VALUES (%%?, %%?, %%?, 0)`,
t.db, t.table,
)
_, err = s.Execute(
ctx, sql, "ddl_notifier",
change.ddlJobID, change.multiSchemaChangeSeq, event,
)
_, err = s.Execute(ctx, sql, "ddl_notifier")
return err
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/ddl/notifier/testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ CREATE TABLE ddl_notifier (
func TestPublishToTableStore(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("USE test")
tk.MustExec("DROP TABLE IF EXISTS ddl_notifier")
tk.MustExec(tableStructure)
t.Cleanup(func() {
tk.MustExec("TRUNCATE mysql.tidb_ddl_notifier")
})

ctx := context.Background()
s := notifier.OpenTableStore("test", "ddl_notifier")
s := notifier.OpenTableStore("mysql", "tidb_ddl_notifier")
se := sess.NewSession(tk.Session())
event1 := notifier.NewCreateTablesEvent([]*model.TableInfo{{ID: 1000, Name: pmodel.NewCIStr("t1")}})
err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s)
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestBasicPubSub(t *testing.T) {
event1 := notifier.NewCreateTablesEvent([]*model.TableInfo{{ID: 1000, Name: pmodel.NewCIStr("t1")}})
err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s)
require.NoError(t, err)
event2 := notifier.NewDropTableEvent(&model.TableInfo{ID: 1001, Name: pmodel.NewCIStr("t2")})
event2 := notifier.NewDropTableEvent(&model.TableInfo{ID: 1001, Name: pmodel.NewCIStr("t2#special-char?in'name")})
err = notifier.PubSchemeChangeToStore(ctx, se, 2, -1, event2, s)
require.NoError(t, err)
event3 := notifier.NewDropTableEvent(&model.TableInfo{ID: 1002, Name: pmodel.NewCIStr("t3")})
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *Session) Reset() {
}

// Execute executes a query.
func (s *Session) Execute(ctx context.Context, query string, label string) ([]chunk.Row, error) {
func (s *Session) Execute(ctx context.Context, query string, label string, args ...any) ([]chunk.Row, error) {
startTime := time.Now()
var err error
defer func() {
Expand All @@ -82,7 +82,7 @@ func (s *Session) Execute(ctx context.Context, query string, label string) ([]ch
if ctx.Value(kv.RequestSourceKey) == nil {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
}
rs, err := s.Context.GetSQLExecutor().ExecuteInternal(ctx, query)
rs, err := s.Context.GetSQLExecutor().ExecuteInternal(ctx, query, args...)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/infoschema_cluster_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func TestTableStorageStats(t *testing.T) {
"test 2",
))
rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()
result := 57
result := 58
require.Len(t, rows, result)

// More tests about the privileges.
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ func TestColumnTable(t *testing.T) {
testkit.RowsWithSep("|",
"test|tbl1|col_2"))
tk.MustQuery(`select count(*) from information_schema.columns;`).Check(
testkit.RowsWithSep("|", "4979"))
testkit.RowsWithSep("|", "4983"))
}

func TestIndexUsageTable(t *testing.T) {
Expand Down Expand Up @@ -654,7 +654,7 @@ func TestIndexUsageTable(t *testing.T) {
testkit.RowsWithSep("|",
"test|idt2|idx_4"))
tk.MustQuery(`select count(*) from information_schema.tidb_index_usage;`).Check(
testkit.RowsWithSep("|", "77"))
testkit.RowsWithSep("|", "78"))

tk.MustQuery(`select TABLE_SCHEMA, TABLE_NAME, INDEX_NAME from information_schema.tidb_index_usage
where TABLE_SCHEMA = 'test1';`).Check(testkit.Rows())
Expand Down
2 changes: 2 additions & 0 deletions pkg/session/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ func TestDDLTableCreateBackfillTable(t *testing.T) {
m.SetDDLTables(meta.MDLTableVersion)
MustExec(t, se, "drop table mysql.tidb_background_subtask")
MustExec(t, se, "drop table mysql.tidb_background_subtask_history")
// TODO(lance6716): remove it after tidb_ddl_notifier GA
MustExec(t, se, "drop table mysql.tidb_ddl_notifier")
err = txn.Commit(context.Background())
require.NoError(t, err)

Expand Down
7 changes: 6 additions & 1 deletion pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3222,11 +3222,16 @@ func splitAndScatterTable(store kv.Storage, tableIDs []int64) {
}
}

// InitDDLJobTables is to create tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history, or tidb_background_subtask and tidb_background_subtask_history.
// InitDDLJobTables creates system tables that DDL uses. Because CREATE TABLE is
// also a DDL, we must directly modify KV data to create these tables.
func InitDDLJobTables(store kv.Storage, targetVer meta.DDLTableVersion) error {
targetTables := DDLJobTables
if targetVer == meta.BackfillTableVersion {
targetTables = BackfillTables
if intest.InTest {
// create the system tables to test ddl notifier
targetTables = append(targetTables, tableBasicInfo{ddl.NotifierTableSQL, ddl.NotifierTableID})
}
}
return kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(_ context.Context, txn kv.Transaction) error {
t := meta.NewMutator(txn)
Expand Down

0 comments on commit c82ba4e

Please sign in to comment.