This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit 147f5db
syncer: check block-allow list before online DDL components (#1775)
lance6716 authored Jul 14, 2021
1 parent 5add2f7 commit 147f5db
Showing 13 changed files with 176 additions and 98 deletions.
74 changes: 47 additions & 27 deletions syncer/ddl.go
@@ -14,6 +14,8 @@
package syncer

import (
"time"

"github.com/go-mysql-org/go-mysql/replication"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
@@ -113,26 +115,55 @@ func (s *Syncer) parseDDLSQL(sql string, p *parser.Parser, schema string) (resul
}
}

// resolveDDLSQL do two things
// * it splits multiple operations in one DDL statement into multiple DDL statements
// * try to apply online ddl by given online
// splitAndFilterDDL will split a multi-schema-change DDL into multiple single-schema-change DDLs due to TiDB's limitation.
// the returned DDLs are either split from `stmt`, or DDLs that the online DDL component has saved before (which means the input
// `stmt` is a RENAME statement, which is often generated by online DDL tools).
// return @split sqls, @online ddl table names, @error.
func (s *Syncer) resolveDDLSQL(tctx *tcontext.Context, p *parser.Parser, stmt ast.StmtNode, schema string) (sqls []string, tables map[string]*filter.Table, err error) {
func (s *Syncer) splitAndFilterDDL(
ec eventContext,
p *parser.Parser,
stmt ast.StmtNode,
schema string,
) (sqls []string, tables map[string]*filter.Table, err error) {
sqls, err = parserpkg.SplitDDL(stmt, schema)
if err != nil {
return nil, nil, err
}
if s.onlineDDL == nil {
return sqls, nil, nil
}

statements := make([]string, 0, len(sqls))
tables = make(map[string]*filter.Table)
for _, sql := range sqls {
stmt2, err2 := p.ParseOneStmt(sql, "", "")
if err2 != nil {
return nil, nil, terror.Annotatef(terror.ErrSyncerUnitParseStmt.New(err2.Error()), "ddl %s", sql)
}

tableNames, err2 := parserpkg.FetchDDLTableNames(schema, stmt2)
if err2 != nil {
return nil, nil, err2
}

// get real tableNames before applying the block-allow list
if s.onlineDDL != nil {
for _, names := range tableNames {
names.Name = s.onlineDDL.RealName(names.Name)
}
}

shouldSkip, err2 := s.skipQuery(tableNames, stmt2, sql)
if err2 != nil {
return nil, nil, err2
}
if shouldSkip {
skipBinlogDurationHistogram.WithLabelValues("query", s.cfg.Name, s.cfg.SourceID).Observe(time.Since(ec.startTime).Seconds())
ec.tctx.L().Warn("skip event", zap.String("event", "query"), zap.String("statement", sql), zap.String("schema", schema))
continue
}

// filter and store ghost table ddl, transform online ddl
ss, tableName, err := s.handleOnlineDDL(tctx, p, schema, sql)
if err != nil {
return statements, tables, err
ss, tableName, err2 := s.handleOnlineDDL(ec.tctx, p, schema, sql, stmt2)
if err2 != nil {
return nil, nil, err2
}

if tableName != nil {
@@ -144,7 +175,8 @@ func (s *Syncer) resolveDDLSQL(tctx *tcontext.Context, p *parser.Parser, stmt as
return statements, tables, nil
}
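For reference, here is a minimal sketch (not part of this commit) of the split step performed above: parserpkg.SplitDDL breaks a multi-schema-change statement into per-change DDLs qualified with the given schema, and splitAndFilterDDL then runs each piece through the block-allow list and the online DDL handling. The input statement and the "test" schema below are illustrative only.

package main

import (
	"fmt"

	"github.com/pingcap/parser"

	parserpkg "github.com/pingcap/dm/pkg/parser"
)

func main() {
	p := parser.New()
	stmt, err := p.ParseOneStmt("ALTER TABLE `t1` ADD COLUMN `c1` INT, DROP COLUMN `c2`", "", "")
	if err != nil {
		panic(err)
	}
	// Split into one DDL per schema change, qualified with the default schema "test".
	sqls, err := parserpkg.SplitDDL(stmt, "test")
	if err != nil {
		panic(err)
	}
	for _, s := range sqls {
		// e.g. ALTER TABLE `test`.`t1` ADD COLUMN `c1` INT
		//      ALTER TABLE `test`.`t1` DROP COLUMN `c2`
		fmt.Println(s)
	}
}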

func (s *Syncer) handleDDL(p *parser.Parser, schema, sql string) (string, [][]*filter.Table, ast.StmtNode, error) {
// routeDDL will rename table names in DDL.
func (s *Syncer) routeDDL(p *parser.Parser, schema, sql string) (string, [][]*filter.Table, ast.StmtNode, error) {
stmt, err := p.ParseOneStmt(sql, "", "")
if err != nil {
return "", nil, nil, terror.Annotatef(terror.ErrSyncerUnitParseStmt.New(err.Error()), "ddl %s", sql)
@@ -155,14 +187,6 @@ func (s *Syncer) handleDDL(p *parser.Parser, schema, sql string) (string, [][]*f
return "", nil, nil, err
}

ignore, err := s.skipQuery(tableNames, stmt, sql)
if err != nil {
return "", nil, nil, err
}
if ignore {
return "", nil, stmt, nil
}

targetTableNames := make([]*filter.Table, 0, len(tableNames))
for i := range tableNames {
schema, table := s.renameShardingSchema(tableNames[i].Schema, tableNames[i].Name)
@@ -177,18 +201,14 @@ func (s *Syncer) handleDDL(p *parser.Parser, schema, sql string) (string, [][]*f
return ddl, [][]*filter.Table{tableNames, targetTableNames}, stmt, err
}
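As a rough illustration of the table routing that routeDDL performs, the sketch below uses the tidb-tools table-router with a schema-level rule that mirrors the `s1` -> `xs1` mapping used in the tests further down. The router API (NewTableRouter, TableRule, Route) is assumed from the tidb-tools package and is not shown in this diff.

package main

import (
	"fmt"

	router "github.com/pingcap/tidb-tools/pkg/table-router"
)

func main() {
	// Assumed API: NewTableRouter(caseSensitive, rules) and Route(schema, table).
	r, err := router.NewTableRouter(false, []*router.TableRule{
		{SchemaPattern: "s1", TargetSchema: "xs1"},
	})
	if err != nil {
		panic(err)
	}
	schema, table, err := r.Route("s1", "t1")
	if err != nil {
		panic(err)
	}
	fmt.Println(schema, table) // expected: xs1 t1
}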

// handle online ddls
// if sql is online ddls, we would find it's ghost table, and ghost ddls, then replay its table name by real table name.
func (s *Syncer) handleOnlineDDL(tctx *tcontext.Context, p *parser.Parser, schema, sql string) ([]string, *filter.Table, error) {
// handleOnlineDDL checks whether the input `sql` came from an online DDL tool.
// If so, it will save the actual DDL or return the actual DDL, depending on the online DDL type of `sql`.
// If not, it returns the original SQL and no table names.
func (s *Syncer) handleOnlineDDL(tctx *tcontext.Context, p *parser.Parser, schema, sql string, stmt ast.StmtNode) ([]string, *filter.Table, error) {
if s.onlineDDL == nil {
return []string{sql}, nil, nil
}

stmt, err := p.ParseOneStmt(sql, "", "")
if err != nil {
return nil, nil, terror.Annotatef(terror.ErrSyncerUnitParseStmt.New(err.Error()), "ddl %s", sql)
}

tableNames, err := parserpkg.FetchDDLTableNames(schema, stmt)
if err != nil {
return nil, nil, err
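The RealName call used in splitAndFilterDDL above maps a ghost/trash table name back to the origin table before the block-allow list is applied. The plugin implementations are not part of this diff; the helper below is only an illustrative sketch based on the tools' documented naming (gh-ost: `_gho`/`_ghc`/`_del`, pt-osc: `_new`/`_old`).

package main

import (
	"fmt"
	"strings"
)

// realName is a hypothetical stand-in for OnlinePlugin.RealName: it strips the
// leading underscore and the tool-specific suffix from a ghost/trash table name.
func realName(table string) string {
	if !strings.HasPrefix(table, "_") {
		return table
	}
	for _, suffix := range []string{"_gho", "_ghc", "_del", "_new", "_old"} {
		if strings.HasSuffix(table, suffix) {
			// `_t1_gho` -> `t1`
			return strings.TrimPrefix(strings.TrimSuffix(table, suffix), "_")
		}
	}
	return table
}

func main() {
	fmt.Println(realName("_t1_gho")) // t1
	fmt.Println(realName("_t1_new")) // t1
	fmt.Println(realName("t1"))      // t1
}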
104 changes: 64 additions & 40 deletions syncer/ddl_test.go
@@ -20,6 +20,7 @@ import (

"github.com/pingcap/dm/dm/config"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
parserpkg "github.com/pingcap/dm/pkg/parser"
"github.com/pingcap/dm/pkg/utils"

@@ -29,6 +30,7 @@
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"go.uber.org/zap"
)

func (s *testSyncerSuite) TestAnsiQuotes(c *C) {
@@ -86,7 +88,11 @@ func (s *testSyncerSuite) TestCommentQuote(c *C) {
c.Assert(err, IsNil)

syncer := &Syncer{}
sqls, _, err := syncer.resolveDDLSQL(tcontext.Background(), parser, stmt, "schemadb")
tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestCommentQuote")))
ec := eventContext{
tctx: tctx,
}
sqls, _, err := syncer.splitAndFilterDDL(ec, parser, stmt, "schemadb")
c.Assert(err, IsNil)
c.Assert(len(sqls), Equals, 1)
c.Assert(sqls[0], Equals, expectedSQL)
@@ -106,11 +112,12 @@ func (s *testSyncerSuite) TestResolveDDLSQL(c *C) {
"create table `t1` (id int)",
"create table `t1` like `t2`",
"create table `s1`.`t1` like `t2`",
"create table `s1`.`t1` like `s1`.`t2`",
"create table `t1` like `xx`.`t2`",
"truncate table `t1`",
"truncate table `s1`.`t1`",
"rename table `s1`.`t1` to `s2`.`t2`",
"rename table `t1` to `t2`, `s1`.`t1` to `t2`",
"rename table `t1` to `t2`, `s1`.`t1` to `s1`.`t2`",
"drop index i1 on `s1`.`t1`",
"drop index i1 on `t1`",
"create index i1 on `t1`(`c1`)",
@@ -126,24 +133,25 @@ func (s *testSyncerSuite) TestResolveDDLSQL(c *C) {
{"DROP DATABASE IF EXISTS `s1`"},
{"DROP DATABASE IF EXISTS `s1`"},
{"DROP TABLE IF EXISTS `s1`.`t1`"},
{"DROP TABLE IF EXISTS `s1`.`t1`", "DROP TABLE IF EXISTS `s2`.`t2`"},
{"DROP TABLE IF EXISTS `s1`.`t1`", "DROP TABLE IF EXISTS `s2`.`t2`", "DROP TABLE IF EXISTS `test`.`xx`"},
{"DROP TABLE IF EXISTS `s1`.`t1`"},
{"DROP TABLE IF EXISTS `s1`.`t1`"},
{"CREATE TABLE IF NOT EXISTS `s1`.`t1` (`id` INT)"},
{"CREATE TABLE IF NOT EXISTS `test`.`t1` (`id` INT)"},
{"CREATE TABLE IF NOT EXISTS `test`.`t1` LIKE `test`.`t2`"},
{"CREATE TABLE IF NOT EXISTS `s1`.`t1` LIKE `test`.`t2`"},
{"CREATE TABLE IF NOT EXISTS `test`.`t1` LIKE `xx`.`t2`"},
{"TRUNCATE TABLE `test`.`t1`"},
{},
{},
{},
{"CREATE TABLE IF NOT EXISTS `s1`.`t1` LIKE `s1`.`t2`"},
{},
{},
{"TRUNCATE TABLE `s1`.`t1`"},
{"RENAME TABLE `s1`.`t1` TO `s2`.`t2`"},
{"RENAME TABLE `test`.`t1` TO `test`.`t2`", "RENAME TABLE `s1`.`t1` TO `test`.`t2`"},
{},
{"RENAME TABLE `s1`.`t1` TO `s1`.`t2`"},
{"DROP INDEX IF EXISTS `i1` ON `s1`.`t1`"},
{"DROP INDEX IF EXISTS `i1` ON `test`.`t1`"},
{"CREATE INDEX `i1` ON `test`.`t1` (`c1`)"},
{},
{},
{"CREATE INDEX `i1` ON `s1`.`t1` (`c1`)"},
{"ALTER TABLE `test`.`t1` ADD COLUMN `c1` INT", "ALTER TABLE `test`.`t1` DROP COLUMN `c2`"},
{"ALTER TABLE `s1`.`t1` ADD COLUMN `c1` INT", "ALTER TABLE `s1`.`t1` RENAME AS `test`.`t2`", "ALTER TABLE `test`.`t2` DROP COLUMN `c2`"},
{"ALTER TABLE `s1`.`t1` ADD COLUMN `c1` INT", "ALTER TABLE `s1`.`t1` RENAME AS `xx`.`t2`", "ALTER TABLE `xx`.`t2` DROP COLUMN `c2`"},
{},
{"ALTER TABLE `s1`.`t1` ADD COLUMN `c1` INT"},
{"ALTER TABLE `s1`.`t1` ADD COLUMN `c1` INT"},
}

targetSQLs := [][]string{
@@ -152,24 +160,25 @@ func (s *testSyncerSuite) TestResolveDDLSQL(c *C) {
{"DROP DATABASE IF EXISTS `xs1`"},
{"DROP DATABASE IF EXISTS `xs1`"},
{"DROP TABLE IF EXISTS `xs1`.`t1`"},
{"DROP TABLE IF EXISTS `xs1`.`t1`", ""},
{"DROP TABLE IF EXISTS `xs1`.`t1`", "", ""},
{"DROP TABLE IF EXISTS `xs1`.`t1`"},
{"DROP TABLE IF EXISTS `xs1`.`t1`"},
{"CREATE TABLE IF NOT EXISTS `xs1`.`t1` (`id` INT)"},
{""},
{""},
{""},
{""},
{""},
{},
{},
{},
{"CREATE TABLE IF NOT EXISTS `xs1`.`t1` LIKE `xs1`.`t2`"},
{},
{},
{"TRUNCATE TABLE `xs1`.`t1`"},
{""},
{"", ""},
{},
{"RENAME TABLE `xs1`.`t1` TO `xs1`.`t2`"},
{"DROP INDEX IF EXISTS `i1` ON `xs1`.`t1`"},
{""},
{""},
{},
{},
{"CREATE INDEX `i1` ON `xs1`.`t1` (`c1`)"},
{"", ""},
{"ALTER TABLE `xs1`.`t1` ADD COLUMN `c1` INT", "", ""},
{"ALTER TABLE `xs1`.`t1` ADD COLUMN `c1` INT", "", ""},
{},
{"ALTER TABLE `xs1`.`t1` ADD COLUMN `c1` INT"},
{"ALTER TABLE `xs1`.`t1` ADD COLUMN `c1` INT"},
}

p := parser.New()
Expand All @@ -191,18 +200,24 @@ func (s *testSyncerSuite) TestResolveDDLSQL(c *C) {
})
c.Assert(err, IsNil)

tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestResolveDDLSQL")))
ec := eventContext{
tctx: tctx,
}

for i, sql := range sqls {
result, err := syncer.parseDDLSQL(sql, p, "test")
c.Assert(err, IsNil)
c.Assert(result.ignore, IsFalse)
c.Assert(result.isDDL, IsTrue)

statements, _, err := syncer.resolveDDLSQL(tcontext.Background(), p, result.stmt, "test")
statements, _, err := syncer.splitAndFilterDDL(ec, p, result.stmt, "test")
c.Assert(err, IsNil)
c.Assert(statements, DeepEquals, expectedSQLs[i])
c.Assert(targetSQLs[i], HasLen, len(statements))

for j, statement := range statements {
s, _, _, err := syncer.handleDDL(p, "test", statement)
s, _, _, err := syncer.routeDDL(p, "test", statement)
c.Assert(err, IsNil)
c.Assert(s, Equals, targetSQLs[i][j])
}
@@ -358,11 +373,16 @@ func (s *testSyncerSuite) TestResolveGeneratedColumnSQL(c *C) {
parser, err := s.mockParser(db, mock)
c.Assert(err, IsNil)

tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestResolveGeneratedColumnSQL")))
ec := eventContext{
tctx: tctx,
}

for _, tc := range testCases {
ast1, err := parser.ParseOneStmt(tc.sql, "", "")
c.Assert(err, IsNil)

sqls, _, err := syncer.resolveDDLSQL(tcontext.Background(), parser, ast1, "test")
sqls, _, err := syncer.splitAndFilterDDL(ec, parser, ast1, "test")
c.Assert(err, IsNil)

c.Assert(len(sqls), Equals, 1)
Expand All @@ -388,9 +408,13 @@ func (s *testSyncerSuite) TestResolveOnlineDDL(c *C) {
"_t1_new",
},
}
tctx := tcontext.Background()
tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestResolveOnlineDDL")))
p := parser.New()

ec := eventContext{
tctx: tctx,
}

for _, ca := range cases {
fn := OnlineDDLSchemes[ca.onlineType]
plugin, err := fn(tctx, s.cfg)
@@ -403,7 +427,7 @@ func (s *testSyncerSuite) TestResolveOnlineDDL(c *C) {
sql := "ALTER TABLE `test`.`t1` ADD COLUMN `n` INT"
stmt, err := p.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil)
sqls, tables, err := syncer.resolveDDLSQL(tctx, p, stmt, "test")
sqls, tables, err := syncer.splitAndFilterDDL(ec, p, stmt, "test")
c.Assert(err, IsNil)
c.Assert(sqls, HasLen, 1)
c.Assert(sqls[0], Equals, sql)
@@ -413,7 +437,7 @@ func (s *testSyncerSuite) TestResolveOnlineDDL(c *C) {
sql = fmt.Sprintf("CREATE TABLE IF NOT EXISTS `test`.`%s` (`n` INT)", ca.trashName)
stmt, err = p.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil)
sqls, tables, err = syncer.resolveDDLSQL(tctx, p, stmt, "test")
sqls, tables, err = syncer.splitAndFilterDDL(ec, p, stmt, "test")
c.Assert(err, IsNil)
c.Assert(sqls, HasLen, 0)
c.Assert(tables, HasLen, 0)
@@ -423,14 +447,14 @@ func (s *testSyncerSuite) TestResolveOnlineDDL(c *C) {
newSQL := "ALTER TABLE `test`.`t1` ADD COLUMN `n` INT"
stmt, err = p.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil)
sqls, tables, err = syncer.resolveDDLSQL(tctx, p, stmt, "test")
sqls, tables, err = syncer.splitAndFilterDDL(ec, p, stmt, "test")
c.Assert(err, IsNil)
c.Assert(sqls, HasLen, 0)
c.Assert(tables, HasLen, 0)
sql = fmt.Sprintf("RENAME TABLE `test`.`%s` TO `test`.`t1`", ca.ghostname)
stmt, err = p.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil)
sqls, tables, err = syncer.resolveDDLSQL(tctx, p, stmt, "test")
sqls, tables, err = syncer.splitAndFilterDDL(ec, p, stmt, "test")
c.Assert(err, IsNil)
c.Assert(sqls, HasLen, 1)
c.Assert(sqls[0], Equals, newSQL)
@@ -483,8 +507,8 @@ func (m mockOnlinePlugin) TableType(table string) TableType {
return ""
}

func (m mockOnlinePlugin) RealName(schema, table string) (string, string) {
return "", ""
func (m mockOnlinePlugin) RealName(table string) string {
return ""
}

func (m mockOnlinePlugin) ResetConn(tctx *tcontext.Context) error {
2 changes: 1 addition & 1 deletion syncer/error.go
@@ -66,7 +66,7 @@ func isDropColumnWithIndexError(err error) bool {

// handleSpecialDDLError handles special errors for DDL execution.
func (s *Syncer) handleSpecialDDLError(tctx *tcontext.Context, err error, ddls []string, index int, conn *DBConn) error {
// We use default parser because ddls are came from *Syncer.handleDDL, which is StringSingleQuotes, KeyWordUppercase and NameBackQuotes
// We use the default parser because these ddls come from *Syncer.routeDDL, which uses StringSingleQuotes, KeyWordUppercase and NameBackQuotes
parser2 := parser.New()

// it only ignore `invalid connection` error (timeout or other causes) for `ADD INDEX`.
5 changes: 5 additions & 0 deletions syncer/filter.go
@@ -23,6 +23,11 @@ import (
"github.com/pingcap/dm/pkg/utils"
)

// skipQuery will return true when
// - the given `sql` matches a built-in skip pattern.
// - any schema of the table names is a system schema.
// - any table name doesn't pass the block-allow list.
// - the type of SQL doesn't pass the binlog filter.
func (s *Syncer) skipQuery(tables []*filter.Table, stmt ast.StmtNode, sql string) (bool, error) {
if utils.IsBuildInSkipDDL(sql) {
return true, nil
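To show what "doesn't pass the block-allow list" means in practice, here is a hedged sketch using the tidb-tools filter package; its New/ApplyOn API is assumed from that package and is not shown in this diff. Tables removed by ApplyOn cause skipQuery to report the corresponding statement as skipped.

package main

import (
	"fmt"

	"github.com/pingcap/tidb-tools/pkg/filter"
)

func main() {
	// Only allow schema `s1` (assumed Rules/DoDBs fields).
	f, err := filter.New(false, &filter.Rules{DoDBs: []string{"s1"}})
	if err != nil {
		panic(err)
	}
	tables := []*filter.Table{
		{Schema: "s1", Name: "t1"},
		{Schema: "xx", Name: "t2"},
	}
	kept := f.ApplyOn(tables)
	for _, t := range kept {
		// Only s1.t1 survives; a DDL touching only xx.t2 would be skipped.
		fmt.Println(t.Schema, t.Name)
	}
}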
