Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
*: manual pause a task which has been pause by error (#1900) (#1917)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jul 26, 2021
1 parent 9c8ef55 commit e60fb1f
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 5 deletions.
22 changes: 21 additions & 1 deletion dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,22 @@ func (st *SubTask) setResult(result *pb.ProcessResult) {
st.result = result
}

// markResultCanceled mark result as canceled if stage is Paused.
// This func is used to pause a task which has been paused by error,
// so the task will not auto resume by task checker.
func (st *SubTask) markResultCanceled() bool {
st.Lock()
defer st.Unlock()
if st.stage == pb.Stage_Paused {
if st.result != nil && !st.result.IsCanceled {
st.l.Info("manually pause task which has been paused by errors")
st.result.IsCanceled = true
return true
}
}
return false
}

// Result returns the result of the sub task.
func (st *SubTask) Result() *pb.ProcessResult {
st.RLock()
Expand All @@ -457,8 +473,12 @@ func (st *SubTask) Close() {
updateTaskState(st.cfg.Name, st.cfg.SourceID, pb.Stage_Stopped)
}

// Pause pauses the running sub task.
// Pause pauses a running sub task or a sub task paused by error.
func (st *SubTask) Pause() error {
if st.markResultCanceled() {
return nil
}

if !st.stageCAS(pb.Stage_Running, pb.Stage_Pausing) {
return terror.ErrWorkerNotRunningStage.Generate(st.Stage().String())
}
Expand Down
3 changes: 2 additions & 1 deletion dm/worker/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,12 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) {
c.Assert(st.Stage(), Equals, pb.Stage_Paused)

// pause
c.Assert(st.Pause(), NotNil)
c.Assert(st.Pause(), IsNil)
c.Assert(st.Stage(), Equals, pb.Stage_Paused)
c.Assert(st.CurrUnit(), Equals, mockDumper)
c.Assert(st.Result(), NotNil)
c.Assert(st.Result().Errors, HasLen, 1)
c.Assert(st.Result().IsCanceled, IsTrue)
c.Assert(strings.Contains(st.Result().Errors[0].Message, "dumper process error"), IsTrue)

// resume twice
Expand Down
30 changes: 27 additions & 3 deletions tests/drop_column_with_index/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ WORK_DIR=$TEST_DIR/$TEST_NAME

function run() {
run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1

export GO_FAILPOINTS="github.com/pingcap/dm/syncer/SyncerGetEventError=return"
inject_points=(
"github.com/pingcap/dm/syncer/SyncerGetEventError=return"
"github.com/pingcap/dm/syncer/GetEventError=1*return"
)
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"

# start DM worker and master
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
Expand All @@ -33,9 +36,30 @@ function run() {
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

check_log_contain_with_retry "mock upstream instance restart" $WORK_DIR/worker1/log/dm-worker.log
# check_log_contain_with_retry "dispatch auto resume task" $WORK_DIR/worker1/log/dm-worker.log
check_log_contain_with_retry "meet error when read from local binlog, will switch to remote binlog" $WORK_DIR/worker1/log/dm-worker.log

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"go-mysql returned an error" 1 \
"\"stage\": \"Paused\"" 1 \
"\"isCanceled\": false" 1
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"pause-task test" \
"\"result\": true" 1 \
"go-mysql returned an error" 1
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"go-mysql returned an error" 1 \
"\"stage\": \"Paused\"" 1 \
"\"isCanceled\": true" 1

sleep 5
check_log_not_contains "dispatch auto resume task" $WORK_DIR/worker1/log/dm-worker.log

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task test" \
"\"result\": true" 2

run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1

# use sync_diff_inspector to check data now!
Expand Down

0 comments on commit e60fb1f

Please sign in to comment.