Skip to content

Commit

Permalink
checker: increase total timeout to 30m; set readTimeout for DB check …
Browse files Browse the repository at this point in the history
…operation to 30s (pingcap#315)
  • Loading branch information
csuzhangxc authored Oct 21, 2019
1 parent e512948 commit 938ccd4
Showing 1 changed file with 29 additions and 16 deletions.
45 changes: 29 additions & 16 deletions checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/conn"
fr "github.com/pingcap/dm/pkg/func-rollback"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
Expand All @@ -42,13 +43,21 @@ import (
"go.uber.org/zap"
)

const (
// the total time needed to complete the check depends on the number of instances, databases and tables,
// now increase the total timeout to 30min, but set `readTimeout` to 30s for source/target DB.
// if we can not complete the check in 30min, then we must need to refactor the implementation of the function.
checkTimeout = 30 * time.Minute
readTimeout = "30s"
)

type mysqlInstance struct {
cfg *config.SubTaskConfig

sourceDB *sql.DB
sourceDB *conn.BaseDB
sourceDBinfo *dbutil.DBConfig

targetDB *sql.DB
targetDB *conn.BaseDB
targetDBInfo *dbutil.DBConfig
}

Expand Down Expand Up @@ -125,7 +134,9 @@ func (c *Checker) Init() (err error) {
User: instance.cfg.From.User,
Password: instance.cfg.From.Password,
}
instance.sourceDB, err = dbutil.OpenDB(*instance.sourceDBinfo)
dbCfg := instance.cfg.From
dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(readTimeout)
instance.sourceDB, err = conn.DefaultDBProvider.Apply(dbCfg)
if err != nil {
return terror.WithScope(terror.ErrTaskCheckFailedOpenDB.Delegate(err, instance.cfg.From.User, instance.cfg.From.Host, instance.cfg.From.Port), terror.ScopeUpstream)
}
Expand All @@ -136,35 +147,37 @@ func (c *Checker) Init() (err error) {
User: instance.cfg.To.User,
Password: instance.cfg.To.Password,
}
instance.targetDB, err = dbutil.OpenDB(*instance.targetDBInfo)
dbCfg = instance.cfg.To
dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(readTimeout)
instance.targetDB, err = conn.DefaultDBProvider.Apply(dbCfg)
if err != nil {
return terror.WithScope(terror.ErrTaskCheckFailedOpenDB.Delegate(err, instance.cfg.To.User, instance.cfg.To.Host, instance.cfg.To.Port), terror.ScopeDownstream)
}

if _, ok := c.checkingItems[config.VersionChecking]; ok {
c.checkList = append(c.checkList, check.NewMySQLVersionChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewMySQLVersionChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.BinlogEnableChecking]; ok {
c.checkList = append(c.checkList, check.NewMySQLBinlogEnableChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewMySQLBinlogEnableChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.BinlogFormatChecking]; ok {
c.checkList = append(c.checkList, check.NewMySQLBinlogFormatChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewMySQLBinlogFormatChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.BinlogRowImageChecking]; ok {
c.checkList = append(c.checkList, check.NewMySQLBinlogRowImageChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewMySQLBinlogRowImageChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.DumpPrivilegeChecking]; ok {
c.checkList = append(c.checkList, check.NewSourceDumpPrivilegeChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewSourceDumpPrivilegeChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.ReplicationPrivilegeChecking]; ok {
c.checkList = append(c.checkList, check.NewSourceReplicationPrivilegeChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewSourceReplicationPrivilegeChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}

if !checkingShard && !checkSchema {
continue
}

mapping, err := utils.FetchTargetDoTables(instance.sourceDB, bw, r)
mapping, err := utils.FetchTargetDoTables(instance.sourceDB.DB, bw, r)
if err != nil {
return err
}
Expand Down Expand Up @@ -192,10 +205,10 @@ func (c *Checker) Init() (err error) {
shardingCounter[name]++
}
}
dbs[instance.cfg.SourceID] = instance.sourceDB
dbs[instance.cfg.SourceID] = instance.sourceDB.DB

if checkSchema {
c.checkList = append(c.checkList, check.NewTablesChecker(instance.sourceDB, instance.sourceDBinfo, checkTables))
c.checkList = append(c.checkList, check.NewTablesChecker(instance.sourceDB.DB, instance.sourceDBinfo, checkTables))
}
}

Expand Down Expand Up @@ -229,7 +242,7 @@ func (c *Checker) displayCheckingItems() string {

// Process implements Unit interface
func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) {
cctx, cancel := context.WithTimeout(ctx, time.Minute)
cctx, cancel := context.WithTimeout(ctx, checkTimeout)
defer cancel()

isCanceled := false
Expand Down Expand Up @@ -296,14 +309,14 @@ func (c *Checker) Close() {
func (c *Checker) closeDBs() {
for _, instance := range c.instances {
if instance.sourceDB != nil {
if err := dbutil.CloseDB(instance.sourceDB); err != nil {
if err := instance.sourceDB.Close(); err != nil {
c.logger.Error("close source db", zap.Stringer("db", instance.sourceDBinfo), log.ShortError(err))
}
instance.sourceDB = nil
}

if instance.targetDB != nil {
if err := dbutil.CloseDB(instance.targetDB); err != nil {
if err := instance.targetDB.Close(); err != nil {
c.logger.Error("close target db", zap.Stringer("db", instance.targetDBInfo), log.ShortError(err))
}
instance.targetDB = nil
Expand Down

0 comments on commit 938ccd4

Please sign in to comment.