Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drainer/syncer: add configuration for read-timeout (#1061) #1066

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (rc RelayConfig) IsEnabled() bool {

// Config holds the configuration of drainer
type Config struct {
*flag.FlagSet `json:"-"`
*flag.FlagSet `toml:"-" json:"-"`
LogLevel string `toml:"log-level" json:"log-level"`
NodeID string `toml:"node-id" json:"node-id"`
ListenAddr string `toml:"addr" json:"addr"`
Expand Down Expand Up @@ -515,6 +515,15 @@ func (cfg *Config) adjustConfig() error {
} else if len(cfg.SyncerCfg.To.Password) == 0 {
cfg.SyncerCfg.To.Password = os.Getenv("MYSQL_PSWD")
}
if cfg.SyncerCfg.To.ReadTimeoutStr == "" {
cfg.SyncerCfg.To.ReadTimeout = time.Minute
} else {
var err error
cfg.SyncerCfg.To.ReadTimeout, err = cfg.SyncerCfg.To.ReadTimeoutStr.ParseDuration()
if err != nil {
return err
}
}
}

if len(cfg.SyncerCfg.To.Checkpoint.EncryptedPassword) > 0 {
Expand Down
17 changes: 17 additions & 0 deletions drainer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@ func (t *testDrainerSuite) TestConfig(c *C) {
var strSQLMode *string
c.Assert(cfg.SyncerCfg.StrSQLMode, Equals, strSQLMode)
c.Assert(cfg.SyncerCfg.SQLMode, Equals, mysql.SQLMode(0))
c.Assert(cfg.SyncerCfg.To.ReadTimeout, Equals, time.Minute)

dstFile := path.Join(c.MkDir(), "drainer.toml")
err = os.WriteFile(dstFile, []byte(`[syncer.to]
read-timeout = "10m"
`), 0644)
c.Assert(err, IsNil)
cfg2 := NewConfig()
args = []string{
"-data-dir", "data.drainer",
"-dest-db-type", "mysql",
"-config", dstFile,
}
err = cfg2.Parse(args)
c.Assert(err, IsNil)
c.Assert(cfg2.SyncerCfg.To.ReadTimeout, check.Equals, time.Minute*10)
}

func (t *testDrainerSuite) TestValidateFilter(c *C) {
Expand Down Expand Up @@ -238,6 +254,7 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) {
c.Assert(err, IsNil)
c.Assert(cfg.SyncerCfg.To.Password, check.Equals, "origin")
c.Assert(cfg.SyncerCfg.To.Checkpoint.Password, check.Equals, "origin")
c.Assert(cfg.SyncerCfg.To.ReadTimeout, check.Equals, time.Minute)

// test false positive
cfg.SyncerCfg.To = &dsync.DBConfig{
Expand Down
2 changes: 1 addition & 1 deletion drainer/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func feedByRelayLogIfNeed(cfg *Config) error {
return errors.Annotate(err, "failed to create reader")
}

db, err := loader.CreateDBWithSQLMode(scfg.To.User, scfg.To.Password, scfg.To.Host, scfg.To.Port, scfg.To.TLS, scfg.StrSQLMode, scfg.To.Params)
db, err := loader.CreateDBWithSQLMode(scfg.To.User, scfg.To.Password, scfg.To.Host, scfg.To.Port, scfg.To.TLS, scfg.StrSQLMode, scfg.To.Params, scfg.To.ReadTimeout)
if err != nil {
return errors.Annotate(err, "failed to create SQL db")
}
Expand Down
4 changes: 2 additions & 2 deletions drainer/sync/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func NewMysqlSyncer(
log.Info("enable TLS to connect downstream MySQL/TiDB")
}

db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.TLS, sqlMode, cfg.Params)
db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.TLS, sqlMode, cfg.Params, cfg.ReadTimeout)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -119,7 +119,7 @@ func NewMysqlSyncer(

if newMode != oldMode {
db.Close()
db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.TLS, &newMode, cfg.Params)
db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.TLS, &newMode, cfg.Params, cfg.ReadTimeout)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion drainer/sync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (s *syncerSuite) SetUpTest(c *check.C) {
Port: 3306,
KafkaVersion: "0.8.2.0",
BinlogFileDir: c.MkDir(),
ReadTimeout: time.Minute,
}

// create pb syncer
Expand All @@ -59,7 +60,7 @@ func (s *syncerSuite) SetUpTest(c *check.C) {

// create mysql syncer
oldCreateDB := createDB
createDB = func(string, string, string, int, *tls.Config, *string, map[string]string) (db *sql.DB, err error) {
createDB = func(string, string, string, int, *tls.Config, *string, map[string]string, time.Duration) (db *sql.DB, err error) {
db, s.mysqlMock, err = sqlmock.New()
return
}
Expand Down
6 changes: 6 additions & 0 deletions drainer/sync/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ package sync

import (
"crypto/tls"
"time"

// mysql driver
_ "github.com/go-sql-driver/mysql"

"github.com/pingcap/tidb-binlog/pkg/security"
"github.com/pingcap/tidb-binlog/pkg/util"
)

// DBConfig is the DB configuration.
Expand All @@ -37,6 +40,9 @@ type DBConfig struct {
BinlogFileRetentionTime int `toml:"retention-time" json:"retention-time"`
Params map[string]string `toml:"params" json:"params"`

ReadTimeout time.Duration `toml:"-" json:"-"`
ReadTimeoutStr util.Duration `toml:"read-timeout" json:"read-timeout"`

Merge bool `toml:"merge" json:"merge"`

ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"`
Expand Down
7 changes: 4 additions & 3 deletions pkg/loader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -145,8 +146,8 @@ func createDBWitSessions(dsn string, params map[string]string) (db *gosql.DB, er
}

// CreateDBWithSQLMode return sql.DB
func CreateDBWithSQLMode(user string, password string, host string, port int, tlsConfig *tls.Config, sqlMode *string, params map[string]string) (db *gosql.DB, err error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4,utf8&interpolateParams=true&readTimeout=1m&multiStatements=true", user, password, host, port)
func CreateDBWithSQLMode(user string, password string, host string, port int, tlsConfig *tls.Config, sqlMode *string, params map[string]string, readTimeout time.Duration) (db *gosql.DB, err error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4,utf8&interpolateParams=true&readTimeout=%s&multiStatements=true", user, password, host, port, readTimeout)
if sqlMode != nil {
// same as "set sql_mode = '<sqlMode>'"
dsn += "&sql_mode='" + url.QueryEscape(*sqlMode) + "'"
Expand All @@ -166,7 +167,7 @@ func CreateDBWithSQLMode(user string, password string, host string, port int, tl

// CreateDB return sql.DB
func CreateDB(user string, password string, host string, port int, tls *tls.Config) (db *gosql.DB, err error) {
return CreateDBWithSQLMode(user, password, host, port, tls, nil, nil)
return CreateDBWithSQLMode(user, password, host, port, tls, nil, nil, time.Minute)
}

func quoteSchema(schema string, table string) string {
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ var _ json.Marshaler = Duration(empty)
var _ json.Unmarshaler = (*Duration)(&empty)

// Duration is a wrapper of time.Duration for TOML and JSON.
// it can be parsed to both integer and string
// integer 7 will be parsed to 7*24h
// string 10m will be parsed to 10m
type Duration string

// NewDuration creates a Duration from time.Duration.
Expand Down