Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: show more jobs in the tidb_mdl_view #40860

Merged
merged 4 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 49 additions & 29 deletions infoschema/cluster_tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,42 +816,62 @@ func (s *clusterTablesSuite) newTestKitWithRoot(t *testing.T) *testkit.TestKit {
}

func TestMDLView(t *testing.T) {
// setup suite
s := new(clusterTablesSuite)
s.store, s.dom = testkit.CreateMockStoreAndDomain(t)
s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer()
s.startTime = time.Now()
defer s.httpServer.Close()

tk := s.newTestKitWithRoot(t)
tkDDL := s.newTestKitWithRoot(t)
tk3 := s.newTestKitWithRoot(t)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_metadata_lock=1")
tk.MustExec("create table t(a int);")
tk.MustExec("insert into t values(1);")
testCases := []struct {
name string
createTable string
ddl string
queryInTxn []string
sqlDigest string
}{
{"add column", "create table t(a int)", "alter table test.t add column b int", []string{"select 1", "select * from t"}, "[\"begin\",\"select ?\",\"select * from `t`\"]"},
{"change column in 1 step", "create table t(a int)", "alter table test.t change column a b int", []string{"select 1", "select * from t"}, "[\"begin\",\"select ?\",\"select * from `t`\"]"},
}
for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
// setup suite
s := new(clusterTablesSuite)
s.store, s.dom = testkit.CreateMockStoreAndDomain(t)
s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer()
s.startTime = time.Now()
defer s.httpServer.Close()

tk.MustExec("begin")
tk.MustQuery("select 1;")
tk.MustQuery("select * from t;")
tk := s.newTestKitWithRoot(t)
tkDDL := s.newTestKitWithRoot(t)
tk3 := s.newTestKitWithRoot(t)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_metadata_lock=1")
tk.MustExec(c.createTable)

tk.MustExec("begin")
for _, q := range c.queryInTxn {
tk.MustQuery(q)
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
tkDDL.MustExec("alter table test.t add column b int;")
wg.Done()
}()
var wg sync.WaitGroup
wg.Add(1)
go func() {
tkDDL.MustExec(c.ddl)
wg.Done()
}()

time.Sleep(200 * time.Millisecond)
time.Sleep(200 * time.Millisecond)

s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", tk3.Session().GetSessionManager())
defer s.rpcserver.Stop()
s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", tk3.Session().GetSessionManager())
defer s.rpcserver.Stop()

tk3.MustQuery("select DB_NAME, QUERY, SQL_DIGESTS from mysql.tidb_mdl_view").Check(testkit.Rows("test alter table test.t add column b int; [\"begin\",\"select ? ;\",\"select * from `t` ;\"]"))
tk3.MustQuery("select DB_NAME, QUERY, SQL_DIGESTS from mysql.tidb_mdl_view").Check(testkit.Rows(
strings.Join([]string{
"test",
c.ddl,
c.sqlDigest,
}, " "),
))

tk.MustExec("commit")
tk.MustExec("commit")

wg.Wait()
wg.Wait()
})
}
}

func TestCreateBindingFromHistory(t *testing.T) {
Expand Down
26 changes: 24 additions & 2 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,19 @@ const (
);`
// CreateMDLView is a view about metadata locks.
CreateMDLView = `CREATE OR REPLACE VIEW mysql.tidb_mdl_view as (
select JOB_ID, DB_NAME, TABLE_NAME, QUERY, SESSION_ID, TxnStart, TIDB_DECODE_SQL_DIGESTS(ALL_SQL_DIGESTS, 4096) AS SQL_DIGESTS from information_schema.ddl_jobs, information_schema.CLUSTER_TIDB_TRX, information_schema.CLUSTER_PROCESSLIST where ddl_jobs.STATE = 'running' and find_in_set(ddl_jobs.table_id, CLUSTER_TIDB_TRX.RELATED_TABLE_IDS) and CLUSTER_TIDB_TRX.SESSION_ID=CLUSTER_PROCESSLIST.ID
SELECT job_id,
db_name,
table_name,
query,
session_id,
txnstart,
tidb_decode_sql_digests(all_sql_digests, 4096) AS SQL_DIGESTS
FROM information_schema.ddl_jobs,
information_schema.cluster_tidb_trx,
information_schema.cluster_processlist
WHERE (ddl_jobs.state != 'synced' and ddl_jobs.state != 'cancelled')
AND Find_in_set(ddl_jobs.table_id, cluster_tidb_trx.related_table_ids)
AND cluster_tidb_trx.session_id = cluster_processlist.id
);`

// CreatePlanReplayerStatusTable is a table about plan replayer status
Expand Down Expand Up @@ -779,11 +791,13 @@ const (
version110 = 110
// version111 adds the table tidb_ttl_task and tidb_ttl_job_history
version111 = 111
// version112 modifies the view tidb_mdl_view
version112 = 112
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version111
var currentBootstrapVersion int64 = version112

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -902,6 +916,7 @@ var (
upgradeToVer109,
upgradeToVer110,
upgradeToVer111,
upgradeToVer112,
}
)

Expand Down Expand Up @@ -2262,6 +2277,13 @@ func upgradeToVer111(s Session, ver int64) {
doReentrantDDL(s, CreateTTLJobHistory)
}

func upgradeToVer112(s Session, ver int64) {
if ver >= version112 {
return
}
doReentrantDDL(s, CreateMDLView)
}

func writeOOMAction(s Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
Expand Down