Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' into xiang/relay-pos
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC committed Feb 3, 2020
2 parents 7802a15 + 1433d4a commit 0023942
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 14 deletions.
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ ErrSyncerUnitReopenStreamNotSupport,[code=36057:class=sync-unit:scope=internal:l
ErrSyncerUnitUpdateConfigInSharding,[code=36058:class=sync-unit:scope=internal:level=high],"try update config when some tables' (%v) sharding DDL not synced not supported"
ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:level=high],"process unit not waiting for sharding DDL to sync"
ErrSyncerUnitGenBWList,[code=36060:class=sync-unit:scope=internal:level=high],"generate black white list"
ErrSyncerUnitHandleDDLFailed,[code=36061:class=sync-unit:scope=internal:level=high],"fail to handle ddl job for %s"
ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium],"nil request not valid"
ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium],"op %s not supported"
ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium],"operate request without --sharding specified not valid"
Expand Down
2 changes: 2 additions & 0 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ const (
codeSyncerUnitUpdateConfigInSharding
codeSyncerUnitExecWithNoBlockingDDL
codeSyncerUnitGenBWList
codeSyncerUnitHandleDDLFailed
)

// DM-master error code
Expand Down Expand Up @@ -831,6 +832,7 @@ var (
ErrSyncerUnitUpdateConfigInSharding = New(codeSyncerUnitUpdateConfigInSharding, ClassSyncUnit, ScopeInternal, LevelHigh, "try update config when some tables' (%v) sharding DDL not synced not supported")
ErrSyncerUnitExecWithNoBlockingDDL = New(codeSyncerUnitExecWithNoBlockingDDL, ClassSyncUnit, ScopeInternal, LevelHigh, "process unit not waiting for sharding DDL to sync")
ErrSyncerUnitGenBWList = New(codeSyncerUnitGenBWList, ClassSyncUnit, ScopeInternal, LevelHigh, "generate black white list")
ErrSyncerUnitHandleDDLFailed = New(codeSyncerUnitHandleDDLFailed, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle ddl job for %s")

// DM-master error
ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid")
Expand Down
18 changes: 18 additions & 0 deletions syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ type CheckPoint interface {
// corresponding to Meta.Pos
GlobalPoint() mysql.Position

// TablePoint returns all tables' binlog stream checkpoints
TablePoint() map[string]map[string]mysql.Position

// FlushedGlobalPoint returns the flushed global binlog stream's checkpoint
// corresponding to Meta.Pos
FlushedGlobalPoint() mysql.Position
Expand Down Expand Up @@ -473,6 +476,21 @@ func (cp *RemoteCheckPoint) GlobalPoint() mysql.Position {
return cp.globalPoint.MySQLPos()
}

// TablePoint implements CheckPoint.TablePoint
func (cp *RemoteCheckPoint) TablePoint() map[string]map[string]mysql.Position {
	cp.RLock()
	defer cp.RUnlock()

	// Return a deep copy of the per-schema/per-table positions so that
	// callers cannot mutate the checkpoint's internal state.
	result := make(map[string]map[string]mysql.Position, len(cp.points))
	for schema, tables := range cp.points {
		inner := make(map[string]mysql.Position, len(tables))
		for table, point := range tables {
			inner[table] = point.MySQLPos()
		}
		result[schema] = inner
	}
	return result
}

// FlushedGlobalPoint implements CheckPoint.FlushedGlobalPoint
func (cp *RemoteCheckPoint) FlushedGlobalPoint() mysql.Position {
return cp.globalPoint.FlushedMySQLPos()
Expand Down
9 changes: 9 additions & 0 deletions syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package syncer

import (
"database/sql"
"strings"
"time"

"github.com/pingcap/dm/dm/config"
Expand Down Expand Up @@ -192,6 +193,14 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter
}

func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError func(error) bool, queries []string, args ...[]interface{}) (int, error) {
failpoint.Inject("ExecuteSQLWithIgnoreFailed", func(val failpoint.Value) {
queryPattern := val.(string)
if len(queries) == 1 && strings.Contains(queries[0], queryPattern) {
tctx.L().Warn("executeSQLWithIgnore failed", zap.String("failpoint", "ExecuteSQLWithIgnoreFailed"))
failpoint.Return(0, terror.ErrDBUnExpect.Generate("invalid connection"))
}
})

if len(queries) == 0 {
return 0, nil
}
Expand Down
9 changes: 9 additions & 0 deletions syncer/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb-tools/pkg/filter"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -176,6 +177,10 @@ func (s *OnlineDDLStorage) Get(ghostSchema, ghostTable string) *GhostDDLInfo {
return nil
}

if mSchema == nil || mSchema[ghostTable] == nil {
return nil
}

clone := new(GhostDDLInfo)
*clone = *mSchema[ghostTable]

Expand Down Expand Up @@ -204,6 +209,10 @@ func (s *OnlineDDLStorage) Save(tctx *tcontext.Context, ghostSchema, ghostTable,

// maybe we need more checks for it

if len(info.DDLs) != 0 && info.DDLs[len(info.DDLs)-1] == ddl {
tctx.L().Warn("online ddl may be saved before, just ignore it", zap.String("ddl", ddl))
return nil
}
info.DDLs = append(info.DDLs, ddl)
ddlsBytes, err := json.Marshal(mSchema[ghostTable])
if err != nil {
Expand Down
22 changes: 18 additions & 4 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,8 @@ func (s *Syncer) initShardingGroups() error {
// IsFreshTask implements Unit.IsFreshTask
func (s *Syncer) IsFreshTask(ctx context.Context) (bool, error) {
	// A task counts as fresh only when the global checkpoint has not moved
	// past the minimal position AND no per-table checkpoint exists yet.
	globalAtStart := binlog.ComparePosition(s.checkpoint.GlobalPoint(), minCheckpoint) <= 0
	noTablePoints := len(s.checkpoint.TablePoint()) == 0
	return globalAtStart && noTablePoints, nil
}

func (s *Syncer) reset() {
Expand Down Expand Up @@ -1054,9 +1055,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
s.tctx.L().Error("panic log", zap.Reflect("error message", err1), zap.Stack("statck"))
err = terror.ErrSyncerUnitPanic.Generate(err1)
}
// flush the jobs channels, but if error occurred, we should not flush the checkpoints
if err1 := s.flushJobs(); err1 != nil {
s.tctx.L().Error("fail to finish all jobs when binlog replication exits", log.ShortError(err1))

s.jobWg.Wait()
if err2 := s.flushCheckPoints(); err2 != nil {
s.tctx.L().Warn("fail to flush check points when exit task", zap.Error(err2))
}
}()

Expand Down Expand Up @@ -1675,11 +1677,19 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
s.tctx.L().Info("replace ddls to preset ddls by sql operator in normal mode", zap.String("event", "query"), zap.Strings("preset ddls", appliedSQLs), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos))
needHandleDDLs = appliedSQLs // maybe nil
}

job := newDDLJob(nil, needHandleDDLs, *ec.lastPos, *ec.currentPos, nil, nil, *ec.traceID)
err = s.addJobFunc(job)
if err != nil {
return err
}

// when add ddl job, will execute ddl and then flush checkpoint.
// if execute ddl failed, the execErrorDetected will be true.
if s.execErrorDetected.Get() {
return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query)
}

s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos))

for _, td := range needTrackDDLs {
Expand Down Expand Up @@ -1872,6 +1882,10 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
return err
}

if s.execErrorDetected.Get() {
return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query)
}

if len(onlineDDLTableNames) > 0 {
err = s.clearOnlineDDL(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name)
if err != nil {
Expand Down
8 changes: 0 additions & 8 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1498,10 +1498,6 @@ func (s *testSyncerSuite) TestRun(c *C) {
update,
"REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?);",
[]interface{}{int64(580981944116838401), "b"},
}, {
flush,
"",
nil,
},
}

Expand Down Expand Up @@ -1561,10 +1557,6 @@ func (s *testSyncerSuite) TestRun(c *C) {
del,
"DELETE FROM `test_1`.`t_2` WHERE `id` = ? LIMIT 1;",
[]interface{}{int32(3)},
}, {
flush,
"",
nil,
},
}

Expand Down
2 changes: 1 addition & 1 deletion tests/online_ddl/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ tables = ["t_target"]
schema = "online_ddl"
table = "t_target"
ignore-columns = ["id"]
is-online_ddl = true
is-sharding = true
index-field = "uid"

[[table-config.source-tables]]
Expand Down
2 changes: 1 addition & 1 deletion tests/online_ddl/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: test
task-mode: all
is-sharding: true
meta-schema: "dm_meta"
remove-meta: true
remove-meta: false
disable-heartbeat: true
timezone: "Asia/Shanghai"
online-ddl-scheme: online-ddl-scheme-placeholder
Expand Down

0 comments on commit 0023942

Please sign in to comment.