diff --git a/checker/checker.go b/checker/checker.go index 9f6d656e93..e34acb0d17 100644 --- a/checker/checker.go +++ b/checker/checker.go @@ -97,7 +97,7 @@ func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string) * } // Init implements Unit interface -func (c *Checker) Init() (err error) { +func (c *Checker) Init(ctx context.Context) (err error) { rollbackHolder := fr.NewRollbackHolder("checker") defer func() { if err != nil { diff --git a/checker/cmd.go b/checker/cmd.go index cf830f1870..3c4ae0e1d1 100644 --- a/checker/cmd.go +++ b/checker/cmd.go @@ -58,7 +58,7 @@ func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig) error { c := NewChecker(cfgs, checkingItems) - err := c.Init() + err := c.Init(ctx) if err != nil { return terror.Annotate(err, "fail to initial checker") } diff --git a/dm/unit/unit.go b/dm/unit/unit.go index 563d466b42..00075bc62f 100644 --- a/dm/unit/unit.go +++ b/dm/unit/unit.go @@ -15,6 +15,7 @@ package unit import ( "context" + "time" "github.com/pingcap/errors" @@ -23,6 +24,11 @@ import ( "github.com/pingcap/dm/pkg/terror" ) +const ( + // DefaultInitTimeout represents the default timeout value when initializing a process unit. + DefaultInitTimeout = time.Minute +) + // Unit defines interface for sub task process units, like syncer, loader, relay, etc. type Unit interface { // Init initializes the dm process unit @@ -30,7 +36,7 @@ type Unit interface { // other setups can be done in `Process`, but this should be treated carefully, let it's compatible with Pause / Resume // if initialing successfully, the outer caller should call `Close` when the unit (or the task) finished, stopped or canceled (because other units Init fail). // if initialing fail, Init itself should release resources it acquired before (rolling itself back). 
- Init() error + Init(ctx context.Context) error // Process processes sub task // When ctx.Done, stops the process and returns // When not in processing, call Process to continue or resume the process @@ -52,7 +58,7 @@ type Unit interface { Type() pb.UnitType // IsFreshTask return whether is a fresh task (not processed before) // it will be used to decide where the task should become restoring - IsFreshTask() (bool, error) + IsFreshTask(ctx context.Context) (bool, error) } // NewProcessError creates a new ProcessError diff --git a/dm/worker/relay.go b/dm/worker/relay.go index 91ac0cae4c..36384ac518 100644 --- a/dm/worker/relay.go +++ b/dm/worker/relay.go @@ -22,6 +22,7 @@ import ( "go.uber.org/zap" "github.com/pingcap/dm/dm/pb" + "github.com/pingcap/dm/dm/unit" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/streamer" "github.com/pingcap/dm/pkg/terror" @@ -125,7 +126,9 @@ func (h *realRelayHolder) Init(interceptors []purger.PurgeInterceptor) (purger.P streamer.GetReaderHub(), } - if err := h.relay.Init(); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), unit.DefaultInitTimeout) + defer cancel() + if err := h.relay.Init(ctx); err != nil { return nil, terror.Annotate(err, "initial relay unit") } diff --git a/dm/worker/relay_test.go b/dm/worker/relay_test.go index ad225e213e..cb523dbce5 100644 --- a/dm/worker/relay_test.go +++ b/dm/worker/relay_test.go @@ -50,7 +50,7 @@ func NewDummyRelay(cfg *relay.Config) relay.Process { } // Init implements Process interface -func (d *DummyRelay) Init() error { +func (d *DummyRelay) Init(ctx context.Context) error { return d.initErr } diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index 6b58e36297..32a8e774dd 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -125,7 +125,9 @@ func (st *SubTask) Init() error { // other setups can be done in `Process`, like Loader's prepare which depends on Mydumper's output // but setups in `Process` should be treated carefully, let it's 
compatible with Pause / Resume for i, u := range st.units { - err := u.Init() + ctx, cancel := context.WithTimeout(context.Background(), unit.DefaultInitTimeout) + err := u.Init(ctx) + cancel() if err != nil { initializeUnitSuccess = false // when init fail, other units initialized before should be closed @@ -140,7 +142,9 @@ func (st *SubTask) Init() error { var skipIdx = 0 for i := len(st.units) - 1; i > 0; i-- { u := st.units[i] - isFresh, err := u.IsFreshTask() + ctx, cancel := context.WithTimeout(context.Background(), unit.DefaultInitTimeout) + isFresh, err := u.IsFreshTask(ctx) + cancel() if err != nil { initializeUnitSuccess = false return terror.Annotatef(err, "fail to get fresh status of subtask %s %s", st.cfg.Name, u.Type()) diff --git a/dm/worker/subtask_test.go b/dm/worker/subtask_test.go index bdc5e882ca..fa06a9d5cc 100644 --- a/dm/worker/subtask_test.go +++ b/dm/worker/subtask_test.go @@ -83,7 +83,7 @@ func NewMockUnit(typ pb.UnitType) *MockUnit { } } -func (m *MockUnit) Init() error { +func (m *MockUnit) Init(ctx context.Context) error { return m.errInit } @@ -121,7 +121,7 @@ func (m *MockUnit) Error() interface{} { return nil } func (m *MockUnit) Type() pb.UnitType { return m.typ } -func (m *MockUnit) IsFreshTask() (bool, error) { return m.isFresh, m.errFresh } +func (m *MockUnit) IsFreshTask(ctx context.Context) (bool, error) { return m.isFresh, m.errFresh } func (m *MockUnit) InjectProcessError(ctx context.Context, err error) error { newCtx, cancel := context.WithTimeout(ctx, time.Second) diff --git a/loader/checkpoint.go b/loader/checkpoint.go index dc5a0371d1..d34698c8d2 100644 --- a/loader/checkpoint.go +++ b/loader/checkpoint.go @@ -33,7 +33,7 @@ type CheckPoint interface { // Load loads all checkpoints recorded before. 
// because of no checkpoints updated in memory when error occurred // when resuming, Load will be called again to load checkpoints - Load() error + Load(tctx *tcontext.Context) error // GetRestoringFileInfo get restoring data files for table GetRestoringFileInfo(db, table string) map[string][]int64 @@ -48,19 +48,19 @@ type CheckPoint interface { CalcProgress(allFiles map[string]Tables2DataFiles) error // Init initialize checkpoint data in tidb - Init(filename string, endpos int64) error + Init(tctx *tcontext.Context, filename string, endpos int64) error // ResetConn resets database connections owned by the Checkpoint - ResetConn() error + ResetConn(tctx *tcontext.Context) error // Close closes the CheckPoint Close() // Clear clears all recorded checkpoints - Clear() error + Clear(tctx *tcontext.Context) error // Count returns recorded checkpoints' count - Count() (int, error) + Count(tctx *tcontext.Context) (int, error) // GenSQL generates sql to update checkpoint to DB GenSQL(filename string, offset int64) string @@ -76,7 +76,7 @@ type RemoteCheckPoint struct { table string restoringFiles map[string]map[string]FilePosSet finishedTables map[string]struct{} - tctx *tcontext.Context + logCtx *tcontext.Context } func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id string) (CheckPoint, error) { @@ -85,8 +85,6 @@ func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id s return nil, err } - newtctx := tctx.WithLogger(tctx.L().WithFields(zap.String("component", "remote checkpoint"))) - cp := &RemoteCheckPoint{ db: db, conn: dbConns[0], @@ -95,10 +93,10 @@ func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id s finishedTables: make(map[string]struct{}), schema: cfg.MetaSchema, table: fmt.Sprintf("%s_loader_checkpoint", cfg.Name), - tctx: newtctx, + logCtx: tcontext.Background().WithLogger(tctx.L().WithFields(zap.String("component", "remote checkpoint"))), } - err = cp.prepare() + err = 
cp.prepare(tctx) if err != nil { return nil, err } @@ -106,25 +104,25 @@ func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id s return cp, nil } -func (cp *RemoteCheckPoint) prepare() error { +func (cp *RemoteCheckPoint) prepare(tctx *tcontext.Context) error { // create schema - if err := cp.createSchema(); err != nil { + if err := cp.createSchema(tctx); err != nil { return err } // create table - if err := cp.createTable(); err != nil { + if err := cp.createTable(tctx); err != nil { return err } return nil } -func (cp *RemoteCheckPoint) createSchema() error { +func (cp *RemoteCheckPoint) createSchema(tctx *tcontext.Context) error { sql2 := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", cp.schema) - err := cp.conn.executeSQL(cp.tctx, []string{sql2}) + err := cp.conn.executeSQL(tctx, []string{sql2}) return terror.WithScope(err, terror.ScopeDownstream) } -func (cp *RemoteCheckPoint) createTable() error { +func (cp *RemoteCheckPoint) createTable(tctx *tcontext.Context) error { tableName := fmt.Sprintf("`%s`.`%s`", cp.schema, cp.table) createTable := `CREATE TABLE IF NOT EXISTS %s ( id char(32) NOT NULL, @@ -139,19 +137,19 @@ func (cp *RemoteCheckPoint) createTable() error { ); ` sql2 := fmt.Sprintf(createTable, tableName) - err := cp.conn.executeSQL(cp.tctx, []string{sql2}) + err := cp.conn.executeSQL(tctx, []string{sql2}) return terror.WithScope(err, terror.ScopeDownstream) } // Load implements CheckPoint.Load -func (cp *RemoteCheckPoint) Load() error { +func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error { begin := time.Now() defer func() { - cp.tctx.L().Info("load checkpoint", zap.Duration("cost time", time.Since(begin))) + cp.logCtx.L().Info("load checkpoint", zap.Duration("cost time", time.Since(begin))) }() query := fmt.Sprintf("SELECT `filename`,`cp_schema`,`cp_table`,`offset`,`end_pos` from `%s`.`%s` where `id`=?", cp.schema, cp.table) - rows, err := cp.conn.querySQL(cp.tctx, query, cp.id) + rows, err := 
cp.conn.querySQL(tctx, query, cp.id) if err != nil { return terror.WithScope(err, terror.ScopeDownstream) } @@ -248,14 +246,14 @@ func (cp *RemoteCheckPoint) CalcProgress(allFiles map[string]Tables2DataFiles) e } } - cp.tctx.L().Info("calculate checkpoint finished.", zap.Reflect("finished tables", cp.finishedTables)) + cp.logCtx.L().Info("calculate checkpoint finished.", zap.Reflect("finished tables", cp.finishedTables)) return nil } func (cp *RemoteCheckPoint) allFilesFinished(files map[string][]int64) bool { for file, pos := range files { if len(pos) != 2 { - cp.tctx.L().Error("unexpected checkpoint record", zap.String("data file", file), zap.Int64s("position", pos)) + cp.logCtx.L().Error("unexpected checkpoint record", zap.String("data file", file), zap.Int64s("position", pos)) return false } if pos[0] != pos[1] { @@ -266,7 +264,7 @@ func (cp *RemoteCheckPoint) allFilesFinished(files map[string][]int64) bool { } // Init implements CheckPoint.Init -func (cp *RemoteCheckPoint) Init(filename string, endPos int64) error { +func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos int64) error { idx := strings.Index(filename, ".sql") if idx < 0 { return terror.ErrCheckpointInvalidTableFile.Generate(filename) @@ -279,7 +277,7 @@ func (cp *RemoteCheckPoint) Init(filename string, endPos int64) error { // fields[0] -> db name, fields[1] -> table name sql2 := fmt.Sprintf("INSERT INTO `%s`.`%s` (`id`, `filename`, `cp_schema`, `cp_table`, `offset`, `end_pos`) VALUES(?,?,?,?,?,?)", cp.schema, cp.table) - cp.tctx.L().Debug("initial checkpoint record", + cp.logCtx.L().Debug("initial checkpoint record", zap.String("sql", sql2), zap.String("id", cp.id), zap.String("filename", filename), @@ -288,10 +286,10 @@ func (cp *RemoteCheckPoint) Init(filename string, endPos int64) error { zap.Int64("offset", 0), zap.Int64("end position", endPos)) args := []interface{}{cp.id, filename, fields[0], fields[1], 0, endPos} - err := cp.conn.executeSQL(cp.tctx, 
[]string{sql2}, args) + err := cp.conn.executeSQL(tctx, []string{sql2}, args) if err != nil { if isErrDupEntry(err) { - cp.tctx.L().Info("checkpoint record already exists, skip it.", zap.String("id", cp.id), zap.String("filename", filename)) + cp.logCtx.L().Info("checkpoint record already exists, skip it.", zap.String("id", cp.id), zap.String("filename", filename)) return nil } return terror.WithScope(terror.Annotate(err, "initialize checkpoint"), terror.ScopeDownstream) @@ -300,15 +298,15 @@ func (cp *RemoteCheckPoint) Init(filename string, endPos int64) error { } // ResetConn implements CheckPoint.ResetConn -func (cp *RemoteCheckPoint) ResetConn() error { - return cp.conn.resetConn(cp.tctx) +func (cp *RemoteCheckPoint) ResetConn(tctx *tcontext.Context) error { + return cp.conn.resetConn(tctx) } // Close implements CheckPoint.Close func (cp *RemoteCheckPoint) Close() { err := cp.db.Close() if err != nil { - cp.tctx.L().Error("close checkpoint db", log.ShortError(err)) + cp.logCtx.L().Error("close checkpoint db", log.ShortError(err)) } } @@ -320,16 +318,16 @@ func (cp *RemoteCheckPoint) GenSQL(filename string, offset int64) string { } // Clear implements CheckPoint.Clear -func (cp *RemoteCheckPoint) Clear() error { +func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error { sql2 := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE `id` = '%s'", cp.schema, cp.table, cp.id) - err := cp.conn.executeSQL(cp.tctx, []string{sql2}) + err := cp.conn.executeSQL(tctx, []string{sql2}) return terror.WithScope(err, terror.ScopeDownstream) } // Count implements CheckPoint.Count -func (cp *RemoteCheckPoint) Count() (int, error) { +func (cp *RemoteCheckPoint) Count(tctx *tcontext.Context) (int, error) { query := fmt.Sprintf("SELECT COUNT(id) FROM `%s`.`%s` WHERE `id` = ?", cp.schema, cp.table) - rows, err := cp.conn.querySQL(cp.tctx, query, cp.id) + rows, err := cp.conn.querySQL(tctx, query, cp.id) if err != nil { return 0, terror.WithScope(err, terror.ScopeDownstream) } @@ -344,12 
+342,17 @@ func (cp *RemoteCheckPoint) Count() (int, error) { if rows.Err() != nil { return 0, terror.WithScope(terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError), terror.ScopeDownstream) } - cp.tctx.L().Debug("checkpoint record", zap.Int("count", count)) + cp.logCtx.L().Debug("checkpoint record", zap.Int("count", count)) return count, nil } func (cp *RemoteCheckPoint) String() string { - if err := cp.Load(); err != nil { + // `String` is often used to log something, it's not a big problem even fail, + // so 1min should be enough. + tctx2, cancel := cp.logCtx.WithTimeout(time.Minute) + defer cancel() + + if err := cp.Load(tctx2); err != nil { return err.Error() } diff --git a/loader/checkpoint_test.go b/loader/checkpoint_test.go index 7eeb93c7c3..46bd040228 100644 --- a/loader/checkpoint_test.go +++ b/loader/checkpoint_test.go @@ -76,26 +76,26 @@ func (t *testCheckPointSuite) TestForDB(c *C) { c.Assert(err, IsNil) defer cp.Close() - cp.Clear() + cp.Clear(tctx) // no checkpoint exist - err = cp.Load() + err = cp.Load(tctx) c.Assert(err, IsNil) infos := cp.GetAllRestoringFileInfo() c.Assert(len(infos), Equals, 0) - count, err := cp.Count() + count, err := cp.Count(tctx) c.Assert(err, IsNil) c.Assert(count, Equals, 0) // insert default checkpoints for _, cs := range cases { - err = cp.Init(cs.filename, cs.endPos) + err = cp.Init(tctx, cs.filename, cs.endPos) c.Assert(err, IsNil) } - err = cp.Load() + err = cp.Load(tctx) c.Assert(err, IsNil) infos = cp.GetAllRestoringFileInfo() @@ -108,7 +108,7 @@ func (t *testCheckPointSuite) TestForDB(c *C) { c.Assert(info[1], Equals, cs.endPos) } - count, err = cp.Count() + count, err = cp.Count(tctx) c.Assert(err, IsNil) c.Assert(count, Equals, len(cases)) @@ -126,7 +126,7 @@ func (t *testCheckPointSuite) TestForDB(c *C) { c.Assert(err, IsNil) } - err = cp.Load() + err = cp.Load(tctx) c.Assert(err, IsNil) infos = cp.GetAllRestoringFileInfo() @@ -139,22 +139,22 @@ func (t *testCheckPointSuite) TestForDB(c *C) { 
c.Assert(info[1], Equals, cs.endPos) } - count, err = cp.Count() + count, err = cp.Count(tctx) c.Assert(err, IsNil) c.Assert(count, Equals, len(cases)) // clear all - cp.Clear() + cp.Clear(tctx) // no checkpoint exist - err = cp.Load() + err = cp.Load(tctx) c.Assert(err, IsNil) infos = cp.GetAllRestoringFileInfo() c.Assert(len(infos), Equals, 0) // obtain count again - count, err = cp.Count() + count, err = cp.Count(tctx) c.Assert(err, IsNil) c.Assert(count, Equals, 0) } diff --git a/loader/loader.go b/loader/loader.go index a2167f4fcb..9d42150202 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -44,7 +44,7 @@ import ( "go.uber.org/zap" ) -var ( +const ( jobCount = 1000 ) @@ -89,7 +89,7 @@ type Worker struct { // NewWorker returns a Worker. func NewWorker(loader *Loader, id int) (worker *Worker, err error) { - ctctx := loader.tctx.WithLogger(loader.tctx.L().WithFields(zap.Int("worker ID", id))) + ctctx := loader.logCtx.WithLogger(loader.logCtx.L().WithFields(zap.Int("worker ID", id))) return &Worker{ id: id, @@ -254,7 +254,7 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab } baseFile := filepath.Base(file) - err = w.checkPoint.Init(baseFile, finfo.Size()) + err = w.checkPoint.Init(w.tctx.WithContext(ctx), baseFile, finfo.Size()) if err != nil { w.tctx.L().Error("fail to initial checkpoint", zap.String("data file", file), log.ShortError(err)) return err @@ -349,7 +349,7 @@ type Loader struct { cfg *config.SubTaskConfig checkPoint CheckPoint - tctx *tcontext.Context + logCtx *tcontext.Context // db -> tables // table -> data files @@ -387,7 +387,7 @@ func NewLoader(cfg *config.SubTaskConfig) *Loader { db2Tables: make(map[string]Tables2DataFiles), tableInfos: make(map[string]*tableInfo), workerWg: new(sync.WaitGroup), - tctx: tcontext.Background().WithLogger(log.With(zap.String("task", cfg.Name), zap.String("unit", "load"))), + logCtx: tcontext.Background().WithLogger(log.With(zap.String("task", cfg.Name), zap.String("unit", 
"load"))), } loader.fileJobQueueClosed.Set(true) // not open yet return loader @@ -400,7 +400,7 @@ func (l *Loader) Type() pb.UnitType { // Init initializes loader for a load task, but not start Process. // if fail, it should not call l.Close. -func (l *Loader) Init() (err error) { +func (l *Loader) Init(ctx context.Context) (err error) { rollbackHolder := fr.NewRollbackHolder("loader") defer func() { if err != nil { @@ -408,7 +408,9 @@ func (l *Loader) Init() (err error) { } }() - checkpoint, err := newRemoteCheckPoint(l.tctx, l.cfg, l.checkpointID()) + tctx := l.logCtx.WithContext(ctx) + + checkpoint, err := newRemoteCheckPoint(tctx, l.cfg, l.checkpointID()) if err != nil { return err } @@ -421,11 +423,11 @@ func (l *Loader) Init() (err error) { } if l.cfg.RemoveMeta { - err2 := l.checkPoint.Clear() + err2 := l.checkPoint.Clear(tctx) if err2 != nil { return err2 } - l.tctx.L().Info("all previous checkpoints cleared") + l.logCtx.L().Info("all previous checkpoints cleared") } err = l.genRouter(l.cfg.RouteRules) @@ -444,7 +446,7 @@ func (l *Loader) Init() (err error) { dbCfg.RawDBCfg = config.DefaultRawDBConfig(). 
SetMaxIdleConns(l.cfg.PoolSize) - l.toDB, l.toDBConns, err = createConns(l.tctx, l.cfg, l.cfg.PoolSize) + l.toDB, l.toDBConns, err = createConns(tctx, l.cfg, l.cfg.PoolSize) if err != nil { return err } @@ -485,7 +487,7 @@ func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) { close(l.runFatalChan) // Restore returned, all potential fatal sent to l.runFatalChan failpoint.Inject("dontWaitWorkerExit", func(_ failpoint.Value) { - l.tctx.L().Info("", zap.String("failpoint", "dontWaitWorkerExit")) + l.logCtx.L().Info("", zap.String("failpoint", "dontWaitWorkerExit")) l.workerWg.Wait() }) @@ -542,8 +544,8 @@ func (l *Loader) isClosed() bool { } // IsFreshTask implements Unit.IsFreshTask -func (l *Loader) IsFreshTask() (bool, error) { - count, err := l.checkPoint.Count() +func (l *Loader) IsFreshTask(ctx context.Context) (bool, error) { + count, err := l.checkPoint.Count(l.logCtx.WithContext(ctx)) return count == 0, err } @@ -554,24 +556,24 @@ func (l *Loader) Restore(ctx context.Context) error { l.finishedDataSize.Set(0) // reset before load from checkpoint if err := l.prepare(); err != nil { - l.tctx.L().Error("scan directory failed", zap.String("directory", l.cfg.Dir), log.ShortError(err)) + l.logCtx.L().Error("scan directory failed", zap.String("directory", l.cfg.Dir), log.ShortError(err)) return err } // not update checkpoint in memory when restoring, so when re-Restore, we need to load checkpoint from DB - err := l.checkPoint.Load() + err := l.checkPoint.Load(l.logCtx.WithContext(ctx)) if err != nil { return err } err = l.checkPoint.CalcProgress(l.db2Tables) if err != nil { - l.tctx.L().Error("calc load process", log.ShortError(err)) + l.logCtx.L().Error("calc load process", log.ShortError(err)) } l.loadFinishedSize() - if err := l.initAndStartWorkerPool(ctx); err != nil { - l.tctx.L().Error("initial and start worker pools failed", log.ShortError(err)) - return err + if err2 := l.initAndStartWorkerPool(ctx); err2 != nil { + 
l.logCtx.L().Error("initial and start worker pools failed", log.ShortError(err2)) + return err2 } go l.PrintStatus(ctx) @@ -580,7 +582,7 @@ func (l *Loader) Restore(ctx context.Context) error { err = l.restoreData(ctx) failpoint.Inject("dontWaitWorkerExit", func(_ failpoint.Value) { - l.tctx.L().Info("", zap.String("failpoint", "dontWaitWorkerExit")) + l.logCtx.L().Info("", zap.String("failpoint", "dontWaitWorkerExit")) failpoint.Return(nil) }) @@ -589,7 +591,7 @@ func (l *Loader) Restore(ctx context.Context) error { l.workerWg.Wait() if err == nil { - l.tctx.L().Info("all data files have been finished", zap.Duration("cost time", time.Since(begin))) + l.logCtx.L().Info("all data files have been finished", zap.Duration("cost time", time.Since(begin))) } else if errors.Cause(err) != context.Canceled { return err } @@ -616,7 +618,7 @@ func (l *Loader) Close() { err := l.toDB.Close() if err != nil { - l.tctx.L().Error("close downstream DB error", log.ShortError(err)) + l.logCtx.L().Error("close downstream DB error", log.ShortError(err)) } l.checkPoint.Close() l.closed.Set(true) @@ -627,19 +629,19 @@ func (l *Loader) Close() { func (l *Loader) stopLoad() { // before re-write workflow, simply close all job queue and job workers // when resuming, re-create them - l.tctx.L().Info("stop importing data process") + l.logCtx.L().Info("stop importing data process") l.closeFileJobQueue() l.workerWg.Wait() - l.tctx.L().Debug("all workers have been closed") + l.logCtx.L().Debug("all workers have been closed") } // Pause pauses the process, and it can be resumed later // should cancel context from external func (l *Loader) Pause() { if l.isClosed() { - l.tctx.L().Warn("try to pause, but already closed") + l.logCtx.L().Warn("try to pause, but already closed") return } @@ -649,11 +651,11 @@ func (l *Loader) Pause() { // Resume resumes the paused process func (l *Loader) Resume(ctx context.Context, pr chan pb.ProcessResult) { if l.isClosed() { - l.tctx.L().Warn("try to resume, but 
already closed") + l.logCtx.L().Warn("try to resume, but already closed") return } - err := l.resetDBs() + err := l.resetDBs(ctx) if err != nil { pr <- pb.ProcessResult{ IsCanceled: false, @@ -667,17 +669,18 @@ func (l *Loader) Resume(ctx context.Context, pr chan pb.ProcessResult) { l.Process(ctx, pr) } -func (l *Loader) resetDBs() error { +func (l *Loader) resetDBs(ctx context.Context) error { var err error + tctx := l.logCtx.WithContext(ctx) for i := 0; i < len(l.toDBConns); i++ { - err = l.toDBConns[i].resetConn(l.tctx) + err = l.toDBConns[i].resetConn(tctx) if err != nil { return terror.WithScope(err, terror.ScopeDownstream) } } - err = l.checkPoint.ResetConn() + err = l.checkPoint.ResetConn(tctx) if err != nil { return terror.WithScope(err, terror.ScopeDownstream) } @@ -749,7 +752,7 @@ func (l *Loader) genRouter(rules []*router.TableRule) error { } } schemaRules, tableRules := l.tableRouter.AllRules() - l.tctx.L().Debug("all route rules", zap.Reflect("schema route rules", schemaRules), zap.Reflect("table route rules", tableRules)) + l.logCtx.L().Debug("all route rules", zap.Reflect("schema route rules", schemaRules), zap.Reflect("table route rules", tableRules)) return nil } @@ -784,7 +787,7 @@ func (l *Loader) prepareDbFiles(files map[string]struct{}) error { schemaFileCount++ db := file[:idx] if l.skipSchemaAndTable(&filter.Table{Schema: db}) { - l.tctx.L().Warn("ignore schema file", zap.String("schema file", file)) + l.logCtx.L().Warn("ignore schema file", zap.String("schema file", file)) continue } @@ -794,10 +797,10 @@ func (l *Loader) prepareDbFiles(files map[string]struct{}) error { } if schemaFileCount == 0 { - l.tctx.L().Warn("invalid mydumper files for there are no `-schema-create.sql` files found, and will generate later") + l.logCtx.L().Warn("invalid mydumper files for there are no `-schema-create.sql` files found, and will generate later") } if len(l.db2Tables) == 0 { - l.tctx.L().Warn("no available `-schema-create.sql` files, check mydumper 
parameter matches black-white-list in task config, will generate later") + l.logCtx.L().Warn("no available `-schema-create.sql` files, check mydumper parameter matches black-white-list in task config, will generate later") } return nil @@ -813,18 +816,18 @@ func (l *Loader) prepareTableFiles(files map[string]struct{}) error { name := file[:idx] fields := strings.Split(name, ".") if len(fields) != 2 { - l.tctx.L().Warn("invalid table schema file", zap.String("file", file)) + l.logCtx.L().Warn("invalid table schema file", zap.String("file", file)) continue } db, table := fields[0], fields[1] if l.skipSchemaAndTable(&filter.Table{Schema: db, Name: table}) { - l.tctx.L().Warn("ignore table file", zap.String("table file", file)) + l.logCtx.L().Warn("ignore table file", zap.String("table file", file)) continue } tables, ok := l.db2Tables[db] if !ok { - l.tctx.L().Warn("can't find schema create file, will generate one", zap.String("schema", db)) + l.logCtx.L().Warn("can't find schema create file, will generate one", zap.String("schema", db)) if err := generateSchemaCreateFile(l.cfg.Dir, db); err != nil { return err } @@ -854,7 +857,7 @@ func (l *Loader) prepareDataFiles(files map[string]struct{}) error { // ignore view / triggers if strings.Contains(file, "-schema-view.sql") || strings.Contains(file, "-schema-triggers.sql") || strings.Contains(file, "-schema-post.sql") { - l.tctx.L().Warn("ignore unsupport view/trigger file", zap.String("file", file)) + l.logCtx.L().Warn("ignore unsupport view/trigger file", zap.String("file", file)) continue } @@ -862,13 +865,13 @@ func (l *Loader) prepareDataFiles(files map[string]struct{}) error { name := file[:idx] fields := strings.Split(name, ".") if len(fields) != 2 && len(fields) != 3 { - l.tctx.L().Warn("invalid db table sql file", zap.String("file", file)) + l.logCtx.L().Warn("invalid db table sql file", zap.String("file", file)) continue } db, table := fields[0], fields[1] if l.skipSchemaAndTable(&filter.Table{Schema: db, Name: 
table}) { - l.tctx.L().Warn("ignore data file", zap.String("data file", file)) + l.logCtx.L().Warn("ignore data file", zap.String("data file", file)) continue } tables, ok := l.db2Tables[db] @@ -900,7 +903,7 @@ func (l *Loader) prepareDataFiles(files map[string]struct{}) error { func (l *Loader) prepare() error { begin := time.Now() defer func() { - l.tctx.L().Info("prepare loading", zap.Duration("cost time", time.Since(begin))) + l.logCtx.L().Info("prepare loading", zap.Duration("cost time", time.Since(begin))) }() // check if mydumper dir data exists. @@ -911,7 +914,7 @@ func (l *Loader) prepare() error { if strings.HasSuffix(l.cfg.Dir, dirSuffix) { dirPrefix := strings.TrimSuffix(l.cfg.Dir, dirSuffix) if utils.IsDirExists(dirPrefix) { - l.tctx.L().Warn("directory doesn't exist, try to load data from old fashion directory", zap.String("directory", l.cfg.Dir), zap.String("old fashion directory", dirPrefix)) + l.logCtx.L().Warn("directory doesn't exist, try to load data from old fashion directory", zap.String("directory", l.cfg.Dir), zap.String("old fashion directory", dirPrefix)) l.cfg.Dir = dirPrefix trimmed = true } @@ -924,7 +927,7 @@ func (l *Loader) prepare() error { // collect dir files. 
files := CollectDirFiles(l.cfg.Dir) - l.tctx.L().Debug("collected files", zap.Reflect("files", files)) + l.logCtx.L().Debug("collected files", zap.Reflect("files", files)) /* Mydumper file names format * db {db}-schema-create.sql @@ -947,11 +950,11 @@ func (l *Loader) prepare() error { } // restoreSchema creates schema -func (l *Loader) restoreSchema(conn *DBConn, sqlFile, schema string) error { - err := l.restoreStructure(conn, sqlFile, schema, "") +func (l *Loader) restoreSchema(ctx context.Context, conn *DBConn, sqlFile, schema string) error { + err := l.restoreStructure(ctx, conn, sqlFile, schema, "") if err != nil { if isErrDBExists(err) { - l.tctx.L().Info("database already exists, skip it", zap.String("db schema file", sqlFile)) + l.logCtx.L().Info("database already exists, skip it", zap.String("db schema file", sqlFile)) } else { return terror.Annotatef(err, "run db schema failed - dbfile %s", sqlFile) } @@ -960,11 +963,11 @@ func (l *Loader) restoreSchema(conn *DBConn, sqlFile, schema string) error { } // restoreTable creates table -func (l *Loader) restoreTable(conn *DBConn, sqlFile, schema, table string) error { - err := l.restoreStructure(conn, sqlFile, schema, table) +func (l *Loader) restoreTable(ctx context.Context, conn *DBConn, sqlFile, schema, table string) error { + err := l.restoreStructure(ctx, conn, sqlFile, schema, table) if err != nil { if isErrTableExists(err) { - l.tctx.L().Info("table already exists, skip it", zap.String("table schema file", sqlFile)) + l.logCtx.L().Info("table already exists, skip it", zap.String("table schema file", sqlFile)) } else { return terror.Annotatef(err, "run table schema failed - dbfile %s", sqlFile) } @@ -973,13 +976,15 @@ func (l *Loader) restoreTable(conn *DBConn, sqlFile, schema, table string) error } // restoreStruture creates schema or table -func (l *Loader) restoreStructure(conn *DBConn, sqlFile string, schema string, table string) error { +func (l *Loader) restoreStructure(ctx context.Context, conn 
*DBConn, sqlFile string, schema string, table string) error { f, err := os.Open(sqlFile) if err != nil { return terror.ErrLoadUnitReadSchemaFile.Delegate(err) } defer f.Close() + tctx := l.logCtx.WithContext(ctx) + data := make([]byte, 0, 1024*1024) br := bufio.NewReader(f) for { @@ -1002,7 +1007,7 @@ func (l *Loader) restoreStructure(conn *DBConn, sqlFile string, schema string, t } var sqls []string - dstSchema, dstTable := fetchMatchedLiteral(l.tctx, l.tableRouter, schema, table) + dstSchema, dstTable := fetchMatchedLiteral(tctx, l.tableRouter, schema, table) // for table if table != "" { sqls = append(sqls, fmt.Sprintf("USE `%s`;", dstSchema)) @@ -1011,10 +1016,10 @@ func (l *Loader) restoreStructure(conn *DBConn, sqlFile string, schema string, t query = renameShardingSchema(query, schema, dstSchema) } - l.tctx.L().Debug("schema create statement", zap.String("sql", query)) + l.logCtx.L().Debug("schema create statement", zap.String("sql", query)) sqls = append(sqls, query) - err = conn.executeSQL(l.tctx, sqls) + err = conn.executeSQL(tctx, sqls) if err != nil { return terror.WithScope(err, terror.ScopeDownstream) } @@ -1081,17 +1086,19 @@ func (l *Loader) restoreData(ctx context.Context) error { dbs = append(dbs, db) } + tctx := l.logCtx.WithContext(ctx) + for _, db := range dbs { tables := l.db2Tables[db] // create db dbFile := fmt.Sprintf("%s/%s-schema-create.sql", l.cfg.Dir, db) - l.tctx.L().Info("start to create schema", zap.String("schema file", dbFile)) - err = l.restoreSchema(dbConn, dbFile, db) + l.logCtx.L().Info("start to create schema", zap.String("schema file", dbFile)) + err = l.restoreSchema(ctx, dbConn, dbFile, db) if err != nil { return err } - l.tctx.L().Info("finish to create schema", zap.String("schema file", dbFile)) + l.logCtx.L().Info("finish to create schema", zap.String("schema file", dbFile)) tnames := make([]string, 0, len(tables)) for t := range tables { @@ -1101,39 +1108,39 @@ func (l *Loader) restoreData(ctx context.Context) error { 
dataFiles := tables[table] tableFile := fmt.Sprintf("%s/%s.%s-schema.sql", l.cfg.Dir, db, table) if _, ok := l.tableInfos[tableName(db, table)]; !ok { - l.tableInfos[tableName(db, table)], err = parseTable(l.tctx, l.tableRouter, db, table, tableFile) + l.tableInfos[tableName(db, table)], err = parseTable(tctx, l.tableRouter, db, table, tableFile) if err != nil { return terror.Annotatef(err, "parse table %s/%s", db, table) } } if l.checkPoint.IsTableFinished(db, table) { - l.tctx.L().Info("table has finished, skip it.", zap.String("schema", db), zap.String("table", table)) + l.logCtx.L().Info("table has finished, skip it.", zap.String("schema", db), zap.String("table", table)) continue } // create table - l.tctx.L().Info("start to create table", zap.String("table file", tableFile)) - err := l.restoreTable(dbConn, tableFile, db, table) + l.logCtx.L().Info("start to create table", zap.String("table file", tableFile)) + err := l.restoreTable(ctx, dbConn, tableFile, db, table) if err != nil { return err } - l.tctx.L().Info("finish to create table", zap.String("table file", tableFile)) + l.logCtx.L().Info("finish to create table", zap.String("table file", tableFile)) restoringFiles := l.checkPoint.GetRestoringFileInfo(db, table) - l.tctx.L().Debug("restoring table data", zap.String("schema", db), zap.String("table", table), zap.Reflect("data files", restoringFiles)) + l.logCtx.L().Debug("restoring table data", zap.String("schema", db), zap.String("table", table), zap.Reflect("data files", restoringFiles)) info := l.tableInfos[tableName(db, table)] for _, file := range dataFiles { select { case <-ctx.Done(): - l.tctx.L().Warn("stop generate data file job", log.ShortError(ctx.Err())) + l.logCtx.L().Warn("stop generate data file job", log.ShortError(ctx.Err())) return ctx.Err() default: // do nothing } - l.tctx.L().Debug("dispatch data file", zap.String("schema", db), zap.String("table", table), zap.String("data file", file)) + l.logCtx.L().Debug("dispatch data file", 
zap.String("schema", db), zap.String("table", table), zap.String("data file", file)) var offset int64 posSet, ok := restoringFiles[file] @@ -1152,19 +1159,19 @@ func (l *Loader) restoreData(ctx context.Context) error { } } } - l.tctx.L().Info("finish to create tables", zap.Duration("cost time", time.Since(begin))) + l.logCtx.L().Info("finish to create tables", zap.Duration("cost time", time.Since(begin))) // a simple and naive approach to dispatch files randomly based on the feature of golang map(range by random) for _, j := range dispatchMap { select { case <-ctx.Done(): - l.tctx.L().Warn("stop dispatch data file job", log.ShortError(ctx.Err())) + l.logCtx.L().Warn("stop dispatch data file job", log.ShortError(ctx.Err())) return ctx.Err() case l.fileJobQueue <- j: } } - l.tctx.L().Info("all data files have been dispatched, waiting for them finished") + l.logCtx.L().Info("all data files have been dispatched, waiting for them finished") return nil } @@ -1175,7 +1182,7 @@ func (l *Loader) checkpointID() string { } dir, err := filepath.Abs(l.cfg.Dir) if err != nil { - l.tctx.L().Warn("get abs dir", zap.String("directory", l.cfg.Dir), log.ShortError(err)) + l.logCtx.L().Warn("get abs dir", zap.String("directory", l.cfg.Dir), log.ShortError(err)) return l.cfg.Dir } return shortSha1(dir) @@ -1185,7 +1192,7 @@ func (l *Loader) getMydumpMetadata() error { metafile := filepath.Join(l.cfg.LoaderConfig.Dir, "metadata") pos, err := utils.ParseMetaData(metafile) if err != nil { - l.tctx.L().Error("fail to parse dump metadata", log.ShortError(err)) + l.logCtx.L().Error("fail to parse dump metadata", log.ShortError(err)) return err } diff --git a/loader/status.go b/loader/status.go index e26aa47314..717d667bcc 100644 --- a/loader/status.go +++ b/loader/status.go @@ -51,7 +51,7 @@ func (l *Loader) PrintStatus(ctx context.Context) { failpoint.Inject("PrintStatusCheckSeconds", func(val failpoint.Value) { if seconds, ok := val.(int); ok { printStatusInterval = time.Duration(seconds) 
* time.Second - l.tctx.L().Info("set printStatusInterval", zap.String("failpoint", "PrintStatusCheckSeconds"), zap.Int("value", seconds)) + l.logCtx.L().Info("set printStatusInterval", zap.String("failpoint", "PrintStatusCheckSeconds"), zap.Int("value", seconds)) } }) @@ -72,7 +72,7 @@ func (l *Loader) PrintStatus(ctx context.Context) { finishedSize := l.finishedDataSize.Get() totalSize := l.totalDataSize.Get() totalFileCount := l.totalFileCount.Get() - l.tctx.L().Info("progress status of load", + l.logCtx.L().Info("progress status of load", zap.Int64("finished_bytes", finishedSize), zap.Int64("total_bytes", totalSize), zap.Int64("total_file_count", totalFileCount), diff --git a/mydumper/mydumper.go b/mydumper/mydumper.go index fafe1efa0d..2144062c77 100644 --- a/mydumper/mydumper.go +++ b/mydumper/mydumper.go @@ -57,7 +57,7 @@ func NewMydumper(cfg *config.SubTaskConfig) *Mydumper { } // Init implements Unit.Init -func (m *Mydumper) Init() error { +func (m *Mydumper) Init(ctx context.Context) error { var err error m.args, err = m.constructArgs() return err @@ -229,7 +229,7 @@ func (m *Mydumper) Type() pb.UnitType { } // IsFreshTask implements Unit.IsFreshTask -func (m *Mydumper) IsFreshTask() (bool, error) { +func (m *Mydumper) IsFreshTask(ctx context.Context) (bool, error) { return true, nil } diff --git a/pkg/conn/baseconn.go b/pkg/conn/baseconn.go index d969717dea..86757872be 100644 --- a/pkg/conn/baseconn.go +++ b/pkg/conn/baseconn.go @@ -15,14 +15,19 @@ package conn import ( "database/sql" + "fmt" + "strings" + + "github.com/go-sql-driver/mysql" + "github.com/pingcap/failpoint" + gmysql "github.com/siddontang/go-mysql/mysql" + "go.uber.org/zap" tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/retry" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" - - "go.uber.org/zap" ) // BaseConn is the basic connection we use in dm @@ -109,6 +114,24 @@ func (conn *BaseConn) QuerySQL(tctx 
*tcontext.Context, query string, args ...int // 1. failed: (the index of sqls executed error, error) // 2. succeed: (len(sqls), nil) func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) { + // inject an error to trigger retry, this should be placed before the real execution of the SQL statement. + failpoint.Inject("retryableError", func(val failpoint.Value) { + if mark, ok := val.(string); ok { + enabled := false + for _, query := range queries { + if strings.Contains(query, mark) { + enabled = true // only enable if the `mark` matched. + } + } + if enabled { + tctx.L().Info("", zap.String("failpoint", "retryableError"), zap.String("mark", mark)) + failpoint.Return(0, &mysql.MySQLError{ + Number: gmysql.ER_LOCK_DEADLOCK, + Message: fmt.Sprintf("failpoint inject retryable error for %s", mark)}) + } + } + }) + if len(queries) == 0 { return 0, nil } diff --git a/pkg/context/context.go b/pkg/context/context.go index 06165b3dcb..ab3c178a70 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -15,6 +15,7 @@ package context import ( "context" + "time" "github.com/pingcap/dm/pkg/log" ) @@ -51,6 +52,15 @@ func (c *Context) WithContext(ctx context.Context) *Context { } } +// WithTimeout sets a timeout associated context. 
+func (c *Context) WithTimeout(timeout time.Duration) (*Context, context.CancelFunc) { + ctx, cancel := context.WithTimeout(c.Ctx, timeout) + return &Context{ + Ctx: ctx, + Logger: c.Logger, + }, cancel +} + // Context returns real context func (c *Context) Context() context.Context { return c.Ctx diff --git a/pkg/retry/strategy.go b/pkg/retry/strategy.go index 6fc5348ee4..798298e14e 100644 --- a/pkg/retry/strategy.go +++ b/pkg/retry/strategy.go @@ -81,7 +81,7 @@ func (*FiniteRetryStrategy) Apply(ctx *tcontext.Context, params Params, select { case <-ctx.Context().Done(): - return nil, i, err + return ret, i, err // return `ret` rather than `nil` case <-time.After(duration): } continue diff --git a/pkg/streamer/reader_test.go b/pkg/streamer/reader_test.go index 54f64a0bf5..a08c10cc20 100644 --- a/pkg/streamer/reader_test.go +++ b/pkg/streamer/reader_test.go @@ -269,7 +269,7 @@ func (t *testReaderSuite) TestParseFileRelaySubDirUpdated(c *C) { _, err2 := f.Write(extraEvents[0].RawData) c.Assert(err2, IsNil) }() - ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second) defer cancel2() needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) @@ -290,7 +290,7 @@ func (t *testReaderSuite) TestParseFileRelaySubDirUpdated(c *C) { err2 := ioutil.WriteFile(nextPath, replication.BinLogFileHeader, 0600) c.Assert(err2, IsNil) }() - ctx3, cancel3 := context.WithTimeout(context.Background(), time.Second) + ctx3, cancel3 := context.WithTimeout(context.Background(), 2*time.Second) defer cancel3() needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( ctx3, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) diff --git a/pkg/utils/db_test.go b/pkg/utils/db_test.go index 51fafc7b76..fa76873bd0 100644 --- a/pkg/utils/db_test.go +++ b/pkg/utils/db_test.go 
@@ -43,8 +43,8 @@ func (t *testUtilsSuite) TestGetAllServerID(c *C) { for _, testCase := range testCases { for _, flavor := range flavors { t.createMockResult(mock, testCase.masterID, testCase.serverIDs, flavor) - serverIDs, err := GetAllServerID(context.Background(), db) - c.Assert(err, IsNil) + serverIDs, err2 := GetAllServerID(context.Background(), db) + c.Assert(err2, IsNil) for _, serverID := range testCase.serverIDs { _, ok := serverIDs[serverID] diff --git a/relay/relay.go b/relay/relay.go index e53a49ca99..8e2b41a306 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -69,7 +69,7 @@ var NewRelay = NewRealRelay // Process defines mysql-like relay log process unit type Process interface { // Init initial relat log unit - Init() (err error) + Init(ctx context.Context) (err error) // Process run background logic of relay log unit Process(ctx context.Context, pr chan pb.ProcessResult) // SwitchMaster switches relay's master server @@ -152,7 +152,7 @@ func NewRealRelay(cfg *Config) Process { } // Init implements the dm.Unit interface. 
-func (r *Relay) Init() (err error) { +func (r *Relay) Init(ctx context.Context) (err error) { rollbackHolder := fr.NewRollbackHolder("relay") defer func() { if err != nil { @@ -296,7 +296,7 @@ func (r *Relay) process(parentCtx context.Context) error { return err } - r.tctx.L().Info("receive retryable error for binlog reader", log.ShortError(err)) + r.tctx.L().Warn("receive retryable error for binlog reader", log.ShortError(err)) err = reader2.Close() // close the previous reader if err != nil { r.tctx.L().Error("fail to close binlog event reader", zap.Error(err)) @@ -508,9 +508,9 @@ func (r *Relay) reSetupMeta() error { var latestPosName, latestGTIDStr string if (r.cfg.EnableGTID && len(r.cfg.BinlogGTID) == 0) || (!r.cfg.EnableGTID && len(r.cfg.BinLogName) == 0) { - latestPos, latestGTID, err := utils.GetMasterStatus(r.db, r.cfg.Flavor) - if err != nil { - return err + latestPos, latestGTID, err2 := utils.GetMasterStatus(r.db, r.cfg.Flavor) + if err2 != nil { + return err2 } latestPosName = latestPos.Name latestGTIDStr = latestGTID.String() diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index c4a1d7c0f0..e266544449 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -123,19 +123,19 @@ func (b *binlogPoint) String() string { // because, when restarting to continue the sync, all sharding DDLs must try-sync again type CheckPoint interface { // Init initializes the CheckPoint - Init() error + Init(tctx *tcontext.Context) error // Close closes the CheckPoint Close() // ResetConn resets database connections owned by the Checkpoint - ResetConn() error + ResetConn(tctx *tcontext.Context) error // Clear clears all checkpoints - Clear() error + Clear(tctx *tcontext.Context) error // Load loads all checkpoints saved by CheckPoint - Load() error + Load(tctx *tcontext.Context) error // LoadMeta loads checkpoints from meta config item or file LoadMeta() error @@ -144,7 +144,7 @@ type CheckPoint interface { SaveTablePoint(sourceSchema, sourceTable string, 
pos mysql.Position) // DeleteTablePoint deletes checkpoint for specified table in memory and storage - DeleteTablePoint(sourceSchema, sourceTable string) error + DeleteTablePoint(tctx *tcontext.Context, sourceSchema, sourceTable string) error // IsNewerTablePoint checks whether job's checkpoint is newer than previous saved checkpoint IsNewerTablePoint(sourceSchema, sourceTable string, pos mysql.Position) bool @@ -158,7 +158,7 @@ type CheckPoint interface { // by extraSQLs and extraArgs. Currently extraSQLs contain shard meta only. // @exceptTables: [[schema, table]... ] // corresponding to Meta.Flush - FlushPointsExcept(exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error + FlushPointsExcept(tctx *tcontext.Context, exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error // GlobalPoint returns the global binlog stream's checkpoint // corresponding to to Meta.Pos @@ -207,14 +207,11 @@ type RemoteCheckPoint struct { globalPoint *binlogPoint globalPointSaveTime time.Time - tctx *tcontext.Context + logCtx *tcontext.Context } // NewRemoteCheckPoint creates a new RemoteCheckPoint func NewRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id string) CheckPoint { - - newtctx := tctx.WithLogger(tctx.L().WithFields(zap.String("component", "remote checkpoint"))) - cp := &RemoteCheckPoint{ cfg: cfg, schema: cfg.MetaSchema, @@ -222,45 +219,45 @@ func NewRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id s id: id, points: make(map[string]map[string]*binlogPoint), globalPoint: newBinlogPoint(minCheckpoint, minCheckpoint), - tctx: newtctx, + logCtx: tcontext.Background().WithLogger(tctx.L().WithFields(zap.String("component", "remote checkpoint"))), } return cp } // Init implements CheckPoint.Init -func (cp *RemoteCheckPoint) Init() error { +func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context) error { checkPointDB := cp.cfg.To checkPointDB.RawDBCfg = 
config.DefaultRawDBConfig().SetReadTimeout(maxCheckPointTimeout) - db, dbConns, err := createConns(cp.tctx, cp.cfg, checkPointDB, 1) + db, dbConns, err := createConns(tctx, cp.cfg, checkPointDB, 1) if err != nil { return err } cp.db = db cp.dbConn = dbConns[0] - return cp.prepare() + return cp.prepare(tctx) } // Close implements CheckPoint.Close func (cp *RemoteCheckPoint) Close() { - closeBaseDB(cp.tctx, cp.db) + closeBaseDB(cp.logCtx, cp.db) } // ResetConn implements CheckPoint.ResetConn -func (cp *RemoteCheckPoint) ResetConn() error { - return cp.dbConn.resetConn(cp.tctx) +func (cp *RemoteCheckPoint) ResetConn(tctx *tcontext.Context) error { + return cp.dbConn.resetConn(tctx) } // Clear implements CheckPoint.Clear -func (cp *RemoteCheckPoint) Clear() error { +func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error { cp.Lock() defer cp.Unlock() // delete all checkpoints sql2 := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE `id` = '%s'", cp.schema, cp.table, cp.id) args := make([]interface{}, 0) - _, err := cp.dbConn.executeSQL(cp.tctx, []string{sql2}, [][]interface{}{args}...) + _, err := cp.dbConn.executeSQL(tctx, []string{sql2}, [][]interface{}{args}...) 
if err != nil { return err } @@ -286,7 +283,7 @@ func (cp *RemoteCheckPoint) saveTablePoint(sourceSchema, sourceTable string, pos } // we save table checkpoint while we meet DDL or DML - cp.tctx.L().Debug("save table checkpoint", zap.Stringer("position", pos), zap.String("schema", sourceSchema), zap.String("table", sourceTable)) + cp.logCtx.L().Debug("save table checkpoint", zap.Stringer("position", pos), zap.String("schema", sourceSchema), zap.String("table", sourceTable)) mSchema, ok := cp.points[sourceSchema] if !ok { mSchema = make(map[string]*binlogPoint) @@ -297,13 +294,13 @@ func (cp *RemoteCheckPoint) saveTablePoint(sourceSchema, sourceTable string, pos mSchema[sourceTable] = newBinlogPoint(pos, minCheckpoint) } else { if err := point.save(pos); err != nil { - cp.tctx.L().Error("fail to save table point", zap.String("schema", sourceSchema), zap.String("table", sourceTable), log.ShortError(err)) + cp.logCtx.L().Error("fail to save table point", zap.String("schema", sourceSchema), zap.String("table", sourceTable), log.ShortError(err)) } } } // DeleteTablePoint implements CheckPoint.DeleteTablePoint -func (cp *RemoteCheckPoint) DeleteTablePoint(sourceSchema, sourceTable string) error { +func (cp *RemoteCheckPoint) DeleteTablePoint(tctx *tcontext.Context, sourceSchema, sourceTable string) error { cp.Lock() defer cp.Unlock() mSchema, ok := cp.points[sourceSchema] @@ -315,11 +312,11 @@ func (cp *RemoteCheckPoint) DeleteTablePoint(sourceSchema, sourceTable string) e return nil } - cp.tctx.L().Info("delete table checkpoint", zap.String("schema", sourceSchema), zap.String("table", sourceTable)) + cp.logCtx.L().Info("delete table checkpoint", zap.String("schema", sourceSchema), zap.String("table", sourceTable)) // delete checkpoint sql2 := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE `id` = '%s' AND `cp_schema` = '%s' AND `cp_table` = '%s'", cp.schema, cp.table, cp.id, sourceSchema, sourceTable) args := make([]interface{}, 0) - _, err := cp.dbConn.executeSQL(cp.tctx, 
[]string{sql2}, [][]interface{}{args}...) + _, err := cp.dbConn.executeSQL(tctx, []string{sql2}, [][]interface{}{args}...) if err != nil { return err } @@ -348,14 +345,14 @@ func (cp *RemoteCheckPoint) SaveGlobalPoint(pos mysql.Position) { cp.Lock() defer cp.Unlock() - cp.tctx.L().Debug("save global checkpoint", zap.Stringer("position", pos)) + cp.logCtx.L().Debug("save global checkpoint", zap.Stringer("position", pos)) if err := cp.globalPoint.save(pos); err != nil { - cp.tctx.L().Error("fail to save global checkpoint", log.ShortError(err)) + cp.logCtx.L().Error("fail to save global checkpoint", log.ShortError(err)) } } // FlushPointsExcept implements CheckPoint.FlushPointsExcept -func (cp *RemoteCheckPoint) FlushPointsExcept(exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error { +func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error { cp.RLock() defer cp.RUnlock() @@ -405,7 +402,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(exceptTables [][]string, extraSQLs args = append(args, extraArgs[i]) } - _, err := cp.dbConn.executeSQL(cp.tctx, sqls, args...) + _, err := cp.dbConn.executeSQL(tctx, sqls, args...) 
if err != nil { return err } @@ -448,32 +445,32 @@ func (cp *RemoteCheckPoint) Rollback() { cp.globalPoint.rollback() for schema, mSchema := range cp.points { for table, point := range mSchema { - cp.tctx.L().Info("rollback checkpoint", log.WrapStringerField("checkpoint", point), zap.String("schema", schema), zap.String("table", table)) + cp.logCtx.L().Info("rollback checkpoint", log.WrapStringerField("checkpoint", point), zap.String("schema", schema), zap.String("table", table)) point.rollback() } } } -func (cp *RemoteCheckPoint) prepare() error { - if err := cp.createSchema(); err != nil { +func (cp *RemoteCheckPoint) prepare(tctx *tcontext.Context) error { + if err := cp.createSchema(tctx); err != nil { return err } - if err := cp.createTable(); err != nil { + if err := cp.createTable(tctx); err != nil { return err } return nil } -func (cp *RemoteCheckPoint) createSchema() error { +func (cp *RemoteCheckPoint) createSchema(tctx *tcontext.Context) error { sql2 := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", cp.schema) args := make([]interface{}, 0) - _, err := cp.dbConn.executeSQL(cp.tctx, []string{sql2}, [][]interface{}{args}...) - cp.tctx.L().Info("create checkpoint schema", zap.String("statement", sql2)) + _, err := cp.dbConn.executeSQL(tctx, []string{sql2}, [][]interface{}{args}...) + cp.logCtx.L().Info("create checkpoint schema", zap.String("statement", sql2)) return err } -func (cp *RemoteCheckPoint) createTable() error { +func (cp *RemoteCheckPoint) createTable(tctx *tcontext.Context) error { tableName := fmt.Sprintf("`%s`.`%s`", cp.schema, cp.table) sql2 := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s ( id VARCHAR(32) NOT NULL, @@ -487,15 +484,15 @@ func (cp *RemoteCheckPoint) createTable() error { UNIQUE KEY uk_id_schema_table (id, cp_schema, cp_table) )`, tableName) args := make([]interface{}, 0) - _, err := cp.dbConn.executeSQL(cp.tctx, []string{sql2}, [][]interface{}{args}...) 
- cp.tctx.L().Info("create checkpoint table", zap.String("statement", sql2)) + _, err := cp.dbConn.executeSQL(tctx, []string{sql2}, [][]interface{}{args}...) + cp.logCtx.L().Info("create checkpoint table", zap.String("statement", sql2)) return err } // Load implements CheckPoint.Load -func (cp *RemoteCheckPoint) Load() error { +func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error { query := fmt.Sprintf("SELECT `cp_schema`, `cp_table`, `binlog_name`, `binlog_pos`, `is_global` FROM `%s`.`%s` WHERE `id`='%s'", cp.schema, cp.table, cp.id) - rows, err := cp.dbConn.querySQL(cp.tctx, query) + rows, err := cp.dbConn.querySQL(tctx, query) defer func() { if rows != nil { rows.Close() @@ -532,7 +529,7 @@ func (cp *RemoteCheckPoint) Load() error { if isGlobal { if pos.Compare(minCheckpoint) > 0 { cp.globalPoint = newBinlogPoint(pos, pos) - cp.tctx.L().Info("fetch global checkpoint from DB", log.WrapStringerField("global checkpoint", cp.globalPoint)) + cp.logCtx.L().Info("fetch global checkpoint from DB", log.WrapStringerField("global checkpoint", cp.globalPoint)) } continue // skip global checkpoint } @@ -563,7 +560,7 @@ func (cp *RemoteCheckPoint) LoadMeta() error { case config.ModeIncrement: // load meta from task config if cp.cfg.Meta == nil { - cp.tctx.L().Warn("don't set meta in increment task-mode") + cp.logCtx.L().Warn("don't set meta in increment task-mode") return nil } pos = &mysql.Position{ @@ -578,7 +575,7 @@ func (cp *RemoteCheckPoint) LoadMeta() error { // if meta loaded, we will start syncing from meta's pos if pos != nil { cp.globalPoint = newBinlogPoint(*pos, *pos) - cp.tctx.L().Info("loaded checkpoints from meta", log.WrapStringerField("global checkpoint", cp.globalPoint)) + cp.logCtx.L().Info("loaded checkpoints from meta", log.WrapStringerField("global checkpoint", cp.globalPoint)) } return nil @@ -601,6 +598,6 @@ func (cp *RemoteCheckPoint) genUpdateSQL(cpSchema, cpTable string, binlogName st func (cp *RemoteCheckPoint) parseMetaData() 
(*mysql.Position, error) { // `metadata` is mydumper's output meta file name filename := path.Join(cp.cfg.Dir, "metadata") - cp.tctx.L().Info("parsing metadata from file", zap.String("file", filename)) + cp.logCtx.L().Info("parsing metadata from file", zap.String("file", filename)) return utils.ParseMetaData(filename) } diff --git a/syncer/checkpoint_test.go b/syncer/checkpoint_test.go index 58228dae20..c04ad95427 100644 --- a/syncer/checkpoint_test.go +++ b/syncer/checkpoint_test.go @@ -67,7 +67,9 @@ func (s *testCheckpointSuite) prepareCheckPointSQL() { // this test case uses sqlmock to simulate all SQL operations in tests func (s *testCheckpointSuite) TestCheckPoint(c *C) { - cp := NewRemoteCheckPoint(tcontext.Background(), s.cfg, cpid) + tctx := tcontext.Background() + + cp := NewRemoteCheckPoint(tctx, s.cfg, cpid) defer func() { s.mock.ExpectClose() cp.Close() @@ -95,9 +97,9 @@ func (s *testCheckpointSuite) TestCheckPoint(c *C) { conn := &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} cp.(*RemoteCheckPoint).dbConn = conn - err = cp.(*RemoteCheckPoint).prepare() + err = cp.(*RemoteCheckPoint).prepare(tctx) c.Assert(err, IsNil) - cp.Clear() + cp.Clear(tctx) // test operation for global checkpoint s.testGlobalCheckPoint(c, cp) @@ -107,13 +109,15 @@ func (s *testCheckpointSuite) TestCheckPoint(c *C) { } func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { + tctx := tcontext.Background() + // global checkpoint init to min c.Assert(cp.GlobalPoint(), Equals, minCheckpoint) c.Assert(cp.FlushedGlobalPoint(), Equals, minCheckpoint) // try load, but should load nothing s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil)) - err := cp.Load() + err := cp.Load(tctx) c.Assert(err, IsNil) c.Assert(cp.GlobalPoint(), Equals, minCheckpoint) c.Assert(cp.FlushedGlobalPoint(), Equals, minCheckpoint) @@ -143,14 +147,14 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { s.cfg.Dir 
= dir s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil)) - err = cp.Load() + err = cp.Load(tctx) c.Assert(err, IsNil) cp.SaveGlobalPoint(pos1) s.mock.ExpectBegin() s.mock.ExpectExec(flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, true, pos1.Name, pos1.Pos).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() - err = cp.FlushPointsExcept(nil, nil, nil) + err = cp.FlushPointsExcept(tctx, nil, nil, nil) c.Assert(err, IsNil) c.Assert(cp.GlobalPoint(), Equals, pos1) c.Assert(cp.FlushedGlobalPoint(), Equals, pos1) @@ -190,7 +194,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectBegin() s.mock.ExpectExec(flushCheckPointSQL).WithArgs(cpid, "", "", pos2.Name, pos2.Pos, true, pos2.Name, pos2.Pos).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() - err = cp.FlushPointsExcept(nil, nil, nil) + err = cp.FlushPointsExcept(tctx, nil, nil, nil) c.Assert(err, IsNil) cp.Rollback() c.Assert(cp.GlobalPoint(), Equals, pos2) @@ -202,7 +206,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { cp.SaveGlobalPoint(pos3) columns := []string{"cp_schema", "cp_table", "binlog_name", "binlog_pos", "is_global"} s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(columns).AddRow("", "", pos2.Name, pos2.Pos, true)) - err = cp.Load() + err = cp.Load(tctx) c.Assert(err, IsNil) c.Assert(cp.GlobalPoint(), Equals, pos2) c.Assert(cp.FlushedGlobalPoint(), Equals, pos2) @@ -222,13 +226,13 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectBegin() s.mock.ExpectExec(clearCheckPointSQL).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() - err = cp.Clear() + err = cp.Clear(tctx) c.Assert(err, IsNil) c.Assert(cp.GlobalPoint(), Equals, minCheckpoint) c.Assert(cp.FlushedGlobalPoint(), Equals, minCheckpoint) s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil)) - err = cp.Load() + err = 
cp.Load(tctx) c.Assert(err, IsNil) c.Assert(cp.GlobalPoint(), Equals, minCheckpoint) c.Assert(cp.FlushedGlobalPoint(), Equals, minCheckpoint) @@ -236,6 +240,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { var ( + tctx = tcontext.Background() schema = "test_db" table = "test_table" pos1 = mysql.Position{ @@ -272,7 +277,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectBegin() s.mock.ExpectExec(flushCheckPointSQL).WithArgs(cpid, schema, table, pos2.Name, pos2.Pos, false, pos2.Name, pos2.Pos).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() - err = cp.FlushPointsExcept(nil, nil, nil) + err = cp.FlushPointsExcept(tctx, nil, nil, nil) c.Assert(err, IsNil) cp.Rollback() newer = cp.IsNewerTablePoint(schema, table, pos1) @@ -282,7 +287,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectBegin() s.mock.ExpectExec(clearCheckPointSQL).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() - err = cp.Clear() + err = cp.Clear(tctx) c.Assert(err, IsNil) newer = cp.IsNewerTablePoint(schema, table, pos1) c.Assert(newer, IsTrue) @@ -308,7 +313,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectBegin() s.mock.ExpectExec(flushCheckPointSQL).WithArgs(cpid, "", "", pos2.Name, pos2.Pos, true, pos2.Name, pos2.Pos).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() - err = cp.FlushPointsExcept([][]string{{schema, table}}, nil, nil) + err = cp.FlushPointsExcept(tctx, [][]string{{schema, table}}, nil, nil) c.Assert(err, IsNil) cp.Rollback() newer = cp.IsNewerTablePoint(schema, table, pos1) diff --git a/syncer/db.go b/syncer/db.go index 12b7d22a0c..2385608d58 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -81,11 +81,11 @@ func createBaseDB(dbCfg config.DBConfig) (*conn.BaseDB, error) { } // close baseDB to release all connection 
generated by this baseDB and this baseDB -func closeBaseDB(tctx *tcontext.Context, baseDB *conn.BaseDB) { +func closeBaseDB(logCtx *tcontext.Context, baseDB *conn.BaseDB) { if baseDB != nil { err := baseDB.Close() if err != nil { - tctx.L().Error("fail to close baseDB", log.ShortError(err)) + logCtx.L().Error("fail to close baseDB", log.ShortError(err)) } } } @@ -187,7 +187,8 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter if dbutil.IsRetryableError(err) { tctx.L().Warn("query statement", zap.Int("retry", retryTime), zap.String("query", utils.TruncateString(query, -1)), - zap.String("argument", utils.TruncateInterface(args, -1))) + zap.String("argument", utils.TruncateInterface(args, -1)), + log.ShortError(err)) sqlRetriesTotal.WithLabelValues("query", conn.cfg.Name).Add(1) return true } @@ -242,7 +243,8 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun if dbutil.IsRetryableError(err) { tctx.L().Warn("execute statements", zap.Int("retry", retryTime), zap.String("queries", utils.TruncateInterface(queries, -1)), - zap.String("arguments", utils.TruncateInterface(args, -1))) + zap.String("arguments", utils.TruncateInterface(args, -1)), + log.ShortError(err)) sqlRetriesTotal.WithLabelValues("stmt_exec", conn.cfg.Name).Add(1) return true } diff --git a/syncer/ddl.go b/syncer/ddl.go index 1531d46dbd..1c78048279 100644 --- a/syncer/ddl.go +++ b/syncer/ddl.go @@ -20,6 +20,7 @@ import ( "github.com/siddontang/go-mysql/replication" "go.uber.org/zap" + tcontext "github.com/pingcap/dm/pkg/context" parserpkg "github.com/pingcap/dm/pkg/parser" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" @@ -128,7 +129,7 @@ func (s *Syncer) parseDDLSQL(sql string, p *parser.Parser, schema string) (resul // * it splits multiple operations in one DDL statement into multiple DDL statements // * try to apply online ddl by given online // return @spilted sqls, @online ddl table names, @error -func (s *Syncer) 
resolveDDLSQL(p *parser.Parser, stmt ast.StmtNode, schema string) (sqls []string, tables map[string]*filter.Table, err error) { +func (s *Syncer) resolveDDLSQL(tctx *tcontext.Context, p *parser.Parser, stmt ast.StmtNode, schema string) (sqls []string, tables map[string]*filter.Table, err error) { sqls, err = parserpkg.SplitDDL(stmt, schema) if err != nil { return nil, nil, err @@ -141,7 +142,7 @@ func (s *Syncer) resolveDDLSQL(p *parser.Parser, stmt ast.StmtNode, schema strin tables = make(map[string]*filter.Table) for _, sql := range sqls { // filter and store ghost table ddl, transform online ddl - ss, tableName, err := s.handleOnlineDDL(p, schema, sql) + ss, tableName, err := s.handleOnlineDDL(tctx, p, schema, sql) if err != nil { return statements, tables, err } @@ -190,7 +191,7 @@ func (s *Syncer) handleDDL(p *parser.Parser, schema, sql string) (string, [][]*f // handle online ddls // if sql is online ddls, we would find it's ghost table, and ghost ddls, then replay its table name by real table name -func (s *Syncer) handleOnlineDDL(p *parser.Parser, schema, sql string) ([]string, *filter.Table, error) { +func (s *Syncer) handleOnlineDDL(tctx *tcontext.Context, p *parser.Parser, schema, sql string) ([]string, *filter.Table, error) { if s.onlineDDL == nil { return []string{sql}, nil, nil } @@ -205,7 +206,7 @@ func (s *Syncer) handleOnlineDDL(p *parser.Parser, schema, sql string) ([]string return nil, nil, err } - sqls, realSchema, realTable, err := s.onlineDDL.Apply(tableNames, sql, stmt) + sqls, realSchema, realTable, err := s.onlineDDL.Apply(tctx, tableNames, sql, stmt) if err != nil { return nil, nil, err } @@ -233,7 +234,7 @@ func (s *Syncer) handleOnlineDDL(p *parser.Parser, schema, sql string) ([]string return sqls, tableNames[0], nil } -func (s *Syncer) dropSchemaInSharding(sourceSchema string) error { +func (s *Syncer) dropSchemaInSharding(tctx *tcontext.Context, sourceSchema string) error { sources := make(map[string][][]string) sgs := s.sgk.Groups() 
for name, sg := range sgs { @@ -268,7 +269,7 @@ func (s *Syncer) dropSchemaInSharding(sourceSchema string) error { for _, table := range tables { // refine clear them later if failed // now it doesn't have problems - if err1 := s.checkpoint.DeleteTablePoint(table[0], table[1]); err1 != nil { + if err1 := s.checkpoint.DeleteTablePoint(tctx, table[0], table[1]); err1 != nil { s.tctx.L().Error("fail to delete checkpoint", zap.String("schema", table[0]), zap.String("table", table[1])) } } @@ -276,7 +277,7 @@ func (s *Syncer) dropSchemaInSharding(sourceSchema string) error { return nil } -func (s *Syncer) clearOnlineDDL(targetSchema, targetTable string) error { +func (s *Syncer) clearOnlineDDL(tctx *tcontext.Context, targetSchema, targetTable string) error { group := s.sgk.Group(targetSchema, targetTable) if group == nil { return nil @@ -287,7 +288,7 @@ func (s *Syncer) clearOnlineDDL(targetSchema, targetTable string) error { for _, table := range tables { s.tctx.L().Info("finish online ddl", zap.String("schema", table[0]), zap.String("table", table[1])) - err := s.onlineDDL.Finish(table[0], table[1]) + err := s.onlineDDL.Finish(tctx, table[0], table[1]) if err != nil { return terror.Annotatef(err, "finish online ddl on %s.%s", table[0], table[1]) } diff --git a/syncer/ddl_test.go b/syncer/ddl_test.go index 9769ead996..ac5a609ecc 100644 --- a/syncer/ddl_test.go +++ b/syncer/ddl_test.go @@ -17,6 +17,7 @@ import ( "bytes" "github.com/pingcap/dm/dm/config" + tcontext "github.com/pingcap/dm/pkg/context" parserpkg "github.com/pingcap/dm/pkg/parser" "github.com/pingcap/dm/pkg/utils" @@ -114,7 +115,7 @@ func (s *testSyncerSuite) TestCommentQuote(c *C) { c.Assert(err, IsNil) syncer := &Syncer{} - sqls, _, err := syncer.resolveDDLSQL(parser, stmt, "schemadb") + sqls, _, err := syncer.resolveDDLSQL(tcontext.Background(), parser, stmt, "schemadb") c.Assert(err, IsNil) c.Assert(len(sqls), Equals, 1) c.Assert(sqls[0], Equals, expectedSQL) @@ -225,7 +226,7 @@ func (s 
*testSyncerSuite) TestresolveDDLSQL(c *C) { c.Assert(result.ignore, IsFalse) c.Assert(result.isDDL, IsTrue) - statements, _, err := syncer.resolveDDLSQL(p, result.stmt, "test") + statements, _, err := syncer.resolveDDLSQL(tcontext.Background(), p, result.stmt, "test") c.Assert(err, IsNil) c.Assert(statements, DeepEquals, expectedSQLs[i]) @@ -383,7 +384,7 @@ func (s *testSyncerSuite) TestResolveGeneratedColumnSQL(c *C) { ast1, err := parser.ParseOneStmt(tc.sql, "", "") c.Assert(err, IsNil) - sqls, _, err := syncer.resolveDDLSQL(parser, ast1, "test") + sqls, _, err := syncer.resolveDDLSQL(tcontext.Background(), parser, ast1, "test") c.Assert(err, IsNil) c.Assert(len(sqls), Equals, 1) diff --git a/syncer/ghost.go b/syncer/ghost.go index 9dcd26d4ce..dbe7d88ae3 100644 --- a/syncer/ghost.go +++ b/syncer/ghost.go @@ -35,19 +35,16 @@ type Ghost struct { // NewGhost returns gh-oat online plugin func NewGhost(tctx *tcontext.Context, cfg *config.SubTaskConfig) (OnlinePlugin, error) { - - newtctx := tctx.WithLogger(tctx.L().WithFields(zap.String("online ddl", "gh-ost"))) - g := &Ghost{ - storge: NewOnlineDDLStorage(newtctx, cfg), + storge: NewOnlineDDLStorage(tcontext.Background().WithLogger(tctx.L().WithFields(zap.String("online ddl", "gh-ost"))), cfg), } - return g, g.storge.Init() + return g, g.storge.Init(tctx) } // Apply implements interface. 
// returns ddls, real schema, real table, error -func (g *Ghost) Apply(tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error) { +func (g *Ghost) Apply(tctx *tcontext.Context, tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error) { if len(tables) < 1 { return nil, "", "", terror.ErrSyncerUnitGhostApplyEmptyTable.Generate() } @@ -89,12 +86,12 @@ func (g *Ghost) Apply(tables []*filter.Table, statement string, stmt ast.StmtNod // record ghost table ddl changes switch stmt.(type) { case *ast.CreateTableStmt: - err := g.storge.Delete(schema, table) + err := g.storge.Delete(tctx, schema, table) if err != nil { return nil, "", "", err } case *ast.DropTableStmt: - err := g.storge.Delete(schema, table) + err := g.storge.Delete(tctx, schema, table) if err != nil { return nil, "", "", err } @@ -115,13 +112,13 @@ func (g *Ghost) Apply(tables []*filter.Table, statement string, stmt ast.StmtNod } // rename ghost table to trash table - err := g.storge.Delete(schema, table) + err := g.storge.Delete(tctx, schema, table) if err != nil { return nil, "", "", err } default: - err := g.storge.Save(schema, table, targetSchema, targetTable, statement) + err := g.storge.Save(tctx, schema, table, targetSchema, targetTable, statement) if err != nil { return nil, "", "", err } @@ -132,12 +129,12 @@ func (g *Ghost) Apply(tables []*filter.Table, statement string, stmt ast.StmtNod } // Finish implements interface -func (g *Ghost) Finish(schema, table string) error { +func (g *Ghost) Finish(tctx *tcontext.Context, schema, table string) error { if g == nil { return nil } - return g.storge.Delete(schema, table) + return g.storge.Delete(tctx, schema, table) } // TableType implements interface @@ -167,8 +164,8 @@ func (g *Ghost) RealName(schema, table string) (string, string) { } // Clear clears online ddl information -func (g *Ghost) Clear() error { - return g.storge.Clear() +func (g *Ghost) Clear(tctx *tcontext.Context) 
error { + return g.storge.Clear(tctx) } // Close implements interface @@ -177,6 +174,6 @@ func (g *Ghost) Close() { } // ResetConn implements interface -func (g *Ghost) ResetConn() error { - return g.storge.ResetConn() +func (g *Ghost) ResetConn(tctx *tcontext.Context) error { + return g.storge.ResetConn(tctx) } diff --git a/syncer/job.go b/syncer/job.go index c56d8d6fa4..c67cec147b 100644 --- a/syncer/job.go +++ b/syncer/job.go @@ -148,7 +148,9 @@ func newXIDJob(pos, cmdPos mysql.Position, currentGtidSet gtid.Set, traceID stri } func newFlushJob() *job { - return &job{tp: flush} + return &job{ + tp: flush, + } } func newSkipJob(pos mysql.Position, currentGtidSet gtid.Set) *job { diff --git a/syncer/online_ddl.go b/syncer/online_ddl.go index 895035a9e3..8b5705592b 100644 --- a/syncer/online_ddl.go +++ b/syncer/online_ddl.go @@ -44,17 +44,17 @@ type OnlinePlugin interface { // * record changes // * apply online ddl on real table // returns sqls, replaced/self schema, repliaced/slef table, error - Apply(tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error) + Apply(tctx *tcontext.Context, tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error) // Finish would delete online ddl from memory and storage - Finish(schema, table string) error + Finish(tctx *tcontext.Context, schema, table string) error // TableType returns ghhost/real table TableType(table string) TableType // RealName returns real table name that removed ghost suffix and handled by table router RealName(schema, table string) (string, string) // ResetConn reset db connection - ResetConn() error + ResetConn(tctx *tcontext.Context) error // Clear clears all online information - Clear() error + Clear(tctx *tcontext.Context) error // Close closes online ddl plugin Close() } @@ -91,49 +91,49 @@ type OnlineDDLStorage struct { // map ghost schema => [ghost table => ghost ddl info, ...] 
ddls map[string]map[string]*GhostDDLInfo - tctx *tcontext.Context + logCtx *tcontext.Context } // NewOnlineDDLStorage creates a new online ddl storager -func NewOnlineDDLStorage(newtctx *tcontext.Context, cfg *config.SubTaskConfig) *OnlineDDLStorage { +func NewOnlineDDLStorage(logCtx *tcontext.Context, cfg *config.SubTaskConfig) *OnlineDDLStorage { s := &OnlineDDLStorage{ cfg: cfg, schema: cfg.MetaSchema, table: fmt.Sprintf("%s_onlineddl", cfg.Name), id: strconv.FormatUint(uint64(cfg.ServerID), 10), ddls: make(map[string]map[string]*GhostDDLInfo), - tctx: newtctx, + logCtx: logCtx, } return s } // Init initials online handler -func (s *OnlineDDLStorage) Init() error { +func (s *OnlineDDLStorage) Init(tctx *tcontext.Context) error { onlineDB := s.cfg.To onlineDB.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(maxCheckPointTimeout) - db, dbConns, err := createConns(s.tctx, s.cfg, onlineDB, 1) + db, dbConns, err := createConns(tctx, s.cfg, onlineDB, 1) if err != nil { return terror.WithScope(err, terror.ScopeDownstream) } s.db = db s.dbConn = dbConns[0] - err = s.prepare() + err = s.prepare(tctx) if err != nil { return err } - return s.Load() + return s.Load(tctx) } // Load loads information from storage -func (s *OnlineDDLStorage) Load() error { +func (s *OnlineDDLStorage) Load(tctx *tcontext.Context) error { s.Lock() defer s.Unlock() query := fmt.Sprintf("SELECT `ghost_schema`, `ghost_table`, `ddls` FROM `%s`.`%s` WHERE `id`='%s'", s.schema, s.table, s.id) - rows, err := s.dbConn.querySQL(s.tctx, query) + rows, err := s.dbConn.querySQL(tctx, query) if err != nil { return terror.WithScope(err, terror.ScopeDownstream) } @@ -183,7 +183,7 @@ func (s *OnlineDDLStorage) Get(ghostSchema, ghostTable string) *GhostDDLInfo { } // Save saves online ddl information -func (s *OnlineDDLStorage) Save(ghostSchema, ghostTable, realSchema, realTable, ddl string) error { +func (s *OnlineDDLStorage) Save(tctx *tcontext.Context, ghostSchema, ghostTable, realSchema, realTable, ddl 
string) error { s.Lock() defer s.Unlock() @@ -211,12 +211,12 @@ func (s *OnlineDDLStorage) Save(ghostSchema, ghostTable, realSchema, realTable, } query := fmt.Sprintf("REPLACE INTO `%s`.`%s`(`id`,`ghost_schema`, `ghost_table`, `ddls`) VALUES ('%s', '%s', '%s', '%s')", s.schema, s.table, s.id, ghostSchema, ghostTable, escapeSingleQuote(string(ddlsBytes))) - _, err = s.dbConn.executeSQL(s.tctx, []string{query}) + _, err = s.dbConn.executeSQL(tctx, []string{query}) return terror.WithScope(err, terror.ScopeDownstream) } // Delete deletes online ddl informations -func (s *OnlineDDLStorage) Delete(ghostSchema, ghostTable string) error { +func (s *OnlineDDLStorage) Delete(tctx *tcontext.Context, ghostSchema, ghostTable string) error { s.Lock() defer s.Unlock() @@ -227,7 +227,7 @@ func (s *OnlineDDLStorage) Delete(ghostSchema, ghostTable string) error { // delete all checkpoints sql := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE `id` = '%s' and `ghost_schema` = '%s' and `ghost_table` = '%s'", s.schema, s.table, s.id, ghostSchema, ghostTable) - _, err := s.dbConn.executeSQL(s.tctx, []string{sql}) + _, err := s.dbConn.executeSQL(tctx, []string{sql}) if err != nil { return terror.WithScope(err, terror.ScopeDownstream) } @@ -237,13 +237,13 @@ func (s *OnlineDDLStorage) Delete(ghostSchema, ghostTable string) error { } // Clear clears online ddl information from storage -func (s *OnlineDDLStorage) Clear() error { +func (s *OnlineDDLStorage) Clear(tctx *tcontext.Context) error { s.Lock() defer s.Unlock() // delete all checkpoints sql := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE `id` = '%s'", s.schema, s.table, s.id) - _, err := s.dbConn.executeSQL(s.tctx, []string{sql}) + _, err := s.dbConn.executeSQL(tctx, []string{sql}) if err != nil { return terror.WithScope(err, terror.ScopeDownstream) } @@ -253,8 +253,8 @@ func (s *OnlineDDLStorage) Clear() error { } // ResetConn implements OnlinePlugin.ResetConn -func (s *OnlineDDLStorage) ResetConn() error { - return s.dbConn.resetConn(s.tctx) 
+func (s *OnlineDDLStorage) ResetConn(tctx *tcontext.Context) error { + return s.dbConn.resetConn(tctx) } // Close closes database connection @@ -262,27 +262,27 @@ func (s *OnlineDDLStorage) Close() { s.Lock() defer s.Unlock() - closeBaseDB(s.tctx, s.db) + closeBaseDB(s.logCtx, s.db) } -func (s *OnlineDDLStorage) prepare() error { - if err := s.createSchema(); err != nil { +func (s *OnlineDDLStorage) prepare(tctx *tcontext.Context) error { + if err := s.createSchema(tctx); err != nil { return err } - if err := s.createTable(); err != nil { + if err := s.createTable(tctx); err != nil { return err } return nil } -func (s *OnlineDDLStorage) createSchema() error { +func (s *OnlineDDLStorage) createSchema(tctx *tcontext.Context) error { sql := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", s.schema) - _, err := s.dbConn.executeSQL(s.tctx, []string{sql}) + _, err := s.dbConn.executeSQL(tctx, []string{sql}) return terror.WithScope(err, terror.ScopeDownstream) } -func (s *OnlineDDLStorage) createTable() error { +func (s *OnlineDDLStorage) createTable(tctx *tcontext.Context) error { tableName := fmt.Sprintf("`%s`.`%s`", s.schema, s.table) sql := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s ( id VARCHAR(32) NOT NULL, @@ -292,7 +292,7 @@ func (s *OnlineDDLStorage) createTable() error { update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uk_id_schema_table (id, ghost_schema, ghost_table) )`, tableName) - _, err := s.dbConn.executeSQL(s.tctx, []string{sql}) + _, err := s.dbConn.executeSQL(tctx, []string{sql}) return terror.WithScope(err, terror.ScopeDownstream) } diff --git a/syncer/pt_osc.go b/syncer/pt_osc.go index ba681aa1fc..2843a7e680 100644 --- a/syncer/pt_osc.go +++ b/syncer/pt_osc.go @@ -35,18 +35,16 @@ type PT struct { // NewPT returns pt online schema changes plugin func NewPT(tctx *tcontext.Context, cfg *config.SubTaskConfig) (OnlinePlugin, error) { - newtctx := tctx.WithLogger(tctx.L().WithFields(zap.String("online ddl", 
"pt-ost"))) - g := &PT{ - storge: NewOnlineDDLStorage(newtctx, cfg), + storge: NewOnlineDDLStorage(tcontext.Background().WithLogger(tctx.L().WithFields(zap.String("online ddl", "pt-ost"))), cfg), } - return g, g.storge.Init() + return g, g.storge.Init(tctx) } // Apply implements interface. // returns ddls, real schema, real table, error -func (p *PT) Apply(tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error) { +func (p *PT) Apply(tctx *tcontext.Context, tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error) { if len(tables) < 1 { return nil, "", "", terror.ErrSyncerUnitPTApplyEmptyTable.Generate() } @@ -88,12 +86,12 @@ func (p *PT) Apply(tables []*filter.Table, statement string, stmt ast.StmtNode) // record ghost table ddl changes switch stmt.(type) { case *ast.CreateTableStmt: - err := p.storge.Delete(schema, table) + err := p.storge.Delete(tctx, schema, table) if err != nil { return nil, "", "", err } case *ast.DropTableStmt: - err := p.storge.Delete(schema, table) + err := p.storge.Delete(tctx, schema, table) if err != nil { return nil, "", "", err } @@ -114,13 +112,13 @@ func (p *PT) Apply(tables []*filter.Table, statement string, stmt ast.StmtNode) } // rename ghost table to trash table - err := p.storge.Delete(schema, table) + err := p.storge.Delete(tctx, schema, table) if err != nil { return nil, "", "", err } default: - err := p.storge.Save(schema, table, targetSchema, targetTable, statement) + err := p.storge.Save(tctx, schema, table, targetSchema, targetTable, statement) if err != nil { return nil, "", "", err } @@ -131,12 +129,12 @@ func (p *PT) Apply(tables []*filter.Table, statement string, stmt ast.StmtNode) } // Finish implements interface -func (p *PT) Finish(schema, table string) error { +func (p *PT) Finish(tcxt *tcontext.Context, schema, table string) error { if p == nil { return nil } - return p.storge.Delete(schema, table) + return p.storge.Delete(tcxt, schema, 
table) } // TableType implements interface @@ -167,8 +165,8 @@ func (p *PT) RealName(schema, table string) (string, string) { } // Clear clears online ddl information -func (p *PT) Clear() error { - return p.storge.Clear() +func (p *PT) Clear(tctx *tcontext.Context) error { + return p.storge.Clear(tctx) } // Close implements interface @@ -177,6 +175,6 @@ func (p *PT) Close() { } // ResetConn implements interface -func (p *PT) ResetConn() error { - return p.storge.ResetConn() +func (p *PT) ResetConn(tctx *tcontext.Context) error { + return p.storge.ResetConn(tctx) } diff --git a/syncer/syncer.go b/syncer/syncer.go index 8c2963eade..06cce55ded 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -68,6 +68,8 @@ var ( maxDMLConnectionTimeout = "5m" maxDDLConnectionTimeout = fmt.Sprintf("%dm", MaxDDLConnectionTimeoutMinute) + maxDMLConnectionDuration, _ = time.ParseDuration(maxDMLConnectionTimeout) + adminQueueName = "admin queue" defaultBucketCount = 8 ) @@ -298,7 +300,7 @@ func (s *Syncer) Type() pb.UnitType { // Init initializes syncer for a sync task, but not start Process. // if fail, it should not call s.Close. // some check may move to checker later. 
-func (s *Syncer) Init() (err error) { +func (s *Syncer) Init(ctx context.Context) (err error) { rollbackHolder := fr.NewRollbackHolder("syncer") defer func() { if err != nil { @@ -306,6 +308,8 @@ func (s *Syncer) Init() (err error) { } }() + tctx := s.tctx.WithContext(ctx) + err = s.createDBs() if err != nil { return err @@ -334,7 +338,7 @@ func (s *Syncer) Init() (err error) { if !ok { return terror.ErrSyncerUnitOnlineDDLSchemeNotSupport.Generate(s.cfg.OnlineDDLScheme) } - s.onlineDDL, err = fn(s.tctx, s.cfg) + s.onlineDDL, err = fn(tctx, s.cfg) if err != nil { return err } @@ -359,20 +363,20 @@ func (s *Syncer) Init() (err error) { rollbackHolder.Add(fr.FuncRollback{Name: "close-sharding-group-keeper", Fn: s.sgk.Close}) } - err = s.checkpoint.Init() + err = s.checkpoint.Init(tctx) if err != nil { return err } rollbackHolder.Add(fr.FuncRollback{Name: "close-checkpoint", Fn: s.checkpoint.Close}) if s.cfg.RemoveMeta { - err = s.checkpoint.Clear() + err = s.checkpoint.Clear(tctx) if err != nil { return terror.Annotate(err, "clear checkpoint in syncer") } if s.onlineDDL != nil { - err = s.onlineDDL.Clear() + err = s.onlineDDL.Clear(tctx) if err != nil { return terror.Annotate(err, "clear online ddl in syncer") } @@ -380,7 +384,7 @@ func (s *Syncer) Init() (err error) { s.tctx.L().Info("all previous meta cleared") } - err = s.checkpoint.Load() + err = s.checkpoint.Load(tctx) if err != nil { return err } @@ -464,7 +468,7 @@ func (s *Syncer) initShardingGroups() error { } // IsFreshTask implements Unit.IsFreshTask -func (s *Syncer) IsFreshTask() (bool, error) { +func (s *Syncer) IsFreshTask(ctx context.Context) (bool, error) { globalPoint := s.checkpoint.GlobalPoint() return globalPoint.Compare(minCheckpoint) <= 0, nil } @@ -510,36 +514,36 @@ func (s *Syncer) reset() { } } -func (s *Syncer) resetDBs() error { +func (s *Syncer) resetDBs(tctx *tcontext.Context) error { var err error for i := 0; i < len(s.toDBConns); i++ { - err = s.toDBConns[i].resetConn(s.tctx) + err = 
s.toDBConns[i].resetConn(tctx) if err != nil { return terror.WithScope(err, terror.ScopeDownstream) } } if s.onlineDDL != nil { - err = s.onlineDDL.ResetConn() + err = s.onlineDDL.ResetConn(tctx) if err != nil { return terror.WithScope(err, terror.ScopeDownstream) } } if s.sgk != nil { - err = s.sgk.dbConn.resetConn(s.tctx) + err = s.sgk.dbConn.resetConn(tctx) if err != nil { return terror.WithScope(err, terror.ScopeDownstream) } } - err = s.ddlDBConn.resetConn(s.tctx) + err = s.ddlDBConn.resetConn(tctx) if err != nil { return terror.WithScope(err, terror.ScopeDownstream) } - err = s.checkpoint.ResetConn() + err = s.checkpoint.ResetConn(tctx) if err != nil { return terror.WithScope(err, terror.ScopeDownstream) } @@ -856,7 +860,10 @@ func (s *Syncer) flushCheckPoints() error { shardMetaSQLs, shardMetaArgs = s.sgk.PrepareFlushSQLs(exceptTableIDs) s.tctx.L().Info("prepare flush sqls", zap.Strings("shard meta sqls", shardMetaSQLs), zap.Reflect("shard meta arguments", shardMetaArgs)) } - err := s.checkpoint.FlushPointsExcept(exceptTables, shardMetaSQLs, shardMetaArgs) + + tctx, cancel := s.tctx.WithContext(context.Background()).WithTimeout(maxDMLConnectionDuration) + defer cancel() + err := s.checkpoint.FlushPointsExcept(tctx, exceptTables, shardMetaSQLs, shardMetaArgs) if err != nil { return terror.Annotatef(err, "flush checkpoint %s", s.checkpoint) } @@ -871,7 +878,7 @@ func (s *Syncer) flushCheckPoints() error { } // DDL synced one by one, so we only need to process one DDL at a time -func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *DBConn, ddlJobChan chan *job) { +func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, ddlJobChan chan *job) { defer s.wg.Done() var err error @@ -882,12 +889,12 @@ func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *DBConn, } if sqlJob.ddlExecItem != nil && sqlJob.ddlExecItem.req != nil && !sqlJob.ddlExecItem.req.Exec { - s.tctx.L().Info("ignore sharding DDLs", 
zap.Strings("ddls", sqlJob.ddls)) + tctx.L().Info("ignore sharding DDLs", zap.Strings("ddls", sqlJob.ddls)) } else { var affected int - affected, err = db.executeSQLWithIgnore(s.tctx, ignoreDDLError, sqlJob.ddls) + affected, err = db.executeSQLWithIgnore(tctx, ignoreDDLError, sqlJob.ddls) if err != nil { - err = s.handleSpecialDDLError(s.tctx, err, sqlJob.ddls, affected, db) + err = s.handleSpecialDDLError(tctx, err, sqlJob.ddls, affected, db) err = terror.WithScope(err, terror.ScopeDownstream) } @@ -899,7 +906,7 @@ func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *DBConn, } _, traceErr := s.tracer.CollectSyncerJobEvent(sqlJob.traceID, sqlJob.traceGID, int32(sqlJob.tp), sqlJob.pos, sqlJob.currentPos, queueBucket, sqlJob.sql, sqlJob.ddls, nil, execDDLReq, syncerJobState) if traceErr != nil { - s.tctx.L().Error("fail to collect binlog replication job event", log.ShortError(traceErr)) + tctx.L().Error("fail to collect binlog replication job event", log.ShortError(traceErr)) } } } @@ -930,7 +937,7 @@ func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *DBConn, } } -func (s *Syncer) sync(ctx *tcontext.Context, queueBucket string, db *DBConn, jobChan chan *job) { +func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jobChan chan *job) { defer s.wg.Done() idx := 0 @@ -969,7 +976,7 @@ func (s *Syncer) sync(ctx *tcontext.Context, queueBucket string, db *DBConn, job queries = append(queries, j.sql) args = append(args, j.args) } - affected, err := db.executeSQL(s.tctx, queries, args...) + affected, err := db.executeSQL(tctx, queries, args...) 
if err != nil { errCtx := &ExecErrorContext{err, jobs[affected].currentPos, fmt.Sprintf("%v", jobs)} s.appendExecErrors(errCtx) @@ -979,7 +986,7 @@ func (s *Syncer) sync(ctx *tcontext.Context, queueBucket string, db *DBConn, job for _, job := range jobs { _, err2 := s.tracer.CollectSyncerJobEvent(job.traceID, job.traceGID, int32(job.tp), job.pos, job.currentPos, queueBucket, job.sql, job.ddls, nil, nil, syncerJobState) if err2 != nil { - s.tctx.L().Error("fail to collect binlog replication job event", log.ShortError(err2)) + tctx.L().Error("fail to collect binlog replication job event", log.ShortError(err2)) } } } @@ -1035,6 +1042,8 @@ func (s *Syncer) redirectStreamer(pos mysql.Position) error { // Run starts running for sync, we should guarantee it can rerun when paused. func (s *Syncer) Run(ctx context.Context) (err error) { + tctx := s.tctx.WithContext(ctx) + defer func() { if s.done != nil { close(s.done) @@ -1046,7 +1055,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { return err } - fresh, err := s.IsFreshTask() + fresh, err := s.IsFreshTask(ctx) if err != nil { return err } else if fresh { @@ -1127,7 +1136,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // if we start syncer at an early position, database must bear a period of inconsistent state, // it's eventual consistency. safeMode := sm.NewSafeMode() - s.enableSafeModeInitializationPhase(s.tctx.WithContext(ctx), safeMode) + s.enableSafeModeInitializationPhase(tctx, safeMode) // syncing progress with sharding DDL group // 1. 
use the global streamer to sync regular binlog events @@ -1270,6 +1279,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { s.tctx.L().Debug("receive binlog event", zap.Reflect("header", e.Header)) ec := eventContext{ + tctx: tctx, header: e.Header, currentPos: &currentPos, lastPos: &lastPos, @@ -1331,6 +1341,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } type eventContext struct { + tctx *tcontext.Context header *replication.EventHeader currentPos *mysql.Position lastPos *mysql.Position @@ -1596,7 +1607,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e // for DDL, we don't apply operator until we try to execute it. // so can handle sharding cases - sqls, onlineDDLTableNames, err = s.resolveDDLSQL(ec.parser2, parseResult.stmt, usedSchema) + sqls, onlineDDLTableNames, err = s.resolveDDLSQL(ec.tctx, ec.parser2, parseResult.stmt, usedSchema) if err != nil { s.tctx.L().Error("fail to resolve statement", zap.String("event", "query"), zap.String("statement", sql), zap.String("schema", usedSchema), zap.Stringer("last position", ec.lastPos), log.WrapStringerField("position", ec.currentPos), log.WrapStringerField("gtid set", ev.GSet), log.ShortError(err)) return err @@ -1647,7 +1658,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e if s.cfg.IsSharding { switch stmt.(type) { case *ast.DropDatabaseStmt: - err = s.dropSchemaInSharding(tableNames[0][0].Schema) + err = s.dropSchemaInSharding(ec.tctx, tableNames[0][0].Schema) if err != nil { return err } @@ -1658,7 +1669,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e if err != nil { return err } - err = s.checkpoint.DeleteTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name) + err = s.checkpoint.DeleteTablePoint(ec.tctx, tableNames[0][0].Schema, tableNames[0][0].Name) if err != nil { return err } @@ -1726,7 +1737,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
for _, table := range onlineDDLTableNames { s.tctx.L().Info("finish online ddl and clear online ddl metadata in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), zap.String("schema", table.Schema), zap.String("table", table.Name)) - err = s.onlineDDL.Finish(table.Schema, table.Name) + err = s.onlineDDL.Finish(ec.tctx, table.Schema, table.Name) if err != nil { return terror.Annotatef(err, "finish online ddl on %s.%s", table.Schema, table.Name) } @@ -1899,7 +1910,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e } if len(onlineDDLTableNames) > 0 { - err = s.clearOnlineDDL(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) + err = s.clearOnlineDDL(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) if err != nil { return err } @@ -2282,7 +2293,7 @@ func (s *Syncer) Resume(ctx context.Context, pr chan pb.ProcessResult) { // continue the processing s.reset() // reset database conns - err := s.resetDBs() + err := s.resetDBs(s.tctx.WithContext(ctx)) if err != nil { pr <- pb.ProcessResult{ IsCanceled: false, diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index ce7c58d779..ce3ae57a2a 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/pkg/binlog/event" "github.com/pingcap/dm/pkg/conn" + tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" parserpkg "github.com/pingcap/dm/pkg/parser" @@ -389,7 +390,7 @@ func (s *testSyncerSuite) TestSelectTable(c *C) { if !result.isDDL { continue // BEGIN event } - querys, _, err := syncer.resolveDDLSQL(p, result.stmt, string(ev.Schema)) + querys, _, err := syncer.resolveDDLSQL(tcontext.Background(), p, result.stmt, string(ev.Schema)) c.Assert(err, IsNil) if len(querys) == 0 { continue @@ -555,7 +556,7 @@ func (s *testSyncerSuite) 
TestIgnoreTable(c *C) { continue // BEGIN event } - querys, _, err := syncer.resolveDDLSQL(p, result.stmt, string(ev.Schema)) + querys, _, err := syncer.resolveDDLSQL(tcontext.Background(), p, result.stmt, string(ev.Schema)) c.Assert(err, IsNil) if len(querys) == 0 { continue @@ -1248,7 +1249,7 @@ func (s *testSyncerSuite) TestSharding(c *C) { // mock syncer.checkpoint.Init() function syncer.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(checkPointDBConn, &retry.FiniteRetryStrategy{})} - syncer.checkpoint.(*RemoteCheckPoint).prepare() + syncer.checkpoint.(*RemoteCheckPoint).prepare(tcontext.Background()) syncer.reset() events := append(createEvents, s.generateEvents(_case.testEvents, c)...) @@ -1415,7 +1416,7 @@ func (s *testSyncerSuite) TestRun(c *C) { // mock syncer.checkpoint.Init() function syncer.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(checkPointDBConn, &retry.FiniteRetryStrategy{})} - syncer.checkpoint.(*RemoteCheckPoint).prepare() + syncer.checkpoint.(*RemoteCheckPoint).prepare(tcontext.Background()) syncer.reset() events1 := mockBinlogEvents{ diff --git a/tests/others_integration.txt b/tests/others_integration.txt index 7af76a1b83..a87a73b533 100644 --- a/tests/others_integration.txt +++ b/tests/others_integration.txt @@ -1,2 +1,3 @@ full_mode +retry_cancel start_task diff --git a/tests/retry_cancel/conf/diff_config.toml b/tests/retry_cancel/conf/diff_config.toml new file mode 100644 index 0000000000..87fb86fc83 --- /dev/null +++ b/tests/retry_cancel/conf/diff_config.toml @@ -0,0 +1,58 @@ +# diff Configuration. + +log-level = "info" + +chunk-size = 1000 + +check-thread-count = 4 + +sample-percent = 100 + +use-rowid = false + +use-checksum = true + +fix-sql-file = "fix.sql" + +# tables need to check. 
+[[check-tables]] +schema = "retry_cancel" +tables = ["~t.*"] + +[[table-config]] +schema = "retry_cancel" +table = "t1" + +[[table-config.source-tables]] +instance-id = "source-1" +schema = "retry_cancel" +table = "t1" + +[[table-config]] +schema = "retry_cancel" +table = "t2" + +[[table-config.source-tables]] +instance-id = "source-2" +schema = "retry_cancel" +table = "t2" + +[[source-db]] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" +instance-id = "source-1" + +[[source-db]] +host = "127.0.0.1" +port = 3307 +user = "root" +password = "" +instance-id = "source-2" + +[target-db] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" diff --git a/tests/retry_cancel/conf/dm-master.toml b/tests/retry_cancel/conf/dm-master.toml new file mode 100644 index 0000000000..334e0de993 --- /dev/null +++ b/tests/retry_cancel/conf/dm-master.toml @@ -0,0 +1,9 @@ +# Master Configuration. + +[[deploy]] +source-id = "mysql-replica-01" +dm-worker = "127.0.0.1:8262" + +[[deploy]] +source-id = "mysql-replica-02" +dm-worker = "127.0.0.1:8263" diff --git a/tests/retry_cancel/conf/dm-task.yaml b/tests/retry_cancel/conf/dm-task.yaml new file mode 100644 index 0000000000..525cc0c25d --- /dev/null +++ b/tests/retry_cancel/conf/dm-task.yaml @@ -0,0 +1,22 @@ +--- +name: test +task-mode: all +enable-heartbeat: true +timezone: "Asia/Shanghai" + +target-database: + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" + +mysql-instances: + - source-id: "mysql-replica-01" + black-white-list: "instance" + + - source-id: "mysql-replica-02" + black-white-list: "instance" + +black-white-list: + instance: + do-dbs: ["retry_cancel"] diff --git a/tests/retry_cancel/conf/dm-worker1.toml b/tests/retry_cancel/conf/dm-worker1.toml new file mode 100644 index 0000000000..d9a39de33e --- /dev/null +++ b/tests/retry_cancel/conf/dm-worker1.toml @@ -0,0 +1,9 @@ +# Worker Configuration. 
+ +source-id = "mysql-replica-01" + +[from] +host = "127.0.0.1" +user = "root" +password = "" +port = 3306 diff --git a/tests/retry_cancel/conf/dm-worker2.toml b/tests/retry_cancel/conf/dm-worker2.toml new file mode 100644 index 0000000000..ae6bc163f4 --- /dev/null +++ b/tests/retry_cancel/conf/dm-worker2.toml @@ -0,0 +1,9 @@ +# Worker Configuration. + +source-id = "mysql-replica-02" + +[from] +host = "127.0.0.1" +user = "root" +password = "" +port = 3307 diff --git a/tests/retry_cancel/data/db1.increment.sql b/tests/retry_cancel/data/db1.increment.sql new file mode 100644 index 0000000000..a2b46781fa --- /dev/null +++ b/tests/retry_cancel/data/db1.increment.sql @@ -0,0 +1,3 @@ +USE `retry_cancel`; +INSERT INTO t1 (id, uid) VALUES (5, 105), (6, 106); +INSERT INTO t1 (id, uid) VALUES (7, 107), (8, 108); diff --git a/tests/retry_cancel/data/db1.prepare.sql b/tests/retry_cancel/data/db1.prepare.sql new file mode 100644 index 0000000000..34090eaf6d --- /dev/null +++ b/tests/retry_cancel/data/db1.prepare.sql @@ -0,0 +1,6 @@ +DROP DATABASE IF EXISTS `retry_cancel`; +CREATE DATABASE `retry_cancel`; +USE `retry_cancel`; +CREATE TABLE t1 (id BIGINT PRIMARY KEY, uid BIGINT) DEFAULT CHARSET=utf8mb4; +INSERT INTO t1 (id, uid) VALUES (1, 101), (2, 102); +INSERT INTO t1 (id, uid) VALUES (3, 103), (4, 104); diff --git a/tests/retry_cancel/data/db2.increment.sql b/tests/retry_cancel/data/db2.increment.sql new file mode 100644 index 0000000000..2ea1cb831b --- /dev/null +++ b/tests/retry_cancel/data/db2.increment.sql @@ -0,0 +1,3 @@ +USE `retry_cancel`; +INSERT INTO t2 (id, uid) VALUES (5, 105), (6, 106); +INSERT INTO t2 (id, uid) VALUES (7, 107), (8, 108); diff --git a/tests/retry_cancel/data/db2.prepare.sql b/tests/retry_cancel/data/db2.prepare.sql new file mode 100644 index 0000000000..b348b1408e --- /dev/null +++ b/tests/retry_cancel/data/db2.prepare.sql @@ -0,0 +1,6 @@ +DROP DATABASE IF EXISTS `retry_cancel`; +CREATE DATABASE `retry_cancel`; +USE `retry_cancel`; +CREATE TABLE 
t2 (id BIGINT PRIMARY KEY, uid BIGINT) DEFAULT CHARSET=utf8mb4; +INSERT INTO t2 (id, uid) VALUES (1, 101), (2, 102); +INSERT INTO t2 (id, uid) VALUES (3, 103), (4, 104); diff --git a/tests/retry_cancel/run.sh b/tests/retry_cancel/run.sh new file mode 100755 index 0000000000..94b83333db --- /dev/null +++ b/tests/retry_cancel/run.sh @@ -0,0 +1,153 @@ +#!/bin/bash + +set -eu + +cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +source $cur/../_utils/test_prepare +WORK_DIR=$TEST_DIR/$TEST_NAME + +function run() { + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 + check_contains 'Query OK, 2 rows affected' + run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 + check_contains 'Query OK, 2 rows affected' + + # inject error for loading data + export GO_FAILPOINTS='github.com/pingcap/dm/pkg/conn/retryableError=return("retry_cancel")' + + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + + # start-task with retry_cancel enabled + dmctl_start_task + + sleep 5 # should sleep > retryTimeout (now 3s) + + # query-task, it should still be running (retrying) + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"stage\": \"Running\"" 4 + + # check log, retrying in load unit + check_log_contains $WORK_DIR/worker1/log/dm-worker.log '\["execute statements"\] \[task=test\] \[unit=load\] \[retry=0\] \[queries="\[CREATE DATABASE `retry_cancel`;\]"\]' + + # stop-task, should not block too much time + start_time=$(date +%s) + dmctl_stop_task test + duration=$(( $(date +%s)-$start_time )) + if [[ $duration -gt 3 ]]; then + echo "stop-task tasks for 
full import too long duration $duration" + exit 1 + fi + + # stop DM-worker, then update failpoint for checkpoint + kill_dm_worker + export GO_FAILPOINTS='github.com/pingcap/dm/pkg/conn/retryableError=return("UPDATE `dm_meta`.`test_loader_checkpoint`")' + + # start DM-worker again + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + sleep 2 # wait gRPC from DM-master to DM-worker established again + + dmctl_start_task + + sleep 5 # should sleep > retryTimeout (now 3s) + + # query-task, it should still be running (retrying) + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"stage\": \"Running\"" 4 + + # check log, retrying in load unit + check_log_contains $WORK_DIR/worker1/log/dm-worker.log 'Error 1213: failpoint inject retryable error for UPDATE `dm_meta`.`test_loader_checkpoint`' + + # stop-task, should not block too much time + start_time=$(date +%s) + dmctl_stop_task test + duration=$(( $(date +%s)-$start_time )) + if [[ $duration -gt 3 ]]; then + echo "stop-task tasks for updating loader checkpoint too long duration $duration" + exit 1 + fi + + # stop DM-worker, then disable failpoints + kill_dm_worker + export GO_FAILPOINTS='' + + # start DM-worker again + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + sleep 2 # wait gRPC from DM-master to DM-worker established again + + # start-task with retry_cancel disabled + dmctl_start_task + + # use sync_diff_inspector to check full dump loader + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + #
---------- test for incremental replication ---------- + # stop DM-worker, then enable failponits + kill_dm_worker + export GO_FAILPOINTS="github.com/pingcap/dm/pkg/conn/retryableError=return(\"retry_cancel\")" + + # run sql files to trigger incremental replication + run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 + run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 + + # start DM-worker again (with auto restart task) + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + sleep 5 # should sleep > retryTimeout (now 3s) + + # query-task, it should still be running (retrying) + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"stage\": \"Running\"" 4 + + # check log, retrying in binlog replication unit + check_log_contains $WORK_DIR/worker1/log/dm-worker.log '\["execute statements"\] \[task=test\] \[unit="binlog replication"\] \[retry=0\] \[queries="\[REPLACE INTO `retry_cancel`' + + # stop-task, should not block too much time + start_time=$(date +%s) + dmctl_stop_task test + duration=$(( $(date +%s)-$start_time )) + if [[ $duration -gt 3 ]]; then + echo "stop-task tasks for incremental replication too long duration $duration" + exit 1 + fi + + # stop DM-worker, then disable failponits + kill_dm_worker + export GO_FAILPOINTS='' + + # start DM-worker again + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + sleep 2 # wait gRPC from DM-master to DM-worker established again + + # start-task with retry_cancel disabled + 
dmctl_start_task + + # use sync_diff_inspector to check data now! + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml +} + +cleanup_data retry_cancel +# also cleanup dm processes in case of last run failed +cleanup_process $* +run $* +cleanup_process $* + +echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"