Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

avoid get blocked in dumping when mysql connection is broken #190

Merged
merged 15 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion v4/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ func (config *Config) String() string {

// GetDSN generates DSN from Config
func (conf *Config) GetDSN(db string) string {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4", conf.User, conf.Password, conf.Host, conf.Port, db)
// maxAllowedPacket=0 can be used to automatically fetch the max_allowed_packet variable from server on every connection.
// https://github.com/go-sql-driver/mysql#maxallowedpacket
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&readTimeout=30s&writeTimeout=30s&interpolateParams=true&maxAllowedPacket=0",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any chance of mysql taking longer than 30 seconds to start sending data when the chunk size is large? Probably not right? Since you query by primary key?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! How long do you think is appropriate? Or, are there any more suggestions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about keeping it 30s and make it configurable?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was genuinely curious if it can take over 30 seconds to start sending data when you are querying directly by primary key. Since it’s the time to first byte sent that seems highly unlikely so 30 seconds seems fine. I’m not sure though. Configurable sounds safest.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have raised this variable to 15m and make it configurable.

conf.User, conf.Password, conf.Host, conf.Port, db)
if len(conf.Security.CAPath) > 0 {
dsn += "&tls=dumpling-tls-target"
}
Expand Down
12 changes: 9 additions & 3 deletions v4/export/connectionsPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ type connectionsPool struct {
func newConnectionsPool(ctx context.Context, n int, pool *sql.DB) (*connectionsPool, error) {
connectPool := &connectionsPool{
conns: make(chan *sql.Conn, n),
createdConns: make([]*sql.Conn, 0, n),
createdConns: make([]*sql.Conn, 0, n+1),
}
for i := 0; i < n; i++ {
for i := 0; i < n+1; i++ {
conn, err := createConnWithConsistency(ctx, pool)
if err != nil {
connectPool.Close()
return connectPool, err
}
connectPool.releaseConn(conn)
if i != n {
connectPool.releaseConn(conn)
}
connectPool.createdConns = append(connectPool.createdConns, conn)
}
return connectPool, nil
Expand All @@ -31,6 +33,10 @@ func (r *connectionsPool) getConn() *sql.Conn {
return <-r.conns
}

func (r *connectionsPool) extraConn() *sql.Conn {
return r.createdConns[len(r.createdConns)-1]
}

func (r *connectionsPool) Close() error {
var err error
for _, conn := range r.createdConns {
Expand Down
21 changes: 5 additions & 16 deletions v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,22 +184,17 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
defer connectPool.Close()

if conf.PosAfterConnect {
conn := connectPool.getConn()
// record again, to provide a location to exit safe mode for DM
err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, true, snapshot)
err = m.recordGlobalMetaData(connectPool.extraConn(), conf.ServerInfo.ServerType, true, snapshot)
if err != nil {
log.Info("get global metadata (after connection pool established) failed", zap.Error(err))
}
connectPool.releaseConn(conn)
}

if conf.Consistency != "lock" {
conn := connectPool.getConn()
if err = prepareTableListToDump(conf, conn); err != nil {
connectPool.releaseConn(conn)
if err = prepareTableListToDump(conf, connectPool.extraConn()); err != nil {
return err
}
connectPool.releaseConn(conn)
}

if err = conCtrl.TearDown(ctx); err != nil {
Expand Down Expand Up @@ -238,9 +233,7 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsP
allTables := conf.Tables
g, ctx := errgroup.WithContext(pCtx)
for dbName, tables := range allTables {
conn := connectPool.getConn()
createDatabaseSQL, err := ShowCreateDatabase(conn, dbName)
connectPool.releaseConn(conn)
createDatabaseSQL, err := ShowCreateDatabase(connectPool.extraConn(), dbName)
if err != nil {
return err
}
Expand All @@ -253,9 +246,7 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsP
}
for _, table := range tables {
table := table
conn := connectPool.getConn()
tableDataIRArray, err := dumpTable(ctx, conf, conn, dbName, table, writer)
connectPool.releaseConn(conn)
tableDataIRArray, err := dumpTable(ctx, conf, connectPool.extraConn(), dbName, table, writer)
if err != nil {
return err
}
Expand Down Expand Up @@ -306,9 +297,7 @@ func prepareTableListToDump(conf *Config, pool *sql.Conn) error {
}

func dumpSql(ctx context.Context, conf *Config, connectPool *connectionsPool, writer Writer) error {
conn := connectPool.getConn()
tableIR, err := SelectFromSql(conf, conn)
connectPool.releaseConn(conn)
tableIR, err := SelectFromSql(conf, connectPool.extraConn())
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions v4/export/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func newMockConnectPool(c *C, db *sql.DB) *connectionsPool {
c.Assert(err, IsNil)
connectPool := &connectionsPool{conns: make(chan *sql.Conn, 1)}
connectPool.releaseConn(conn)
connectPool.createdConns = []*sql.Conn{conn}
return connectPool
}

Expand Down