Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
*: address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc committed Dec 4, 2019
1 parent bf32426 commit f59a6aa
Showing 1 changed file with 4 additions and 8 deletions.
12 changes: 4 additions & 8 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ var (
maxDMLConnectionTimeout = "5m"
maxDDLConnectionTimeout = fmt.Sprintf("%dm", MaxDDLConnectionTimeoutMinute)

maxDMLConnectionDuration, _ = time.ParseDuration(maxDMLConnectionTimeout)

adminQueueName = "admin queue"
defaultBucketCount = 8
)
Expand Down Expand Up @@ -859,15 +861,9 @@ func (s *Syncer) flushCheckPoints() error {
s.tctx.L().Info("prepare flush sqls", zap.Strings("shard meta sqls", shardMetaSQLs), zap.Reflect("shard meta arguments", shardMetaArgs))
}

// when canceling (stop-task/pause-task), we still need to flush the checkpoint.
timeout, err := time.ParseDuration(maxDMLConnectionTimeout)
if err != nil {
timeout = time.Minute // still use a default value
}
tctx, cancel := s.tctx.WithContext(context.Background()).WithTimeout(timeout)
tctx, cancel := s.tctx.WithContext(context.Background()).WithTimeout(maxDMLConnectionDuration)
defer cancel()

err = s.checkpoint.FlushPointsExcept(tctx, exceptTables, shardMetaSQLs, shardMetaArgs)
err := s.checkpoint.FlushPointsExcept(tctx, exceptTables, shardMetaSQLs, shardMetaArgs)
if err != nil {
return terror.Annotatef(err, "flush checkpoint %s", s.checkpoint)
}
Expand Down

0 comments on commit f59a6aa

Please sign in to comment.