Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

loader/syncer: filter context cancel error while executing sqls #355

Merged
merged 16 commits into from
Nov 28, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,12 @@ func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interf
return ret, err
})
if err != nil {
ctx.L().Error("query statement failed after retry",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
if !utils.IsContextCanceledError(err) {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
ctx.L().Error("query statement failed after retry",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
}
return nil, err
}
return ret.(*sql.Rows), nil
Expand Down Expand Up @@ -169,10 +171,12 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
})

if err != nil {
ctx.L().Error("execute statements failed after retry",
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
if !utils.IsContextCanceledError(err) {
ctx.L().Error("execute statements failed after retry",
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
}
}

return err
Expand Down
12 changes: 9 additions & 3 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"github.com/pingcap/failpoint"
cm "github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/table-router"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/siddontang/go/sync2"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -174,7 +174,11 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
if err != nil {
// expect pause rather than exit
err = terror.WithScope(terror.Annotatef(err, "file %s", job.file), terror.ScopeDownstream)
runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
if utils.IsContextCanceledError(err) {
runFatalChan <- nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed?

Copy link
Contributor Author

@lichunzhu lichunzhu Nov 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't deal with runFatalChan, the user will still get cancel error from dmctl. Shall we revise unit.NewProcessError to filter that instead of this way?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err is context cancel error, don't need to send nil to runFatalChan

} else {
runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
}
return
}
w.loader.finishedDataSize.Add(job.offset - job.lastOffset)
Expand Down Expand Up @@ -475,7 +479,9 @@ func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) {
defer wg.Done()
for err := range l.runFatalChan {
cancel() // cancel l.Restore
errs = append(errs, err)
if err != nil {
errs = append(errs, err)
}
}
}()

Expand Down
13 changes: 9 additions & 4 deletions pkg/conn/baseconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,12 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int
rows, err := conn.DBConn.QueryContext(tctx.Context(), query, args...)

if err != nil {
tctx.L().Error("query statement failed",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
if !utils.IsContextCanceledError(err) {
tctx.L().Error("query statement failed",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
}
return nil, terror.ErrDBQueryFailed.Delegate(err, utils.TruncateString(query, -1))
}
return rows, nil
Expand Down Expand Up @@ -143,6 +145,9 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr
log.ShortError(err))
continue
}
if utils.IsContextCanceledError(err) {
return i, terror.ErrDBExecuteFailed.Delegate(err, utils.TruncateString(query, -1))
}

tctx.L().Error("execute statement failed",
zap.String("query", utils.TruncateString(query, -1)),
Expand Down
7 changes: 7 additions & 0 deletions pkg/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package utils

import (
"context"
"math"
"os"
"regexp"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/pkg/terror"
Expand Down Expand Up @@ -152,6 +154,11 @@ func WaitSomething(backoff int, waitTime time.Duration, fn func() bool) bool {
return false
}

// IsContextCanceledError checks whether err is context.Canceled
func IsContextCanceledError(err error) bool {
return errors.Cause(err) == context.Canceled
}

// IsBuildInSkipDDL return true when checked sql that will be skipped for syncer
func IsBuildInSkipDDL(sql string) bool {
return builtInSkipDDLPatterns.FindStringIndex(sql) != nil
Expand Down
20 changes: 12 additions & 8 deletions syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,12 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter
)

if err != nil {
tctx.L().Error("query statement failed after retry",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
if !utils.IsContextCanceledError(err) {
tctx.L().Error("query statement failed after retry",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
}
return nil, err
}
return ret.(*sql.Rows), nil
Expand Down Expand Up @@ -266,10 +268,12 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun
})

if err != nil {
tctx.L().Error("execute statements failed after retry",
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
if !utils.IsContextCanceledError(err) {
tctx.L().Error("execute statements failed after retry",
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
}
return ret.(int), err
}
return ret.(int), nil
Expand Down
10 changes: 8 additions & 2 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,9 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) {
}
cancel() // cancel s.Run
syncerExitWithErrorCounter.WithLabelValues(s.cfg.Name).Inc()
errs = append(errs, err)
if err != nil {
errs = append(errs, err)
}
}
}()

Expand Down Expand Up @@ -921,7 +923,11 @@ func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *DBConn,
s.jobWg.Done()
if err != nil {
s.execErrorDetected.Set(true)
s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
if utils.IsContextCanceledError(err) {
s.runFatalChan <- nil
} else {
s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
}
continue
}
s.addCount(true, queueBucket, sqlJob.tp, int64(len(sqlJob.ddls)))
Expand Down
12 changes: 7 additions & 5 deletions syncer/warning.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ func (s *Syncer) Error() interface{} {

errors := make([]*pb.SyncSQLError, 0, len(s.execErrors.errors))
for _, ctx := range s.execErrors.errors {
errors = append(errors, &pb.SyncSQLError{
Msg: ctx.err.Error(),
FailedBinlogPosition: fmt.Sprintf("%s:%d", ctx.pos.Name, ctx.pos.Pos),
ErrorSQL: ctx.jobs,
})
if !utils.IsContextCanceledError(ctx.err) {
errors = append(errors, &pb.SyncSQLError{
Msg: ctx.err.Error(),
FailedBinlogPosition: fmt.Sprintf("%s:%d", ctx.pos.Name, ctx.pos.Pos),
ErrorSQL: ctx.jobs,
})
}
}

return &pb.SyncError{Errors: errors}
Expand Down