Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

checker: increase total timeout to 30m; set readTimeout for DB check operation to 30s #315

Merged
merged 9 commits into from
Oct 21, 2019
40 changes: 24 additions & 16 deletions checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/conn"
fr "github.com/pingcap/dm/pkg/func-rollback"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
Expand All @@ -45,10 +46,10 @@ import (
type mysqlInstance struct {
cfg *config.SubTaskConfig

sourceDB *sql.DB
sourceDB *conn.BaseDB
sourceDBinfo *dbutil.DBConfig

targetDB *sql.DB
targetDB *conn.BaseDB
targetDBInfo *dbutil.DBConfig
}

Expand Down Expand Up @@ -125,7 +126,9 @@ func (c *Checker) Init() (err error) {
User: instance.cfg.From.User,
Password: instance.cfg.From.Password,
}
instance.sourceDB, err = dbutil.OpenDB(*instance.sourceDBinfo)
dbCfg := instance.cfg.From
dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout("30s")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about define a const value for 30s and 30*time.Minute?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instance is a pointer, we need make sure it will not do influence to others if we SetReadTimeout here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instance only used for checking and I want all operation in checking affected by this readTime.

BTW, instance.cfg.From is a value.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about define a const value for 30s and 30*time.Minute?

addressed in c72263d.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

30s not define as a value

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 done in 1bc7a3f.

instance.sourceDB, err = conn.DefaultDBProvider.Apply(dbCfg)
if err != nil {
return terror.WithScope(terror.ErrTaskCheckFailedOpenDB.Delegate(err, instance.cfg.From.User, instance.cfg.From.Host, instance.cfg.From.Port), terror.ScopeUpstream)
}
Expand All @@ -136,35 +139,37 @@ func (c *Checker) Init() (err error) {
User: instance.cfg.To.User,
Password: instance.cfg.To.Password,
}
instance.targetDB, err = dbutil.OpenDB(*instance.targetDBInfo)
dbCfg = instance.cfg.To
dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout("30s")
instance.targetDB, err = conn.DefaultDBProvider.Apply(dbCfg)
if err != nil {
return terror.WithScope(terror.ErrTaskCheckFailedOpenDB.Delegate(err, instance.cfg.To.User, instance.cfg.To.Host, instance.cfg.To.Port), terror.ScopeDownstream)
}

if _, ok := c.checkingItems[config.VersionChecking]; ok {
c.checkList = append(c.checkList, check.NewMySQLVersionChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewMySQLVersionChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.BinlogEnableChecking]; ok {
c.checkList = append(c.checkList, check.NewMySQLBinlogEnableChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewMySQLBinlogEnableChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.BinlogFormatChecking]; ok {
c.checkList = append(c.checkList, check.NewMySQLBinlogFormatChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewMySQLBinlogFormatChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.BinlogRowImageChecking]; ok {
c.checkList = append(c.checkList, check.NewMySQLBinlogRowImageChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewMySQLBinlogRowImageChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.DumpPrivilegeChecking]; ok {
c.checkList = append(c.checkList, check.NewSourceDumpPrivilegeChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewSourceDumpPrivilegeChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.ReplicationPrivilegeChecking]; ok {
c.checkList = append(c.checkList, check.NewSourceReplicationPrivilegeChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewSourceReplicationPrivilegeChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}

if !checkingShard && !checkSchema {
continue
}

mapping, err := utils.FetchTargetDoTables(instance.sourceDB, bw, r)
mapping, err := utils.FetchTargetDoTables(instance.sourceDB.DB, bw, r)
if err != nil {
return err
}
Expand Down Expand Up @@ -192,10 +197,10 @@ func (c *Checker) Init() (err error) {
shardingCounter[name]++
}
}
dbs[instance.cfg.SourceID] = instance.sourceDB
dbs[instance.cfg.SourceID] = instance.sourceDB.DB

if checkSchema {
c.checkList = append(c.checkList, check.NewTablesChecker(instance.sourceDB, instance.sourceDBinfo, checkTables))
c.checkList = append(c.checkList, check.NewTablesChecker(instance.sourceDB.DB, instance.sourceDBinfo, checkTables))
}
}

Expand Down Expand Up @@ -229,7 +234,10 @@ func (c *Checker) displayCheckingItems() string {

// Process implements Unit interface
func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) {
cctx, cancel := context.WithTimeout(ctx, time.Minute)
// the total time needed to complete the check depends on the number of instances, databases and tables,
// now increase the total timeout to 30min, but set `readTimeout` to 30s for source/target DB.
// if we can not complete the check in 30min, then we must need to refactor the implementation of the function.
cctx, cancel := context.WithTimeout(ctx, 30*time.Minute)
defer cancel()

isCanceled := false
Expand Down Expand Up @@ -296,14 +304,14 @@ func (c *Checker) Close() {
func (c *Checker) closeDBs() {
for _, instance := range c.instances {
if instance.sourceDB != nil {
if err := dbutil.CloseDB(instance.sourceDB); err != nil {
if err := instance.sourceDB.Close(); err != nil {
c.logger.Error("close source db", zap.Stringer("db", instance.sourceDBinfo), log.ShortError(err))
}
instance.sourceDB = nil
}

if instance.targetDB != nil {
if err := dbutil.CloseDB(instance.targetDB); err != nil {
if err := instance.targetDB.Close(); err != nil {
c.logger.Error("close target db", zap.Stringer("db", instance.targetDBInfo), log.ShortError(err))
}
instance.targetDB = nil
Expand Down