Skip to content

Commit

Permalink
drainer/syncer: add configuration for read-timeout (pingcap#1061) (pi…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Oct 13, 2021
1 parent 5775a0c commit 679d62d
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 8 deletions.
11 changes: 10 additions & 1 deletion drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (rc RelayConfig) IsEnabled() bool {

// Config holds the configuration of drainer
type Config struct {
*flag.FlagSet `json:"-"`
*flag.FlagSet `toml:"-" json:"-"`
LogLevel string `toml:"log-level" json:"log-level"`
NodeID string `toml:"node-id" json:"node-id"`
ListenAddr string `toml:"addr" json:"addr"`
Expand Down Expand Up @@ -515,6 +515,15 @@ func (cfg *Config) adjustConfig() error {
} else if len(cfg.SyncerCfg.To.Password) == 0 {
cfg.SyncerCfg.To.Password = os.Getenv("MYSQL_PSWD")
}
if cfg.SyncerCfg.To.ReadTimeoutStr == "" {
cfg.SyncerCfg.To.ReadTimeout = time.Minute
} else {
var err error
cfg.SyncerCfg.To.ReadTimeout, err = cfg.SyncerCfg.To.ReadTimeoutStr.ParseDuration()
if err != nil {
return err
}
}
}

if len(cfg.SyncerCfg.To.Checkpoint.EncryptedPassword) > 0 {
Expand Down
17 changes: 17 additions & 0 deletions drainer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@ func (t *testDrainerSuite) TestConfig(c *C) {
var strSQLMode *string
c.Assert(cfg.SyncerCfg.StrSQLMode, Equals, strSQLMode)
c.Assert(cfg.SyncerCfg.SQLMode, Equals, mysql.SQLMode(0))
c.Assert(cfg.SyncerCfg.To.ReadTimeout, Equals, time.Minute)

dstFile := path.Join(c.MkDir(), "drainer.toml")
err = os.WriteFile(dstFile, []byte(`[syncer.to]
read-timeout = "10m"
`), 0644)
c.Assert(err, IsNil)
cfg2 := NewConfig()
args = []string{
"-data-dir", "data.drainer",
"-dest-db-type", "mysql",
"-config", dstFile,
}
err = cfg2.Parse(args)
c.Assert(err, IsNil)
c.Assert(cfg2.SyncerCfg.To.ReadTimeout, check.Equals, time.Minute*10)
}

func (t *testDrainerSuite) TestValidateFilter(c *C) {
Expand Down Expand Up @@ -238,6 +254,7 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) {
c.Assert(err, IsNil)
c.Assert(cfg.SyncerCfg.To.Password, check.Equals, "origin")
c.Assert(cfg.SyncerCfg.To.Checkpoint.Password, check.Equals, "origin")
c.Assert(cfg.SyncerCfg.To.ReadTimeout, check.Equals, time.Minute)

// test false positive
cfg.SyncerCfg.To = &dsync.DBConfig{
Expand Down
2 changes: 1 addition & 1 deletion drainer/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func feedByRelayLogIfNeed(cfg *Config) error {
return errors.Annotate(err, "failed to create reader")
}

db, err := loader.CreateDBWithSQLMode(scfg.To.User, scfg.To.Password, scfg.To.Host, scfg.To.Port, scfg.To.TLS, scfg.StrSQLMode, scfg.To.Params)
db, err := loader.CreateDBWithSQLMode(scfg.To.User, scfg.To.Password, scfg.To.Host, scfg.To.Port, scfg.To.TLS, scfg.StrSQLMode, scfg.To.Params, scfg.To.ReadTimeout)
if err != nil {
return errors.Annotate(err, "failed to create SQL db")
}
Expand Down
4 changes: 2 additions & 2 deletions drainer/sync/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func NewMysqlSyncer(
log.Info("enable TLS to connect downstream MySQL/TiDB")
}

db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.TLS, sqlMode, cfg.Params)
db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.TLS, sqlMode, cfg.Params, cfg.ReadTimeout)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -119,7 +119,7 @@ func NewMysqlSyncer(

if newMode != oldMode {
db.Close()
db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.TLS, &newMode, cfg.Params)
db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.TLS, &newMode, cfg.Params, cfg.ReadTimeout)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion drainer/sync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (s *syncerSuite) SetUpTest(c *check.C) {
Port: 3306,
KafkaVersion: "0.8.2.0",
BinlogFileDir: c.MkDir(),
ReadTimeout: time.Minute,
}

// create pb syncer
Expand All @@ -59,7 +60,7 @@ func (s *syncerSuite) SetUpTest(c *check.C) {

// create mysql syncer
oldCreateDB := createDB
createDB = func(string, string, string, int, *tls.Config, *string, map[string]string) (db *sql.DB, err error) {
createDB = func(string, string, string, int, *tls.Config, *string, map[string]string, time.Duration) (db *sql.DB, err error) {
db, s.mysqlMock, err = sqlmock.New()
return
}
Expand Down
6 changes: 6 additions & 0 deletions drainer/sync/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ package sync

import (
"crypto/tls"
"time"

// mysql driver
_ "github.com/go-sql-driver/mysql"

"github.com/pingcap/tidb-binlog/pkg/security"
"github.com/pingcap/tidb-binlog/pkg/util"
)

// DBConfig is the DB configuration.
Expand All @@ -37,6 +40,9 @@ type DBConfig struct {
BinlogFileRetentionTime int `toml:"retention-time" json:"retention-time"`
Params map[string]string `toml:"params" json:"params"`

ReadTimeout time.Duration `toml:"-" json:"-"`
ReadTimeoutStr util.Duration `toml:"read-timeout" json:"read-timeout"`

Merge bool `toml:"merge" json:"merge"`

ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"`
Expand Down
7 changes: 4 additions & 3 deletions pkg/loader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -145,8 +146,8 @@ func createDBWitSessions(dsn string, params map[string]string) (db *gosql.DB, er
}

// CreateDBWithSQLMode return sql.DB
func CreateDBWithSQLMode(user string, password string, host string, port int, tlsConfig *tls.Config, sqlMode *string, params map[string]string) (db *gosql.DB, err error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4,utf8&interpolateParams=true&readTimeout=1m&multiStatements=true", user, password, host, port)
func CreateDBWithSQLMode(user string, password string, host string, port int, tlsConfig *tls.Config, sqlMode *string, params map[string]string, readTimeout time.Duration) (db *gosql.DB, err error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4,utf8&interpolateParams=true&readTimeout=%s&multiStatements=true", user, password, host, port, readTimeout)
if sqlMode != nil {
// same as "set sql_mode = '<sqlMode>'"
dsn += "&sql_mode='" + url.QueryEscape(*sqlMode) + "'"
Expand All @@ -166,7 +167,7 @@ func CreateDBWithSQLMode(user string, password string, host string, port int, tl

// CreateDB return sql.DB
func CreateDB(user string, password string, host string, port int, tls *tls.Config) (db *gosql.DB, err error) {
return CreateDBWithSQLMode(user, password, host, port, tls, nil, nil)
return CreateDBWithSQLMode(user, password, host, port, tls, nil, nil, time.Minute)
}

func quoteSchema(schema string, table string) string {
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ var _ json.Marshaler = Duration(empty)
var _ json.Unmarshaler = (*Duration)(&empty)

// Duration is a wrapper of time.Duration for TOML and JSON.
// it can be parsed to both integer and string
// integer 7 will be parsed to 7*24h
// string 10m will be parsed to 10m
type Duration string

// NewDuration creates a Duration from time.Duration.
Expand Down

0 comments on commit 679d62d

Please sign in to comment.