diff --git a/drainer/config.go b/drainer/config.go index 3e9d69223..52c78f224 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -146,7 +146,7 @@ func (rc RelayConfig) IsEnabled() bool { // Config holds the configuration of drainer type Config struct { - *flag.FlagSet `json:"-"` + *flag.FlagSet `toml:"-" json:"-"` LogLevel string `toml:"log-level" json:"log-level"` NodeID string `toml:"node-id" json:"node-id"` ListenAddr string `toml:"addr" json:"addr"` @@ -515,6 +515,15 @@ func (cfg *Config) adjustConfig() error { } else if len(cfg.SyncerCfg.To.Password) == 0 { cfg.SyncerCfg.To.Password = os.Getenv("MYSQL_PSWD") } + if cfg.SyncerCfg.To.ReadTimeoutStr == "" { + cfg.SyncerCfg.To.ReadTimeout = time.Minute + } else { + var err error + cfg.SyncerCfg.To.ReadTimeout, err = cfg.SyncerCfg.To.ReadTimeoutStr.ParseDuration() + if err != nil { + return err + } + } } if len(cfg.SyncerCfg.To.Checkpoint.EncryptedPassword) > 0 { diff --git a/drainer/config_test.go b/drainer/config_test.go index 62438a038..4729549cb 100644 --- a/drainer/config_test.go +++ b/drainer/config_test.go @@ -71,6 +71,22 @@ func (t *testDrainerSuite) TestConfig(c *C) { var strSQLMode *string c.Assert(cfg.SyncerCfg.StrSQLMode, Equals, strSQLMode) c.Assert(cfg.SyncerCfg.SQLMode, Equals, mysql.SQLMode(0)) + c.Assert(cfg.SyncerCfg.To.ReadTimeout, Equals, time.Minute) + + dstFile := path.Join(c.MkDir(), "drainer.toml") + err = os.WriteFile(dstFile, []byte(`[syncer.to] +read-timeout = "10m" +`), 0644) + c.Assert(err, IsNil) + cfg2 := NewConfig() + args = []string{ + "-data-dir", "data.drainer", + "-dest-db-type", "mysql", + "-config", dstFile, + } + err = cfg2.Parse(args) + c.Assert(err, IsNil) + c.Assert(cfg2.SyncerCfg.To.ReadTimeout, check.Equals, time.Minute*10) } func (t *testDrainerSuite) TestValidateFilter(c *C) { @@ -238,6 +254,7 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) { c.Assert(err, IsNil) c.Assert(cfg.SyncerCfg.To.Password, check.Equals, "origin") c.Assert(cfg.SyncerCfg.To.Checkpoint.Password, check.Equals, "origin") + c.Assert(cfg.SyncerCfg.To.ReadTimeout, check.Equals, time.Minute) // test false positive cfg.SyncerCfg.To = &dsync.DBConfig{ diff --git a/drainer/relay.go b/drainer/relay.go index 48a2f37f4..a10a8110b 100644 --- a/drainer/relay.go +++ b/drainer/relay.go @@ -43,7 +43,7 @@ func feedByRelayLogIfNeed(cfg *Config) error { return errors.Annotate(err, "failed to create reader") } - db, err := loader.CreateDBWithSQLMode(scfg.To.User, scfg.To.Password, scfg.To.Host, scfg.To.Port, scfg.To.TLS, scfg.StrSQLMode, scfg.To.Params) + db, err := loader.CreateDBWithSQLMode(scfg.To.User, scfg.To.Password, scfg.To.Host, scfg.To.Port, scfg.To.TLS, scfg.StrSQLMode, scfg.To.Params, scfg.To.ReadTimeout) if err != nil { return errors.Annotate(err, "failed to create SQL db") } diff --git a/drainer/sync/mysql.go b/drainer/sync/mysql.go index 41ae28219..a91f6f1f0 100644 --- a/drainer/sync/mysql.go +++ b/drainer/sync/mysql.go @@ -103,7 +103,7 @@ func NewMysqlSyncer( log.Info("enable TLS to connect downstream MySQL/TiDB") } - db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.TLS, sqlMode, cfg.Params) + db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.TLS, sqlMode, cfg.Params, cfg.ReadTimeout) if err != nil { return nil, errors.Trace(err) } @@ -119,7 +119,7 @@ func NewMysqlSyncer( if newMode != oldMode { db.Close() - db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.TLS, &newMode, cfg.Params) + db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.TLS, &newMode, cfg.Params, cfg.ReadTimeout) if err != nil { return nil, errors.Trace(err) } diff --git a/drainer/sync/syncer_test.go b/drainer/sync/syncer_test.go index 8c51dede2..3acd6278c 100644 --- a/drainer/sync/syncer_test.go +++ b/drainer/sync/syncer_test.go @@ -49,6 +49,7 @@ func (s *syncerSuite) SetUpTest(c *check.C) { Port: 3306, KafkaVersion: "0.8.2.0", BinlogFileDir: c.MkDir(), + ReadTimeout: time.Minute, } // create pb syncer @@ -59,7 +60,7 @@ func (s *syncerSuite) SetUpTest(c *check.C) { // create mysql syncer oldCreateDB := createDB - createDB = func(string, string, string, int, *tls.Config, *string, map[string]string) (db *sql.DB, err error) { + createDB = func(string, string, string, int, *tls.Config, *string, map[string]string, time.Duration) (db *sql.DB, err error) { db, s.mysqlMock, err = sqlmock.New() return } diff --git a/drainer/sync/util.go b/drainer/sync/util.go index 9c2fcaaf3..3cfce3762 100644 --- a/drainer/sync/util.go +++ b/drainer/sync/util.go @@ -15,10 +15,13 @@ package sync import ( "crypto/tls" + "time" // mysql driver _ "github.com/go-sql-driver/mysql" + "github.com/pingcap/tidb-binlog/pkg/security" + "github.com/pingcap/tidb-binlog/pkg/util" ) // DBConfig is the DB configuration. @@ -37,6 +40,9 @@ type DBConfig struct { BinlogFileRetentionTime int `toml:"retention-time" json:"retention-time"` Params map[string]string `toml:"params" json:"params"` + ReadTimeout time.Duration `toml:"-" json:"-"` + ReadTimeoutStr util.Duration `toml:"read-timeout" json:"read-timeout"` + Merge bool `toml:"merge" json:"merge"` ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"` diff --git a/pkg/loader/util.go b/pkg/loader/util.go index dea714466..75aa672d1 100644 --- a/pkg/loader/util.go +++ b/pkg/loader/util.go @@ -22,6 +22,7 @@ import ( "strconv" "strings" "sync/atomic" + "time" "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" @@ -145,8 +146,8 @@ func createDBWitSessions(dsn string, params map[string]string) (db *gosql.DB, er } // CreateDBWithSQLMode return sql.DB -func CreateDBWithSQLMode(user string, password string, host string, port int, tlsConfig *tls.Config, sqlMode *string, params map[string]string) (db *gosql.DB, err error) { - dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4,utf8&interpolateParams=true&readTimeout=1m&multiStatements=true", user, password, host, port) +func CreateDBWithSQLMode(user string, password string, host string, port int, tlsConfig *tls.Config, sqlMode *string, params map[string]string, readTimeout time.Duration) (db *gosql.DB, err error) { + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4,utf8&interpolateParams=true&readTimeout=%s&multiStatements=true", user, password, host, port, readTimeout) if sqlMode != nil { // same as "set sql_mode = ''" dsn += "&sql_mode='" + url.QueryEscape(*sqlMode) + "'" @@ -166,7 +167,7 @@ func CreateDBWithSQLMode(user string, password string, host string, port int, tl // CreateDB return sql.DB func CreateDB(user string, password string, host string, port int, tls *tls.Config) (db *gosql.DB, err error) { - return CreateDBWithSQLMode(user, password, host, port, tls, nil, nil) + return CreateDBWithSQLMode(user, password, host, port, tls, nil, nil, time.Minute) } func quoteSchema(schema string, table string) string { diff --git a/pkg/util/duration.go b/pkg/util/duration.go index 20141c1c4..04e8f2624 100644 --- a/pkg/util/duration.go +++ b/pkg/util/duration.go @@ -30,6 +30,9 @@ var _ json.Marshaler = Duration(empty) var _ json.Unmarshaler = (*Duration)(&empty) // Duration is a wrapper of time.Duration for TOML and JSON. +// it can be parsed to both integer and string +// integer 7 will be parsed to 7*24h +// string 10m will be parsed to 10m type Duration string // NewDuration creates a Duration from time.Duration.