This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

Avoid getting blocked in dumping when the MySQL connection is broken #190

Merged 15 commits on Nov 13, 2020
Changes from 6 commits
11 changes: 8 additions & 3 deletions v4/export/config.go
@@ -62,7 +62,9 @@ const (
flagOutputFilenameTemplate = "output-filename-template"
flagCompleteInsert = "complete-insert"
flagParams = "params"
- FlagHelp = "help"
+ flagReadTimeout = "read-timeout"
+
+ FlagHelp = "help"
)

type Config struct {
@@ -104,6 +106,7 @@ type Config struct {
Sql string
CsvSeparator string
CsvDelimiter string
+ ReadTimeout string

TableFilter filter.Filter `json:"-"`
Rows uint64
@@ -168,8 +171,8 @@ func (config *Config) String() string {
func (conf *Config) GetDSN(db string) string {
// maxAllowedPacket=0 can be used to automatically fetch the max_allowed_packet variable from server on every connection.
// https://github.com/go-sql-driver/mysql#maxallowedpacket
- dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&readTimeout=30s&writeTimeout=30s&interpolateParams=true&maxAllowedPacket=0",
- conf.User, conf.Password, conf.Host, conf.Port, db)
+ dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&readTimeout=%s&writeTimeout=30s&interpolateParams=true&maxAllowedPacket=0",
+ conf.User, conf.Password, conf.Host, conf.Port, db, conf.ReadTimeout)
if len(conf.Security.CAPath) > 0 {
dsn += "&tls=dumpling-tls-target"
}
@@ -225,6 +228,8 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) {
flags.Bool(flagCompleteInsert, false, "Use complete INSERT statements that include column names")
flags.StringToString(flagParams, nil, `Extra session variables used while dumping, accepted format: --params "character_set_client=latin1,character_set_connection=latin1"`)
flags.Bool(FlagHelp, false, "Print help message and quit")
+ flags.String(flagReadTimeout, "15m", "I/O read timeout for db connection. Should be a decimal number with a unit suffix, such as '30s', '1m30s'")
+ flags.MarkHidden(flagReadTimeout)
}

// GetDSN generates DSN from Config
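The core of the config.go change is the `readTimeout` DSN parameter, which go-sql-driver/mysql parses as a Go duration and applies as an I/O timeout on each network read, so a read against a dead server fails after the interval instead of hanging forever. A minimal sketch of the same wiring; the helper name `openWithReadTimeout` is illustrative, not part of dumpling:

```go
package example

import (
	"database/sql"
	"fmt"
	"time"

	_ "github.com/go-sql-driver/mysql"
)

// openWithReadTimeout builds a DSN the way GetDSN does after this change.
func openWithReadTimeout(user, pass, host string, port int, db, readTimeout string) (*sql.DB, error) {
	// The driver parses readTimeout as a duration string ("30s", "15m"), so validate early.
	if _, err := time.ParseDuration(readTimeout); err != nil {
		return nil, fmt.Errorf("invalid read timeout %q: %w", readTimeout, err)
	}
	dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?readTimeout=%s&writeTimeout=30s",
		user, pass, host, port, db, readTimeout)
	return sql.Open("mysql", dsn)
}
```

The "15m" default is long enough for slow chunk queries while still bounding how long a dump can sit on a broken connection.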
10 changes: 7 additions & 3 deletions v4/export/dump.go
@@ -131,7 +131,11 @@ func Dump(pCtx context.Context, conf *Config) (err error) {

m := newGlobalMetadata(conf.ExternalStorage)
// write metadata even if dump failed
- defer m.writeGlobalMetaData(ctx)
+ defer func() {
+ if err == nil {
+ m.writeGlobalMetaData(ctx)
+ }
+ }()

// for consistency lock, we should lock tables at first to get the tables we want to lock & dump
// for consistency lock, record meta pos before lock tables because other tables may still be modified while locking tables
@@ -255,12 +259,12 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsP
for _, tableIR := range tableDataIRArray {
tableIR := tableIR
g.Go(func() error {
- conn := connectPool.getConn()
- defer connectPool.releaseConn(conn)
retryTime := 1
return utils.WithRetry(ctx, func() error {
log.Debug("trying to dump table chunk", zap.Int("retryTime", retryTime), zap.String("db", tableIR.DatabaseName()),
zap.String("table", tableIR.TableName()), zap.Int("chunkIndex", tableIR.ChunkIndex()))
+ conn := connectPool.getConn()
+ defer connectPool.releaseConn(conn)
retryTime += 1
err := tableIR.Start(ctx, conn)
if err != nil {
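Note where the connection acquisition moved in dump.go: it is now inside the closure passed to utils.WithRetry, so each retry attempt draws a fresh connection from the pool rather than reusing the one that just failed. A condensed sketch of the pattern; `withRetry` here is a hypothetical stand-in, since utils.WithRetry's real contract also involves a backoffer:

```go
package example

import (
	"context"
	"database/sql"
)

// connPool abstracts dumpling's connectionsPool for this sketch.
type connPool interface {
	getConn() *sql.Conn
	releaseConn(*sql.Conn)
}

// withRetry is a simplified stand-in for utils.WithRetry.
func withRetry(ctx context.Context, attempts int, f func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if err = f(); err == nil {
			return nil
		}
	}
	return err
}

func dumpChunkWithRetry(ctx context.Context, pool connPool, start func(*sql.Conn) error) error {
	return withRetry(ctx, 3, func() error {
		// Acquiring inside the closure means a retry after a broken
		// connection gets a healthy conn instead of reusing the dead one.
		conn := pool.getConn()
		defer pool.releaseConn(conn)
		return start(conn)
	})
}
```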
1 change: 1 addition & 0 deletions v4/export/ir.go
@@ -22,6 +22,7 @@ type TableDataIR interface {

SpecialComments() StringIter
Rows() SQLRowIter
+ Close() error
}

// SQLRowIter is the iterator on a collection of sql.Row.
14 changes: 12 additions & 2 deletions v4/export/ir_impl.go
@@ -85,10 +85,13 @@ type tableData struct {
selectedField string
specCmts []string
escapeBackslash bool
+ cancel context.CancelFunc
SQLRowIter
}

- func (td *tableData) Start(ctx context.Context, conn *sql.Conn) error {
+ func (td *tableData) Start(pCtx context.Context, conn *sql.Conn) error {
+ var ctx context.Context
+ ctx, td.cancel = context.WithCancel(pCtx)
rows, err := conn.QueryContext(ctx, td.query)
if err != nil {
return err
@@ -137,6 +140,13 @@ func (td *tableData) Rows() SQLRowIter {
return td.SQLRowIter
}

+ func (td *tableData) Close() error {
+ if td.cancel != nil {
+ td.cancel()
+ }
+ return td.Rows().Close()
+ }
+
func (td *tableData) SelectedField() string {
if td.selectedField == "*" || td.selectedField == "" {
return td.selectedField
@@ -187,7 +197,7 @@ func splitTableDataIntoChunks(
return
}
if !smax.Valid || !smin.Valid {
- // smax and smin are not valid, but there can also be data to dump, so just skip split chunk logic.
+   // smax and smin are not valid, but there can also be data to dump, so just skip split chunk logic.
log.Debug("skip concurrent dump due to no valid smax or smin", zap.String("schema", dbName), zap.String("table", tableName))
linear <- struct{}{}
return
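The stored cancel field is what lets Close interrupt a dump that is already blocked: Start derives a cancellable context for QueryContext and keeps the CancelFunc, and Close fires it before closing the rows, which makes the driver abandon a read stuck on a dead connection. A compact, self-contained sketch of the same pattern (type names are illustrative):

```go
package example

import (
	"context"
	"database/sql"
)

// queryStream mirrors the tableData idea: it owns a running query
// plus the CancelFunc that can abort it.
type queryStream struct {
	rows   *sql.Rows
	cancel context.CancelFunc
}

func start(pCtx context.Context, conn *sql.Conn, query string) (*queryStream, error) {
	ctx, cancel := context.WithCancel(pCtx)
	rows, err := conn.QueryContext(ctx, query)
	if err != nil {
		cancel() // avoid leaking the derived context when the query fails
		return nil, err
	}
	return &queryStream{rows: rows, cancel: cancel}, nil
}

func (s *queryStream) Close() error {
	s.cancel() // unblocks a Next/Scan waiting on a dead connection
	return s.rows.Close()
}
```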
4 changes: 4 additions & 0 deletions v4/export/retry.go
@@ -33,6 +33,10 @@ func (b *dumpChunkBackoffer) NextBackoff(err error) time.Duration {
if _, ok := err.(*mysql.MySQLError); ok && !dbutil.IsRetryableError(err) {
b.attempt = 0
return 0
+ } else if _, ok := err.(*writerError); ok {
+ // the upload writer's retry logic is already handled in the AWS client, so there is no need to retry here
+ b.attempt = 0
+ return 0
}
b.delayTime = 2 * b.delayTime
b.attempt--
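The backoffer signals "give up now" by zeroing its remaining attempts and returning a zero delay; retryable failures instead double delayTime each round. A minimal sketch of that contract, under the assumption that the retry loop stops once attempts run out (names here are illustrative, not the utils package's):

```go
package example

import (
	"errors"
	"time"
)

type backoffer struct {
	attempt   int
	delayTime time.Duration
}

var errNoRetry = errors.New("not retryable")

func (b *backoffer) nextBackoff(err error) time.Duration {
	if errors.Is(err, errNoRetry) {
		b.attempt = 0 // tell the retry loop to stop immediately
		return 0
	}
	b.delayTime *= 2 // exponential backoff for retryable errors
	b.attempt--
	return b.delayTime
}

func retry(f func() error, b *backoffer) error {
	var err error
	for b.attempt > 0 {
		if err = f(); err == nil {
			return nil
		}
		time.Sleep(b.nextBackoff(err))
	}
	return err
}
```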
4 changes: 4 additions & 0 deletions v4/export/test_util.go
@@ -146,6 +146,10 @@ func (m *mockTableIR) Rows() SQLRowIter {
return m.SQLRowIter
}

+ func (m *mockTableIR) Close() error {
+ return nil
+ }
+
func (m *mockTableIR) EscapeBackSlash() bool {
return m.escapeBackSlash
}
22 changes: 6 additions & 16 deletions v4/export/writer.go
@@ -63,29 +63,20 @@ func (f SimpleWriter) WriteViewMeta(ctx context.Context, db, view, createTableSQL

type SQLWriter struct{ SimpleWriter }

- func (f SQLWriter) WriteTableData(ctx context.Context, ir TableDataIR) error {
+ func (f SQLWriter) WriteTableData(ctx context.Context, ir TableDataIR) (err error) {
log.Debug("start dumping table...", zap.String("table", ir.TableName()))

- // just let `database.table.sql` be `database.table.0.sql`
- /*if fileName == "" {
- // set initial file name
- fileName = fmt.Sprintf("%s.%s.sql", ir.DatabaseName(), ir.TableName())
- if f.cfg.FileSize != UnspecifiedSize {
- fileName = fmt.Sprintf("%s.%s.%d.sql", ir.DatabaseName(), ir.TableName(), 0)
- }
- }*/
+ defer ir.Close()
namer := newOutputFileNamer(ir, f.cfg.Rows != UnspecifiedSize, f.cfg.FileSize != UnspecifiedSize)
fileType := strings.ToLower(f.cfg.FileType)
fileName, err := namer.NextName(f.cfg.OutputFileTemplate, fileType)
if err != nil {
return err
}
- chunksIter := ir
- defer chunksIter.Rows().Close()

for {
fileWriter, tearDown := buildInterceptFileWriter(f.cfg.ExternalStorage, fileName)
- err = WriteInsert(ctx, chunksIter, fileWriter, f.cfg.FileSize, f.cfg.StatementSize)
+ err = WriteInsert(ctx, ir, fileWriter, f.cfg.FileSize, f.cfg.StatementSize)
tearDown(ctx)
if err != nil {
return err
@@ -175,17 +166,16 @@ func (namer *outputFileNamer) NextName(tmpl *template.Template, fileType string)
return res + "." + fileType, err
}

- func (f CSVWriter) WriteTableData(ctx context.Context, ir TableDataIR) error {
+ func (f CSVWriter) WriteTableData(ctx context.Context, ir TableDataIR) (err error) {
log.Debug("start dumping table in csv format...", zap.String("table", ir.TableName()))

+ defer ir.Close()
namer := newOutputFileNamer(ir, f.cfg.Rows != UnspecifiedSize, f.cfg.FileSize != UnspecifiedSize)
fileType := strings.ToLower(f.cfg.FileType)
fileName, err := namer.NextName(f.cfg.OutputFileTemplate, fileType)
if err != nil {
return err
}
- chunksIter := ir
- defer chunksIter.Rows().Close()

opt := &csvOption{
nullValue: f.cfg.CsvNullValue,
@@ -195,7 +185,7 @@ func (f CSVWriter) WriteTableData(ctx context.Context, ir TableDataIR) error {

for {
fileWriter, tearDown := buildInterceptFileWriter(f.cfg.ExternalStorage, fileName)
- err = WriteInsertInCsv(ctx, chunksIter, fileWriter, f.cfg.NoHeader, opt, f.cfg.FileSize)
+ err = WriteInsertInCsv(ctx, ir, fileWriter, f.cfg.NoHeader, opt, f.cfg.FileSize)
tearDown(ctx)
if err != nil {
return err
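With Close on the interface, both writers hand the whole cleanup (closing the rows and cancelling the query) to a single deferred call instead of each closing the row iterator themselves. A small sketch of that ownership pattern with stand-in types; note that the PR's plain defer ir.Close() discards the Close error, whereas the named return below would propagate it, which is optional hardening rather than what the PR does:

```go
package example

import "fmt"

// source stands in for TableDataIR: the consumer owns exactly one Close call.
type source interface {
	Next() (string, bool)
	Close() error // closes rows and cancels the underlying query
}

func writeTableData(src source) (err error) {
	// A single deferred cleanup also runs on early error returns.
	defer func() {
		if cerr := src.Close(); err == nil {
			err = cerr
		}
	}()
	for {
		row, ok := src.Next()
		if !ok {
			return nil
		}
		fmt.Println(row)
	}
}
```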
22 changes: 19 additions & 3 deletions v4/export/writer_util.go
@@ -389,13 +389,13 @@ func buildInterceptFileWriter(s storage.ExternalStorage, path string) (storage.W
log.Error("open file failed",
zap.String("path", fullPath),
zap.Error(err))
- return err
+ return newWriterError(err)
}
w := storage.NewUploaderWriter(uploader, hardcodedS3ChunkSize)
writer = w
log.Debug("opened file", zap.String("path", fullPath))
fileWriter.Writer = writer
- return err
+ return nil
}
fileWriter.initRoutine = initRoutine

Expand Down Expand Up @@ -427,6 +427,21 @@ func (l *LazyStringWriter) WriteString(str string) (int, error) {
return l.StringWriter.WriteString(str)
}

+ type writerError struct {
+ error
+ }
+
+ func (e *writerError) Error() string {
+ return e.error.Error()
+ }
+
+ func newWriterError(err error) error {
+ if err == nil {
+ return nil
+ }
+ return &writerError{error: err}
+ }
+
// InterceptFileWriter is an interceptor of os.File,
// tracking whether a StringWriter has written something.
type InterceptFileWriter struct {
@@ -446,7 +461,8 @@ func (w *InterceptFileWriter) Write(ctx context.Context, p []byte) (int, error)
if w.err != nil {
return 0, errors.Annotate(w.err, "open file error")
}
- return w.Writer.Write(ctx, p)
+ n, err := w.Writer.Write(ctx, p)
+ return n, newWriterError(err)
}

func (w *InterceptFileWriter) Close(ctx context.Context) error {
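writerError exists so retry.go can tell storage-writer failures apart from MySQL errors with a plain type assertion, and newWriterError's nil check avoids the classic Go pitfall where a typed nil wrapped in an error interface compares non-nil. A standalone sketch demonstrating both points:

```go
package main

import "fmt"

type writerError struct{ error }

func newWriterError(err error) error {
	if err == nil {
		return nil // &writerError{error: nil} would be a non-nil error value
	}
	return &writerError{error: err}
}

func main() {
	// Without the nil check, the caller's `if err != nil` would fire spuriously.
	fmt.Println(newWriterError(nil) == nil) // true

	wrapped := newWriterError(fmt.Errorf("upload failed"))
	// How the backoffer detects it and skips retrying:
	if _, ok := wrapped.(*writerError); ok {
		fmt.Println("writer error: no retry")
	}
}
```

Since Go 1.13 the same check could also be written with errors.As, which additionally sees through further wrapping.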