From 938ccd4b867b79bceeb2f2b582046ad48bfcb600 Mon Sep 17 00:00:00 2001 From: Xuecheng Zhang Date: Mon, 21 Oct 2019 11:37:23 +0800 Subject: [PATCH] checker: increase total timeout to 30m; set readTimeout for DB check operation to 30s (#315) --- checker/checker.go | 45 +++++++++++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/checker/checker.go b/checker/checker.go index aa2877e27f..ef46b06389 100644 --- a/checker/checker.go +++ b/checker/checker.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" + "github.com/pingcap/dm/pkg/conn" fr "github.com/pingcap/dm/pkg/func-rollback" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" @@ -42,13 +43,21 @@ import ( "go.uber.org/zap" ) +const ( + // the total time needed to complete the check depends on the number of instances, databases and tables, + // now increase the total timeout to 30min, but set `readTimeout` to 30s for source/target DB. + // if we can not complete the check in 30min, then we must need to refactor the implementation of the function. + checkTimeout = 30 * time.Minute + readTimeout = "30s" +) + type mysqlInstance struct { cfg *config.SubTaskConfig - sourceDB *sql.DB + sourceDB *conn.BaseDB sourceDBinfo *dbutil.DBConfig - targetDB *sql.DB + targetDB *conn.BaseDB targetDBInfo *dbutil.DBConfig } @@ -125,7 +134,9 @@ func (c *Checker) Init() (err error) { User: instance.cfg.From.User, Password: instance.cfg.From.Password, } - instance.sourceDB, err = dbutil.OpenDB(*instance.sourceDBinfo) + dbCfg := instance.cfg.From + dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(readTimeout) + instance.sourceDB, err = conn.DefaultDBProvider.Apply(dbCfg) if err != nil { return terror.WithScope(terror.ErrTaskCheckFailedOpenDB.Delegate(err, instance.cfg.From.User, instance.cfg.From.Host, instance.cfg.From.Port), terror.ScopeUpstream) } @@ -136,35 +147,37 @@ func (c *Checker) Init() (err error) { User: instance.cfg.To.User, Password: instance.cfg.To.Password, } - instance.targetDB, err = dbutil.OpenDB(*instance.targetDBInfo) + dbCfg = instance.cfg.To + dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(readTimeout) + instance.targetDB, err = conn.DefaultDBProvider.Apply(dbCfg) if err != nil { return terror.WithScope(terror.ErrTaskCheckFailedOpenDB.Delegate(err, instance.cfg.To.User, instance.cfg.To.Host, instance.cfg.To.Port), terror.ScopeDownstream) } if _, ok := c.checkingItems[config.VersionChecking]; ok { - c.checkList = append(c.checkList, check.NewMySQLVersionChecker(instance.sourceDB, instance.sourceDBinfo)) + c.checkList = append(c.checkList, check.NewMySQLVersionChecker(instance.sourceDB.DB, instance.sourceDBinfo)) } if _, ok := c.checkingItems[config.BinlogEnableChecking]; ok { - c.checkList = append(c.checkList, check.NewMySQLBinlogEnableChecker(instance.sourceDB, instance.sourceDBinfo)) + c.checkList = append(c.checkList, check.NewMySQLBinlogEnableChecker(instance.sourceDB.DB, instance.sourceDBinfo)) } if _, ok := c.checkingItems[config.BinlogFormatChecking]; ok { - c.checkList = append(c.checkList, check.NewMySQLBinlogFormatChecker(instance.sourceDB, instance.sourceDBinfo)) + c.checkList = append(c.checkList, check.NewMySQLBinlogFormatChecker(instance.sourceDB.DB, instance.sourceDBinfo)) } if _, ok := c.checkingItems[config.BinlogRowImageChecking]; ok { - c.checkList = append(c.checkList, check.NewMySQLBinlogRowImageChecker(instance.sourceDB, instance.sourceDBinfo)) + c.checkList = append(c.checkList, check.NewMySQLBinlogRowImageChecker(instance.sourceDB.DB, instance.sourceDBinfo)) } if _, ok := c.checkingItems[config.DumpPrivilegeChecking]; ok { - c.checkList = append(c.checkList, check.NewSourceDumpPrivilegeChecker(instance.sourceDB, instance.sourceDBinfo)) + c.checkList = append(c.checkList, check.NewSourceDumpPrivilegeChecker(instance.sourceDB.DB, instance.sourceDBinfo)) } if _, ok := c.checkingItems[config.ReplicationPrivilegeChecking]; ok { - c.checkList = append(c.checkList, check.NewSourceReplicationPrivilegeChecker(instance.sourceDB, instance.sourceDBinfo)) + c.checkList = append(c.checkList, check.NewSourceReplicationPrivilegeChecker(instance.sourceDB.DB, instance.sourceDBinfo)) } if !checkingShard && !checkSchema { continue } - mapping, err := utils.FetchTargetDoTables(instance.sourceDB, bw, r) + mapping, err := utils.FetchTargetDoTables(instance.sourceDB.DB, bw, r) if err != nil { return err } @@ -192,10 +205,10 @@ func (c *Checker) Init() (err error) { shardingCounter[name]++ } } - dbs[instance.cfg.SourceID] = instance.sourceDB + dbs[instance.cfg.SourceID] = instance.sourceDB.DB if checkSchema { - c.checkList = append(c.checkList, check.NewTablesChecker(instance.sourceDB, instance.sourceDBinfo, checkTables)) + c.checkList = append(c.checkList, check.NewTablesChecker(instance.sourceDB.DB, instance.sourceDBinfo, checkTables)) } } @@ -229,7 +242,7 @@ func (c *Checker) displayCheckingItems() string { // Process implements Unit interface func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) { - cctx, cancel := context.WithTimeout(ctx, time.Minute) + cctx, cancel := context.WithTimeout(ctx, checkTimeout) defer cancel() isCanceled := false @@ -296,14 +309,14 @@ func (c *Checker) Close() { func (c *Checker) closeDBs() { for _, instance := range c.instances { if instance.sourceDB != nil { - if err := dbutil.CloseDB(instance.sourceDB); err != nil { + if err := instance.sourceDB.Close(); err != nil { c.logger.Error("close source db", zap.Stringer("db", instance.sourceDBinfo), log.ShortError(err)) } instance.sourceDB = nil } if instance.targetDB != nil { - if err := dbutil.CloseDB(instance.targetDB); err != nil { + if err := instance.targetDB.Close(); err != nil { c.logger.Error("close target db", zap.Stringer("db", instance.targetDBInfo), log.ShortError(err)) } instance.targetDB = nil