Skip to content

Commit

Permalink
syncer(dm): track skipped ddls when using dmctl binlog to skip some d…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 8, 2022
1 parent 6511ab0 commit 520e2b0
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 14 deletions.
85 changes: 85 additions & 0 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1726,7 +1726,28 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
// revert currentLocation to startLocation
currentLocation = startLocation
} else if op == pb.ErrorOp_Skip {
ec := eventContext{
tctx: tctx,
header: e.Header,
startLocation: &startLocation,
currentLocation: &currentLocation,
lastLocation: &lastLocation,
}
var sourceTbls map[string]map[string]struct{}
sourceTbls, err = s.trackOriginDDL(ev, ec)
if err != nil {
tctx.L().Warn("failed to track query when handle-error skip", zap.Error(err), zap.ByteString("sql", ev.Query))
}

s.saveGlobalPoint(currentLocation)
for sourceSchema, tableMap := range sourceTbls {
if sourceSchema == "" {
continue
}
for sourceTable := range tableMap {
s.saveTablePoint(&filter.Table{Schema: sourceSchema, Name: sourceTable}, currentLocation)
}
}
err = s.flushJobs()
if err != nil {
tctx.L().Warn("failed to flush jobs when handle-error skip", zap.Error(err))
Expand Down Expand Up @@ -2774,6 +2795,70 @@ func (s *Syncer) trackDDL(usedSchema string, trackInfo *ddlInfo, ec *eventContex
return nil
}

func (s *Syncer) trackOriginDDL(ev *replication.QueryEvent, ec eventContext) (map[string]map[string]struct{}, error) {
originSQL := strings.TrimSpace(string(ev.Query))
if originSQL == "BEGIN" || originSQL == "" || utils.IsBuildInSkipDDL(originSQL) {
return nil, nil
}
var err error
qec := &queryEventContext{
eventContext: &ec,
ddlSchema: string(ev.Schema),
originSQL: utils.TrimCtrlChars(originSQL),
splitDDLs: make([]string, 0),
appliedDDLs: make([]string, 0),
sourceTbls: make(map[string]map[string]struct{}),
}
qec.p, err = event.GetParserForStatusVars(ev.StatusVars)
if err != nil {
log.L().Warn("found error when get sql_mode from binlog status_vars", zap.Error(err))
}
stmt, err := parseOneStmt(qec)
if err != nil {
// originSQL can't be parsed => can't be tracked by schema tracker
// we can use operate-schema to set a compatible schema after this
return nil, err
}

if _, ok := stmt.(ast.DDLNode); !ok {
return nil, nil
}

// TiDB can't handle multi schema change DDL, so we split it here.
qec.splitDDLs, err = parserpkg.SplitDDL(stmt, qec.ddlSchema)
if err != nil {
return nil, err
}

affectedTbls := make(map[string]map[string]struct{})
for _, sql := range qec.splitDDLs {
ddlInfo, err := s.genDDLInfo(qec.p, qec.ddlSchema, sql)
if err != nil {
return nil, err
}
sourceTable := ddlInfo.sourceTables[0]
switch ddlInfo.originStmt.(type) {
case *ast.DropDatabaseStmt:
delete(affectedTbls, sourceTable.Schema)
case *ast.DropTableStmt:
if affectedTable, ok := affectedTbls[sourceTable.Schema]; ok {
delete(affectedTable, sourceTable.Name)
}
default:
if _, ok := affectedTbls[sourceTable.Schema]; !ok {
affectedTbls[sourceTable.Schema] = make(map[string]struct{})
}
affectedTbls[sourceTable.Schema][sourceTable.Name] = struct{}{}
}
err = s.trackDDL(qec.ddlSchema, ddlInfo, qec.eventContext)
if err != nil {
return nil, err
}
}

return affectedTbls, nil
}

func (s *Syncer) genRouter() error {
s.tableRouter, _ = router.NewTableRouter(s.cfg.CaseSensitive, []*router.TableRule{})
for _, rule := range s.cfg.RouteRules {
Expand Down
12 changes: 4 additions & 8 deletions dm/tests/shardddl1/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,11 @@ function DM_RENAME_COLUMN_OPTIMISTIC_CASE() {
# dmls fail
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"Paused" 2
#"Error 1054: Unknown column 'a' in 'field list'" 2 // may more than 2 dml error
"Paused" 1 \
"Unknown column 'a' in 'field list'" 1

# third, set schema to be same with upstream
# TODO: support set schema automatically base on upstream schema
echo 'CREATE TABLE `tb1` ( `c` int NOT NULL, `b` varchar(10) DEFAULT NULL, PRIMARY KEY (`c`)) ENGINE=InnoDB DEFAULT CHARSET=latin1' >${WORK_DIR}/schema1.sql
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog-schema update -s mysql-replica-01 test ${shardddl1} ${tb1} ${WORK_DIR}/schema1.sql --flush --sync" \
"\"result\": true" 2
echo 'CREATE TABLE `tb1` ( `c` int NOT NULL, `b` varchar(10) DEFAULT NULL, PRIMARY KEY (`c`)) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin' >${WORK_DIR}/schema1.sql
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog-schema update -s mysql-replica-02 test ${shardddl1} ${tb1} ${WORK_DIR}/schema1.sql --flush --sync" \
"\"result\": true" 2
Expand All @@ -169,7 +165,7 @@ function DM_RENAME_COLUMN_OPTIMISTIC_CASE() {
# source2.table2's dml fails
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"Error 1054: Unknown column 'a' in 'field list'" 1
"Unknown column 'a' in 'field list'" 1

# WARN: set schema of source2.table2
# Actually it should be tb2(a,b), dml is {a: 9, b: 'iii'}
Expand Down
6 changes: 0 additions & 6 deletions dm/tests/shardddl2/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -484,16 +484,10 @@ function DM_DropAddColumn_CASE() {
"\"result\": true" 2 \
"\"source 'mysql-replica-02' has no error\"" 1

# after we skip ADD COLUMN, we should fix the table structure
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"pause-task test" \
"\"result\": true" 3

echo 'CREATE TABLE `tb1` ( `a` int(11) NOT NULL, `b` int(11) DEFAULT NULL, `c` int(11) DEFAULT NULL, PRIMARY KEY (`a`) /*T![clustered_index] NONCLUSTERED */) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin' >${WORK_DIR}/schema.sql
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog-schema update test ${shardddl1} ${tb1} ${WORK_DIR}/schema.sql -s mysql-replica-01" \
"\"result\": true" 2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task test" \
"\"result\": true" 3
Expand Down

0 comments on commit 520e2b0

Please sign in to comment.