Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infoschema/executor : add DDLJobs sys table #14837

Merged
merged 88 commits into from
Mar 19, 2020
Merged
Show file tree
Hide file tree
Changes from 82 commits
Commits
Show all changes
88 commits
Select commit Hold shift + click to select a range
fca2edb
Merge pull request #1 from pingcap/master
reafans Nov 26, 2019
c1d631e
Merge remote-tracking branch 'origin/master'
reafans Nov 26, 2019
43000f4
Merge remote-tracking branch 'origin/master'
reafans Nov 27, 2019
f028aa5
Merge pull request #2 from pingcap/master
reafans Nov 27, 2019
46bd5cd
Merge remote-tracking branch 'origin/master'
reafans Dec 2, 2019
a633220
Merge pull request #4 from pingcap/master
reafans Dec 4, 2019
ff562d4
Merge pull request #6 from pingcap/master
reafans Dec 4, 2019
3cb0499
Merge pull request #7 from pingcap/master
reafans Dec 10, 2019
ccea6bb
Merge pull request #8 from pingcap/master
reafans Dec 12, 2019
9e1ac7e
Merge pull request #10 from pingcap/master
reafans Dec 13, 2019
3a0858b
Merge remote-tracking branch 'origin/master'
reafans Dec 17, 2019
9897e0f
Merge remote-tracking branch 'origin/master'
reafans Dec 19, 2019
5fac8ba
Merge branch 'master' of github.com:pingcap/tidb
reafans Jan 3, 2020
30acb58
Merge pull request #12 from pingcap/master
reafans Jan 6, 2020
4a3b0bd
add ddl_jobs sys table
reafans Jan 9, 2020
a6f081a
update
reafans Jan 9, 2020
028f35c
Merge branch 'add_ddlJob_inforschema' of https://github.com/reafans/t…
reafans Jan 9, 2020
ee8328c
Merge branch 'master' of https://github.com/pingcap/tidb into add_ddl…
reafans Feb 11, 2020
f381472
update
reafans Feb 12, 2020
234dca0
Merge branch 'master' of https://github.com/pingcap/tidb into add_ddl…
reafans Feb 13, 2020
41b6d9e
add ddljobs sys table
reafans Feb 18, 2020
7261211
Merge branch 'master' of https://github.com/pingcap/tidb into add_ddl…
reafans Feb 18, 2020
4fd4d4c
add comment
reafans Feb 18, 2020
61503a2
add Query field
reafans Feb 19, 2020
bf6b9f6
fmt
reafans Feb 19, 2020
8391515
fixd bug
reafans Feb 19, 2020
7b26867
addressd comment
reafans Feb 19, 2020
a95ee13
Merge branch 'master' into add_ddljobs_sys_table
reafans Feb 19, 2020
35dda53
addressd comment
reafans Feb 19, 2020
658538f
add err check
reafans Feb 19, 2020
ea2c89b
Merge branch 'master' into add_ddljobs_sys_table
reafans Feb 19, 2020
dd6c445
add checker
reafans Feb 20, 2020
7af00ab
Merge branch 'add_ddljobs_sys_table' of https://github.com/reafans/ti…
reafans Feb 20, 2020
df1af82
Merge branch 'master' into add_ddljobs_sys_table
reafans Feb 20, 2020
c3c0599
fix bug
reafans Feb 20, 2020
f992f65
Merge branch 'add_ddljobs_sys_table' of https://github.com/reafans/ti…
reafans Feb 20, 2020
dd3ed4f
fmt import
reafans Feb 21, 2020
014900e
addressd comment
reafans Feb 21, 2020
45bb6fc
fix bug
reafans Feb 21, 2020
51b12e9
fix conflict
reafans Mar 6, 2020
189579b
fix conflict
reafans Mar 6, 2020
1a16c58
fmt
reafans Mar 6, 2020
e79d574
Merge branch 'master' into add_ddljobs_sys_table
reafans Mar 6, 2020
10ae282
Merge branch 'master' into add_ddljobs_sys_table
crazycs520 Mar 6, 2020
7884ae9
update
reafans Mar 9, 2020
3f78a5d
Merge branch 'master' of https://github.com/pingcap/tidb into add_ddl…
reafans Mar 9, 2020
334f1f8
Merge branch 'add_ddljobs_sys_table' of https://github.com/reafans/ti…
reafans Mar 9, 2020
4bc617c
address comment
reafans Mar 9, 2020
308a9ff
address comment
reafans Mar 9, 2020
731faf2
fix conflict
reafans Mar 9, 2020
63dd3c3
fmt
reafans Mar 9, 2020
8a1a69c
fix bug
reafans Mar 9, 2020
a2f65c3
update
reafans Mar 9, 2020
f880c85
Merge branch 'add_ddljobs_sys_table' of https://github.com/reafans/ti…
reafans Mar 9, 2020
09ba7bc
fix bug
reafans Mar 9, 2020
a53cfb1
Merge branch 'master' into add_ddljobs_sys_table
reafans Mar 10, 2020
cc0dce3
Merge branch 'master' into add_ddljobs_sys_table
reafans Mar 10, 2020
a3511dd
Merge branch 'master' into add_ddljobs_sys_table
reafans Mar 10, 2020
b8b8b3d
Merge branch 'master' into add_ddljobs_sys_table
reafans Mar 10, 2020
5b9ff6e
Merge branch 'master' into add_ddljobs_sys_table
reafans Mar 10, 2020
46129ce
update
reafans Mar 10, 2020
bbf1da5
fix bug
reafans Mar 10, 2020
6fd77f6
Merge branch 'add_ddljobs_sys_table' of https://github.com/reafans/ti…
reafans Mar 10, 2020
92aabf0
update
reafans Mar 10, 2020
0e73fa9
fix bug
reafans Mar 10, 2020
656093e
Merge branch 'master' of https://github.com/pingcap/tidb into add_ddl…
reafans Mar 10, 2020
fcdcf8f
Merge branch 'add_ddljobs_sys_table' of https://github.com/reafans/ti…
reafans Mar 10, 2020
8ca7fe6
fi bug
reafans Mar 10, 2020
5eab7cb
fix bug
reafans Mar 10, 2020
c7b4191
Merge branch 'master' of https://github.com/pingcap/tidb into add_ddl…
reafans Mar 15, 2020
434f5af
add ut
reafans Mar 15, 2020
b547ea5
fix bug
reafans Mar 15, 2020
4ed63f0
add ut
reafans Mar 15, 2020
b755372
fix ut
reafans Mar 15, 2020
9fecbed
fmt
reafans Mar 15, 2020
c5dc337
add ut
reafans Mar 15, 2020
58059f9
fix
reafans Mar 15, 2020
3f077ba
fix ut
reafans Mar 15, 2020
60b81fb
fix bug
reafans Mar 15, 2020
86682ff
fix ut
reafans Mar 15, 2020
a2cbc98
fmt
reafans Mar 15, 2020
57a72af
fix ut
reafans Mar 15, 2020
70c817c
Merge branch 'master' into add_ddljobs_sys_table
crazycs520 Mar 16, 2020
5a27fa0
Merge branch 'master' into add_ddljobs_sys_table
reafans Mar 19, 2020
bff71ca
Merge branch 'master' of https://github.com/pingcap/tidb into add_ddl…
reafans Mar 19, 2020
3291810
addresss comment
reafans Mar 19, 2020
6f0cd18
Merge branch 'add_ddljobs_sys_table' of https://github.com/reafans/ti…
reafans Mar 19, 2020
2880add
Merge branch 'master' into add_ddljobs_sys_table
reafans Mar 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1455,6 +1455,11 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
extractor: v.Extractor.(*plannercore.SlowQueryExtractor),
},
}
case strings.ToLower(infoschema.TableDDLJobs):
return &DDLJobsReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
is: b.is,
}
}
}
tb, _ := b.is.TableByID(v.Table.ID)
Expand Down
132 changes: 79 additions & 53 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
Expand All @@ -38,6 +40,7 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -329,14 +332,79 @@ func (e *ShowDDLExec) Next(ctx context.Context, req *chunk.Chunk) error {
// ShowDDLJobsExec represent a show DDL jobs executor.
type ShowDDLJobsExec struct {
baseExecutor
DDLJobRetriever

cursor int
jobNumber int
is infoschema.InfoSchema
done bool
}

// DDLJobRetriever retrieve the DDLJobs.
type DDLJobRetriever struct {
runningJobs []*model.Job
historyJobIter *meta.LastJobIterator
cacheJobs []*model.Job
jobNumber int
cursor int
is infoschema.InfoSchema
done bool
activeRoles []*auth.RoleIdentity
cacheJobs []*model.Job
}

func (e *DDLJobRetriever) initial(txn kv.Transaction) error {
jobs, err := admin.GetDDLJobs(txn)
if err != nil {
return err
}
m := meta.NewMeta(txn)
e.historyJobIter, err = m.GetLastHistoryDDLJobsIterator()
if err != nil {
return err
}
e.runningJobs = jobs
e.cursor = 0
return nil
}

func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, checker privilege.Manager) {
schemaName := job.SchemaName
tableName := ""
finishTS := uint64(0)
if job.BinlogInfo != nil {
finishTS = job.BinlogInfo.FinishedTS
if job.BinlogInfo.TableInfo != nil {
tableName = job.BinlogInfo.TableInfo.Name.L
}
if len(schemaName) == 0 && job.BinlogInfo.DBInfo != nil {
schemaName = job.BinlogInfo.DBInfo.Name.L
}
}
// For compatibility, the old version of DDL Job wasn't store the schema name and table name.
if len(schemaName) == 0 {
schemaName = getSchemaName(e.is, job.SchemaID)
}
if len(tableName) == 0 {
tableName = getTableName(e.is, job.TableID)
}

// Check the privilege.
if checker != nil && !checker.RequestVerification(e.activeRoles, strings.ToLower(schemaName), strings.ToLower(tableName), "", mysql.AllPrivMask) {
return
}

req.AppendInt64(0, job.ID)
req.AppendString(1, schemaName)
req.AppendString(2, tableName)
req.AppendString(3, job.Type.String())
req.AppendString(4, job.SchemaState.String())
req.AppendInt64(5, job.SchemaID)
req.AppendInt64(6, job.TableID)
req.AppendInt64(7, job.RowCount)
req.AppendString(8, model.TSConvert2Time(job.StartTS).String())
if finishTS > 0 {
req.AppendString(9, model.TSConvert2Time(finishTS).String())
} else {
req.AppendString(9, "")
}
req.AppendString(10, job.State.String())
}

// ShowDDLJobQueriesExec represents a show DDL job queries executor.
Expand Down Expand Up @@ -404,21 +472,14 @@ func (e *ShowDDLJobsExec) Open(ctx context.Context) error {
if err != nil {
return err
}
jobs, err := admin.GetDDLJobs(txn)
if err != nil {
return err
}
e.DDLJobRetriever.is = e.is
if e.jobNumber == 0 {
e.jobNumber = admin.DefNumHistoryJobs
}

m := meta.NewMeta(txn)
e.historyJobIter, err = m.GetLastHistoryDDLJobsIterator()
err = e.DDLJobRetriever.initial(txn)
if err != nil {
return err
}
e.runningJobs = append(e.runningJobs, jobs...)
e.cursor = 0
return nil
}

Expand All @@ -429,17 +490,19 @@ func (e *ShowDDLJobsExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}
count := 0

// Append running ddl jobs.
if e.cursor < len(e.runningJobs) {
numCurBatch := mathutil.Min(req.Capacity(), len(e.runningJobs)-e.cursor)
for i := e.cursor; i < e.cursor+numCurBatch; i++ {
e.appendJobToChunk(req, e.runningJobs[i])
e.appendJobToChunk(req, e.runningJobs[i], nil)
}
e.cursor += numCurBatch
count += numCurBatch
}
var err error

// Append history ddl jobs.
var err error
if count < req.Capacity() {
num := req.Capacity() - count
remainNum := e.jobNumber - (e.cursor - len(e.runningJobs))
Expand All @@ -449,50 +512,13 @@ func (e *ShowDDLJobsExec) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
for _, job := range e.cacheJobs {
e.appendJobToChunk(req, job)
e.appendJobToChunk(req, job, nil)
}
e.cursor += len(e.cacheJobs)
}
return nil
}

func (e *ShowDDLJobsExec) appendJobToChunk(req *chunk.Chunk, job *model.Job) {
req.AppendInt64(0, job.ID)
schemaName := job.SchemaName
tableName := ""
finishTS := uint64(0)
if job.BinlogInfo != nil {
finishTS = job.BinlogInfo.FinishedTS
if job.BinlogInfo.TableInfo != nil {
tableName = job.BinlogInfo.TableInfo.Name.L
}
if len(schemaName) == 0 && job.BinlogInfo.DBInfo != nil {
schemaName = job.BinlogInfo.DBInfo.Name.L
}
}
// For compatibility, the old version of DDL Job wasn't store the schema name and table name.
if len(schemaName) == 0 {
schemaName = getSchemaName(e.is, job.SchemaID)
}
if len(tableName) == 0 {
tableName = getTableName(e.is, job.TableID)
}
req.AppendString(1, schemaName)
req.AppendString(2, tableName)
req.AppendString(3, job.Type.String())
req.AppendString(4, job.SchemaState.String())
req.AppendInt64(5, job.SchemaID)
req.AppendInt64(6, job.TableID)
req.AppendInt64(7, job.RowCount)
req.AppendString(8, model.TSConvert2Time(job.StartTS).String())
if finishTS > 0 {
req.AppendString(9, model.TSConvert2Time(finishTS).String())
} else {
req.AppendString(9, "")
}
req.AppendString(10, job.State.String())
}

func getSchemaName(is infoschema.InfoSchema, id int64) string {
var schemaName string
DBInfo, ok := is.SchemaByID(id)
Expand Down
62 changes: 62 additions & 0 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
Expand All @@ -35,6 +36,7 @@ import (
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -705,6 +707,66 @@ func (e *memtableRetriever) setDataFromViews(ctx sessionctx.Context, schemas []*
e.rows = rows
}

// DDLJobsReaderExec executes DDLJobs information retrieving.
type DDLJobsReaderExec struct {
reafans marked this conversation as resolved.
Show resolved Hide resolved
baseExecutor
DDLJobRetriever

cacheJobs []*model.Job
reafans marked this conversation as resolved.
Show resolved Hide resolved
is infoschema.InfoSchema
}

// Open implements the Executor Next interface.
func (e *DDLJobsReaderExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
txn, err := e.ctx.Txn(true)
if err != nil {
return err
}
e.DDLJobRetriever.is = e.is
e.activeRoles = e.ctx.GetSessionVars().ActiveRoles
err = e.DDLJobRetriever.initial(txn)
if err != nil {
return err
}
return nil
}

// Next implements the Executor Next interface.
func (e *DDLJobsReaderExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
checker := privilege.GetPrivilegeManager(e.ctx)
count := 0

// Append running DDL jobs.
if e.cursor < len(e.runningJobs) {
num := mathutil.Min(req.Capacity(), len(e.runningJobs)-e.cursor)
for i := e.cursor; i < e.cursor+num; i++ {
e.appendJobToChunk(req, e.runningJobs[i], checker)
req.AppendString(11, e.runningJobs[i].Query)
}
e.cursor += num
count += num
}
var err error

// Append history DDL jobs.
if count < req.Capacity() {
e.cacheJobs, err = e.historyJobIter.GetLastJobs(req.Capacity()-count, e.cacheJobs)
if err != nil {
return err
}
for _, job := range e.cacheJobs {
e.appendJobToChunk(req, job, checker)
req.AppendString(11, job.Query)
}
e.cursor += len(e.cacheJobs)
}
return nil
}

func (e *memtableRetriever) setDataFromEngines() {
var rows [][]types.Datum
rows = append(rows,
Expand Down
40 changes: 37 additions & 3 deletions executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor_test
import (
"crypto/tls"
"fmt"
"google.golang.org/grpc"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

format code

"net"
"net/http/httptest"
"strconv"
Expand Down Expand Up @@ -45,7 +46,6 @@ import (
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
"google.golang.org/grpc"
)

var _ = Suite(&testInfoschemaTableSuite{})
Expand Down Expand Up @@ -165,7 +165,7 @@ func (s *testInfoschemaTableSuite) TestSchemataTables(c *C) {
tk.MustQuery("select * from information_schema.SCHEMATA where schema_name='mysql';").Check(
testkit.Rows("def mysql utf8mb4 utf8mb4_bin <nil>"))

//test the privilege of new user for information_schema.schemata
// Test the privilege of new user for information_schema.schemata.
tk.MustExec("create user schemata_tester")
schemataTester := testkit.NewTestKit(c, s.store)
schemataTester.MustExec("use information_schema")
Expand All @@ -179,7 +179,7 @@ func (s *testInfoschemaTableSuite) TestSchemataTables(c *C) {
schemataTester.MustQuery("select * from information_schema.SCHEMATA where schema_name='INFORMATION_SCHEMA';").Check(
testkit.Rows("def INFORMATION_SCHEMA utf8mb4 utf8mb4_bin <nil>"))

//test the privilege of user with privilege of mysql for information_schema.schemata
// Test the privilege of user with privilege of mysql for information_schema.schemata.
tk.MustExec("CREATE ROLE r_mysql_priv;")
tk.MustExec("GRANT ALL PRIVILEGES ON mysql.* TO r_mysql_priv;")
tk.MustExec("GRANT r_mysql_priv TO schemata_tester;")
Expand Down Expand Up @@ -235,6 +235,40 @@ func (s *testInfoschemaTableSuite) TestCharacterSetCollations(c *C) {
testkit.Rows("utf8mb4_bin utf8mb4"))
}

func (s *testInfoschemaTableSuite) TestDDLJobs(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("create database if not exists test_ddl_jobs")
tk.MustQuery("select db_name, job_type from information_schema.DDL_JOBS limit 1").Check(
testkit.Rows("test_ddl_jobs create schema"))

tk.MustExec("use test_ddl_jobs")
tk.MustExec("create table t (a int);")
tk.MustQuery("select db_name, table_name, job_type from information_schema.DDL_JOBS where table_name = 't'").Check(
testkit.Rows("test_ddl_jobs t create table"))

tk.MustQuery("select job_type from information_schema.DDL_JOBS group by job_type having job_type = 'create table'").Check(
testkit.Rows("create table"))

// Test the privilege of new user for information_schema.DDL_JOBS.
tk.MustExec("create user DDL_JOBS_tester")
DDLJobsTester := testkit.NewTestKit(c, s.store)
DDLJobsTester.MustExec("use information_schema")
c.Assert(DDLJobsTester.Se.Auth(&auth.UserIdentity{
Username: "DDL_JOBS_tester",
Hostname: "127.0.0.1",
}, nil, nil), IsTrue)

// Test the privilege of user for information_schema.ddl_jobs.
DDLJobsTester.MustQuery("select DB_NAME, TABLE_NAME from information_schema.DDL_JOBS where DB_NAME = 'test_ddl_jobs' and TABLE_NAME = 't';").Check(
[][]interface{}{})
tk.MustExec("CREATE ROLE r_priv;")
tk.MustExec("GRANT ALL PRIVILEGES ON test_ddl_jobs.* TO r_priv;")
tk.MustExec("GRANT r_priv TO DDL_JOBS_tester;")
DDLJobsTester.MustExec("set role r_priv")
DDLJobsTester.MustQuery("select DB_NAME, TABLE_NAME from information_schema.DDL_JOBS where DB_NAME = 'test_ddl_jobs' and TABLE_NAME = 't';").Check(
testkit.Rows("test_ddl_jobs t"))
}

func (s *testInfoschemaTableSuite) TestKeyColumnUsage(c *C) {
tk := testkit.NewTestKit(c, s.store)

Expand Down
Loading