
master(dm): clean and treat invalid load task #4004

Merged · 9 commits · Dec 29, 2021
Changes from 5 commits
57 changes: 44 additions & 13 deletions dm/dm/master/scheduler/scheduler.go
@@ -2330,16 +2330,56 @@ func (s *Scheduler) getNextLoadTaskTransfer(worker, source string) (string, string) {
return "", ""
}

// hasLoadTaskByWorkerAndSource check whether there is a load subtask for the worker and source.
// hasLoadTaskByWorkerAndSource checks whether there is an existing load subtask for the worker and source.
func (s *Scheduler) hasLoadTaskByWorkerAndSource(worker, source string) bool {
for _, sourceWorkerMap := range s.loadTasks {
if workerName, ok := sourceWorkerMap[source]; ok && workerName == worker {
for taskName, sourceWorkerMap := range s.loadTasks {
// don't consider removed subtask
subtasksV, ok := s.subTaskCfgs.Load(taskName)
if !ok {
continue
}
subtasks := subtasksV.(map[string]config.SubTaskConfig)
if _, ok2 := subtasks[source]; !ok2 {
continue
}

if workerName, ok2 := sourceWorkerMap[source]; ok2 && workerName == worker {
return true
}
}
return false
}

// TryResolveLoadTask checks whether any of the given sources has a load task whose local files sit on a worker
// other than the one the source is currently bound to. If so, it triggers a source transfer.
func (s *Scheduler) TryResolveLoadTask(sources []string) {
for _, source := range sources {
s.mu.Lock()
worker, ok := s.bounds[source]
if !ok {
s.mu.Unlock()
continue
}
if err := s.tryResolveLoadTask(worker.baseInfo.Name, source); err != nil {
s.logger.Error("tryResolveLoadTask failed", zap.Error(err))
}
s.mu.Unlock()
}
}

func (s *Scheduler) tryResolveLoadTask(originWorker, originSource string) error {
if s.hasLoadTaskByWorkerAndSource(originWorker, originSource) {
return nil
}

worker, source := s.getNextLoadTaskTransfer(originWorker, originSource)
if worker == "" && source == "" {
return nil
}

return s.transferWorkerAndSource(originWorker, originSource, worker, source)
}

func (s *Scheduler) handleLoadTaskDel(loadTask ha.LoadTask) error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -2357,16 +2397,7 @@ func (s *Scheduler) handleLoadTaskDel(loadTask ha.LoadTask) error {
delete(s.loadTasks, loadTask.Task)
}

if s.hasLoadTaskByWorkerAndSource(originWorker, loadTask.Source) {
return nil
}

worker, source := s.getNextLoadTaskTransfer(originWorker, loadTask.Source)
if worker == "" && source == "" {
return nil
}

return s.transferWorkerAndSource(originWorker, loadTask.Source, worker, source)
return s.tryResolveLoadTask(originWorker, loadTask.Source)
}

func (s *Scheduler) handleLoadTaskPut(loadTask ha.LoadTask) {
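To make the new check concrete, here is a small self-contained sketch (assumed names and types, not the scheduler's real ones) of the pattern introduced above: a load-task entry only counts while its subtask config still exists, so an entry left behind by stop-task is ignored.

package main

import (
	"fmt"
	"sync"
)

// SubTaskConfig is a stand-in for config.SubTaskConfig.
type SubTaskConfig struct{}

// hasLiveLoadTask mirrors the idea of hasLoadTaskByWorkerAndSource: it only reports a load
// task whose subtask config is still stored, so stale entries for stopped tasks are skipped.
func hasLiveLoadTask(subTaskCfgs *sync.Map, loadTasks map[string]map[string]string, worker, source string) bool {
	for taskName, sourceWorkerMap := range loadTasks {
		v, ok := subTaskCfgs.Load(taskName)
		if !ok {
			continue // subtask config already deleted, e.g. by stop-task
		}
		if _, ok := v.(map[string]SubTaskConfig)[source]; !ok {
			continue
		}
		if sourceWorkerMap[source] == worker {
			return true
		}
	}
	return false
}

func main() {
	var cfgs sync.Map
	loadTasks := map[string]map[string]string{"load_task1": {"mysql-replica-01": "worker1"}}

	cfgs.Store("load_task1", map[string]SubTaskConfig{"mysql-replica-01": {}})
	fmt.Println(hasLiveLoadTask(&cfgs, loadTasks, "worker1", "mysql-replica-01")) // true

	cfgs.Delete("load_task1") // simulate stop-task removing the config
	fmt.Println(hasLiveLoadTask(&cfgs, loadTasks, "worker1", "mysql-replica-01")) // false
}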
12 changes: 12 additions & 0 deletions dm/dm/master/scheduler/scheduler_test.go
@@ -1653,6 +1653,13 @@ func (t *testScheduler) TestWatchLoadTask(c *C) {
s.workers[workerName4] = worker4
s.sourceCfgs[sourceID1] = &config.SourceConfig{}
s.sourceCfgs[sourceID2] = &config.SourceConfig{}
s.subTaskCfgs.Store(task1, map[string]config.SubTaskConfig{
sourceID1: {},
})
s.subTaskCfgs.Store(task2, map[string]config.SubTaskConfig{
sourceID1: {},
sourceID2: {},
})

worker1.ToFree()
c.Assert(s.boundSourceToWorker(sourceID1, worker1), IsNil)
@@ -1726,6 +1733,11 @@ func (t *testScheduler) TestWatchLoadTask(c *C) {
c.Assert(s.bounds[sourceID2], DeepEquals, worker4)
c.Assert(worker2.stage, Equals, WorkerFree)

// after stop-task removes the subtask config, hasLoadTaskByWorkerAndSource no longer reports the load task
c.Assert(s.hasLoadTaskByWorkerAndSource(workerName4, sourceID2), IsTrue)
s.subTaskCfgs.Delete(task2)
c.Assert(s.hasLoadTaskByWorkerAndSource(workerName4, sourceID2), IsFalse)

cancel1()
wg.Wait()
}
2 changes: 2 additions & 0 deletions dm/dm/master/server.go
@@ -502,6 +502,8 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S
release()
}

go s.scheduler.TryResolveLoadTask(sources)

resp.Result = true
if cfg.RemoveMeta {
resp.Msg = "`remove-meta` in task config is deprecated, please use `start-task ... --remove-meta` instead"
7 changes: 6 additions & 1 deletion dm/tests/_utils/test_prepare
@@ -59,13 +59,18 @@ function join_string() {

# shortcut for start task on one DM-worker
function dmctl_start_task_standalone() {
if [ $# -ge 2 ]; then
remove_meta=$2
else
remove_meta=""
fi
if [ $# -ge 1 ]; then
task_conf=$1
else
task_conf="$cur/conf/dm-task.yaml"
fi
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $task_conf" \
"start-task $task_conf $remove_meta" \
"\"result\": true" 2 \
"\"source\": \"$SOURCE_ID1\"" 1
}
41 changes: 41 additions & 0 deletions dm/tests/load_task/conf/dm-task-standalone.yaml
@@ -0,0 +1,41 @@
---
name: load_task1
task-mode: all
is-sharding: false
meta-schema: "dm_meta"
heartbeat-update-interval: 1
heartbeat-report-interval: 1

target-database:
host: "127.0.0.1"
port: 4000
user: "test"
password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

block-allow-list:
instance:
do-dbs: ["load_task1"]

mydumpers:
global:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
41 changes: 41 additions & 0 deletions dm/tests/load_task/conf/dm-task2-standalone.yaml
@@ -0,0 +1,41 @@
---
name: load_task2
task-mode: all
is-sharding: false
meta-schema: "dm_meta"
heartbeat-update-interval: 1
heartbeat-report-interval: 1

target-database:
host: "127.0.0.1"
port: 4000
user: "test"
password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

block-allow-list:
instance:
do-dbs: ["load_task2"]

mydumpers:
global:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
81 changes: 81 additions & 0 deletions dm/tests/load_task/run.sh
@@ -170,13 +170,94 @@ function test_transfer_two_sources() {
"\"taskStatus\": \"Running\"" 4
}

function stop_task_left_load() {
echo "start DM master, workers and sources"
run_dm_master $WORK_DIR/master $MASTER_PORT1 $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT1

export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/loader/LoadDataSlowDownByTask=return(\"load_task1\")"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

dmctl_start_task_standalone "$cur/conf/dm-task-standalone.yaml" "--remove-meta"

export GO_FAILPOINTS=""
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

# kill worker1; load_task1 will be transferred to worker2, which lacks the local files
ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true
check_port_offline $WORKER1_PORT 20
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status load_task1" \
"different worker in load stage, previous worker: worker1, current worker: worker2" 1

# now stop this task without cleaning its meta (this leaves a load_task KV in etcd)
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task load_task1" \
"\"result\": true" 2

dmctl_start_task_standalone "$cur/conf/dm-task2-standalone.yaml" "--remove-meta"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status load_task2" \
"\"unit\": \"Sync\"" 1

# after worker1 comes back online, it still has unfinished local files for load_task1, but load_task1 has been stopped, so the source should not be rebound to worker1
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/loader/LoadDataSlowDownByTask=return(\"load_task1\")"
Contributor: Do we need to clean the GO_FAILPOINTS env after this test is done?

Contributor (author): I still need this task stuck in the load unit so the following test can use it.
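For context on the GO_FAILPOINTS usage above, here is a minimal sketch of how a worker-side loader might consume a task-scoped failpoint such as LoadDataSlowDownByTask, assuming the failpoint value carries the name of the task to slow down; the helper name, package layout, and sleep duration are illustrative assumptions, not the actual tiflow loader code.

package loader

import (
	"time"

	"github.com/pingcap/failpoint"
)

// maybeSlowDown blocks loading for the named task so a test can hold the subtask in
// the Load unit. Illustrative only; the real loader's handling may differ.
func maybeSlowDown(taskName string) {
	failpoint.Inject("LoadDataSlowDownByTask", func(val failpoint.Value) {
		if name, ok := val.(string); ok && name == taskName {
			time.Sleep(time.Hour) // keep the subtask stuck in the Load unit
		}
	})
}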

run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member --name worker1" \
"\"source\": \"\"" 1

# start-task again; expect the source to be transferred back automatically
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $cur/conf/dm-task-standalone.yaml"

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member --name worker1" \
"\"source\": \"mysql-replica-01\"" 1

# repeat, and check that start-task --remove-meta does not cause a transfer
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task load_task1" \
"\"result\": true" 2
ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true
check_port_offline $WORKER1_PORT 20

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status load_task2" \
"\"unit\": \"Sync\"" 1

run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

dmctl_start_task_standalone "$cur/conf/dm-task-standalone.yaml" "--remove-meta"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status load_task1" \
"\"unit\": \"Sync\"" 1
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member --name worker1" \
"\"source\": \"\"" 1

cleanup_process $*
cleanup_data load_task1
cleanup_data load_task2
}

function run() {
echo "import prepare data"
run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'

stop_task_left_load

echo "start DM master, workers and sources"
run_dm_master $WORK_DIR/master $MASTER_PORT1 $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT1