Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
tracker: don't execute add FK in case tracker doesn't have referenced…
Browse files Browse the repository at this point in the history
… table (#1101)
  • Loading branch information
ti-srebot authored Sep 25, 2020
1 parent 14f2bd1 commit 56c1daa
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 3 deletions.
18 changes: 15 additions & 3 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2082,12 +2082,13 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
return nil
}

// input `sql` should be a single DDL, which came from parserpkg.SplitDDL
func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter.Table, stmt ast.StmtNode, ec *eventContext) error {
srcTable := tableNames[0][0]

// Make sure the tables are all loaded into the schema tracker.
var shouldExecDDLOnSchemaTracker, shouldSchemaExist, shouldTableExist bool
switch stmt.(type) {
switch node := stmt.(type) {
case *ast.CreateDatabaseStmt:
shouldExecDDLOnSchemaTracker = true
case *ast.AlterDatabaseStmt:
Expand All @@ -2108,11 +2109,22 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter.
if err := s.checkpoint.DeleteTablePoint(ec.tctx, srcTable.Schema, srcTable.Name); err != nil {
return err
}
case *ast.RenameTableStmt, *ast.CreateIndexStmt, *ast.DropIndexStmt, *ast.RepairTableStmt, *ast.AlterTableStmt:
// TODO: RENAME TABLE / ALTER TABLE RENAME should require special treatment.
case *ast.RenameTableStmt, *ast.CreateIndexStmt, *ast.DropIndexStmt, *ast.RepairTableStmt:
// TODO: RENAME TABLE should require special treatment.
shouldExecDDLOnSchemaTracker = true
shouldSchemaExist = true
shouldTableExist = true
case *ast.AlterTableStmt:
// TODO: ALTER TABLE RENAME should require special treatment.
// for DDL that adds FK, since TiDB doesn't fully support it yet, and the referenced table may not be stored in
// tracker, we simply ignore execution of this DDL.
if len(node.Specs) == 1 && node.Specs[0].Constraint != nil && node.Specs[0].Constraint.Tp == ast.ConstraintForeignKey {
shouldExecDDLOnSchemaTracker = false
} else {
shouldExecDDLOnSchemaTracker = true
}
shouldSchemaExist = true
shouldTableExist = true
case *ast.LockTablesStmt, *ast.UnlockTablesStmt, *ast.CleanupTableLockStmt, *ast.TruncateTableStmt:
break
default:
Expand Down
74 changes: 74 additions & 0 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1498,6 +1498,80 @@ func (s *testSyncerSuite) TestRemoveMetadataIsFine(c *C) {
c.Assert(fresh, IsFalse)
}

func (s *testSyncerSuite) TestTrackDDL(c *C) {
var (
testDB = "test_db"
testTbl = "test_tbl"
filter = [][]*filter.Table{{{testDB, testTbl}}, {{testDB, testTbl}}}
ec = &eventContext{tctx: tcontext.Background()}
)
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
dbConn, err := db.Conn(context.Background())
c.Assert(err, IsNil)

checkPointDB, checkPointMock, err := sqlmock.New()
checkPointDBConn, err := checkPointDB.Conn(context.Background())
c.Assert(err, IsNil)

syncer := NewSyncer(s.cfg, nil)
syncer.toDBConns = []*DBConn{{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})},
{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}}
syncer.ddlDBConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}
syncer.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(checkPointDBConn, &retry.FiniteRetryStrategy{})}
syncer.schemaTracker, err = schema.NewTracker(defaultTestSessionCfg, syncer.ddlDBConn.baseConn)
c.Assert(err, IsNil)

cases := []struct {
sql string
callback func()
}{
{"CREATE DATABASE IF NOT EXISTS " + testDB, func() {}},
{"ALTER DATABASE " + testDB + " DEFAULT COLLATE utf8_bin", func() {}},
{"DROP DATABASE IF EXISTS " + testDB, func() {}},
{fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%s (c int)", testDB, testTbl), func() {}},
{fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", testDB, testTbl), func() {}},
{"CREATE INDEX idx1 ON " + testTbl + " (c)", func() {
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE.*").WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow(testTbl, " CREATE TABLE `"+testTbl+"` (\n `c` int(11) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
}},
{fmt.Sprintf("ALTER TABLE %s.%s add c2 int", testDB, testTbl), func() {
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE.*").WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow(testTbl, " CREATE TABLE `"+testTbl+"` (\n `c` int(11) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
}},
// alter add FK will not executed on tracker (otherwise will report error tb2 not exist)
{fmt.Sprintf("ALTER TABLE %s.%s add constraint foreign key (c) references tb2(c)", testDB, testTbl), func() {
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE.*").WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow(testTbl, " CREATE TABLE `"+testTbl+"` (\n `c` int(11) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
}},
{"TRUNCATE TABLE " + testTbl, func() {}},
}

p := parser.New()
for _, ca := range cases {
stmts, warns, err := p.Parse(ca.sql, "", "")
c.Assert(err, IsNil)
c.Assert(warns, HasLen, 0)
c.Assert(stmts, HasLen, 1)

ca.callback()

c.Assert(syncer.trackDDL(testDB, ca.sql, filter, stmts[0], ec), IsNil)
c.Assert(syncer.schemaTracker.Reset(), IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
c.Assert(checkPointMock.ExpectationsWereMet(), IsNil)
}
}

func executeSQLAndWait(expectJobNum int) {
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
Expand Down

0 comments on commit 56c1daa

Please sign in to comment.