diff --git a/loader/db.go b/loader/db.go index 989b457ca1..95ecca0037 100644 --- a/loader/db.go +++ b/loader/db.go @@ -95,7 +95,7 @@ func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interf return ret, err }) if err != nil { - ctx.L().Error("query statement failed after retry", + ctx.L().ErrorFilterContextCanceled("query statement failed after retry", zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(args, -1)), log.ShortError(err)) @@ -169,7 +169,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ... }) if err != nil { - ctx.L().Error("execute statements failed after retry", + ctx.L().ErrorFilterContextCanceled("execute statements failed after retry", zap.String("queries", utils.TruncateInterface(queries, -1)), zap.String("arguments", utils.TruncateInterface(args, -1)), log.ShortError(err)) diff --git a/loader/loader.go b/loader/loader.go index 0519d7e048..a2167f4fcb 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -39,7 +39,7 @@ import ( "github.com/pingcap/failpoint" cm "github.com/pingcap/tidb-tools/pkg/column-mapping" "github.com/pingcap/tidb-tools/pkg/filter" - "github.com/pingcap/tidb-tools/pkg/table-router" + router "github.com/pingcap/tidb-tools/pkg/table-router" "github.com/siddontang/go/sync2" "go.uber.org/zap" ) @@ -174,7 +174,9 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh if err != nil { // expect pause rather than exit err = terror.WithScope(terror.Annotatef(err, "file %s", job.file), terror.ScopeDownstream) - runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err) + if !utils.IsContextCanceledError(err) { + runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err) + } return } w.loader.finishedDataSize.Add(job.offset - job.lastOffset) diff --git a/pkg/conn/baseconn.go b/pkg/conn/baseconn.go index d9f9ac0e10..d969717dea 100644 --- a/pkg/conn/baseconn.go +++ b/pkg/conn/baseconn.go @@ -95,7 +95,7 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int rows, err := conn.DBConn.QueryContext(tctx.Context(), query, args...) if err != nil { - tctx.L().Error("query statement failed", + tctx.L().ErrorFilterContextCanceled("query statement failed", zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(args, -1)), log.ShortError(err)) @@ -144,7 +144,7 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr continue } - tctx.L().Error("execute statement failed", + tctx.L().ErrorFilterContextCanceled("execute statement failed", zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(arg, -1)), log.ShortError(err)) diff --git a/pkg/log/log.go b/pkg/log/log.go index 3a3e8c722a..3a1c7de212 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -14,6 +14,7 @@ package log import ( + "context" "fmt" pclog "github.com/pingcap/log" @@ -72,6 +73,16 @@ func (l Logger) WithFields(fields ...zap.Field) Logger { return Logger{l.With(fields...)} } +// ErrorFilterContextCanceled wraps Logger.Error() and will filter error log when error is context.Canceled +func (l Logger) ErrorFilterContextCanceled(msg string, fields ...zap.Field) { + for _, field := range fields { + if field.Key == "error" && field.String == context.Canceled.Error() { + return + } + } + l.Logger.Error(msg, fields...) +} + // logger for DM var ( appLogger = Logger{zap.NewNop()} diff --git a/pkg/utils/util.go b/pkg/utils/util.go index 14598b59e3..08ed222824 100644 --- a/pkg/utils/util.go +++ b/pkg/utils/util.go @@ -14,6 +14,7 @@ package utils import ( + "context" "math" "os" "regexp" @@ -21,6 +22,7 @@ import ( "strings" "time" + "github.com/pingcap/errors" "github.com/siddontang/go-mysql/mysql" "github.com/pingcap/dm/pkg/terror" @@ -152,6 +154,11 @@ func WaitSomething(backoff int, waitTime time.Duration, fn func() bool) bool { return false } +// IsContextCanceledError checks whether err is context.Canceled +func IsContextCanceledError(err error) bool { + return errors.Cause(err) == context.Canceled +} + // IsBuildInSkipDDL return true when checked sql that will be skipped for syncer func IsBuildInSkipDDL(sql string) bool { return builtInSkipDDLPatterns.FindStringIndex(sql) != nil diff --git a/syncer/db.go b/syncer/db.go index 7031366913..8af616954e 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -203,7 +203,7 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter ) if err != nil { - tctx.L().Error("query statement failed after retry", + tctx.L().ErrorFilterContextCanceled("query statement failed after retry", zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(args, -1)), log.ShortError(err)) @@ -266,7 +266,7 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun }) if err != nil { - tctx.L().Error("execute statements failed after retry", + tctx.L().ErrorFilterContextCanceled("execute statements failed after retry", zap.String("queries", utils.TruncateInterface(queries, -1)), zap.String("arguments", utils.TruncateInterface(args, -1)), log.ShortError(err)) diff --git a/syncer/syncer.go b/syncer/syncer.go index d945f2b9be..b8c3c28e6b 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -921,7 +921,9 @@ func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *DBConn, s.jobWg.Done() if err != nil { s.execErrorDetected.Set(true) - s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err) + if !utils.IsContextCanceledError(err) { + s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err) + } continue } s.addCount(true, queueBucket, sqlJob.tp, int64(len(sqlJob.ddls))) diff --git a/syncer/warning.go b/syncer/warning.go index 484a2fa952..4a95ee1d81 100644 --- a/syncer/warning.go +++ b/syncer/warning.go @@ -41,11 +41,13 @@ func (s *Syncer) Error() interface{} { errors := make([]*pb.SyncSQLError, 0, len(s.execErrors.errors)) for _, ctx := range s.execErrors.errors { - errors = append(errors, &pb.SyncSQLError{ - Msg: ctx.err.Error(), - FailedBinlogPosition: fmt.Sprintf("%s:%d", ctx.pos.Name, ctx.pos.Pos), - ErrorSQL: ctx.jobs, - }) + if !utils.IsContextCanceledError(ctx.err) { + errors = append(errors, &pb.SyncSQLError{ + Msg: ctx.err.Error(), + FailedBinlogPosition: fmt.Sprintf("%s:%d", ctx.pos.Name, ctx.pos.Pos), + ErrorSQL: ctx.jobs, + }) + } } return &pb.SyncError{Errors: errors}