Skip to content

Commit

Permalink
Merge branch 'master' into add_integration_test_for_dml_using_downstr…
Browse files Browse the repository at this point in the history
…eam_schema
  • Loading branch information
WizardXiao authored Nov 17, 2021
2 parents 3ab0f34 + faa070d commit 8fbe603
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 4 deletions.
10 changes: 10 additions & 0 deletions dm/dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1753,6 +1753,16 @@ func (s *Scheduler) handleWorkerOnline(ev ha.WorkerEvent, toLock bool) error {

// 3. change the stage (from Offline) to Free or Relay.
lastRelaySource := w.RelaySourceID()
if lastRelaySource == "" {
// when worker is removed (for example lost keepalive when master scheduler boots up), w.RelaySourceID() is
// of course nothing, so we find the relay source from a better place
for source, workerM := range s.relayWorkers {
if _, ok2 := workerM[w.BaseInfo().Name]; ok2 {
lastRelaySource = source
break
}
}
}
w.ToFree()
// TODO: rename ToFree to Online and move below logic inside it
if lastRelaySource != "" {
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/master/shardddl/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, sk
func (o *Optimist) removeLock(lock *optimism.Lock) (bool, error) {
failpoint.Inject("SleepWhenRemoveLock", func(val failpoint.Value) {
t := val.(int)
log.L().Info("wait new ddl info putted into etcd",
log.L().Info("wait new ddl info putted into etcd in optimistic",
zap.String("failpoint", "SleepWhenRemoveLock"),
zap.Int("max wait second", t))

Expand Down
2 changes: 1 addition & 1 deletion dm/dm/master/shardddl/pessimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ func (p *Pessimist) removeLock(lock *pessimism.Lock) error {

failpoint.Inject("SleepWhenRemoveLock", func(val failpoint.Value) {
t := val.(int)
log.L().Info("wait new ddl info putted into etcd",
log.L().Info("wait new ddl info putted into etcd in pessimistic",
zap.String("failpoint", "SleepWhenRemoveLock"),
zap.Int("max wait second", t))

Expand Down
1 change: 1 addition & 0 deletions dm/dm/worker/task_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func (s *testTaskCheckerSuite) TestIsResumableError(c *check.C) {
{nil, true},
{errors.New("unknown error"), true},
{terror.ErrNotSet.Delegate(&tmysql.SQLError{Code: 1236, Message: "Could not find first log file name in binary log index file", State: tmysql.DefaultMySQLState}), false},
{terror.ErrNotSet.Delegate(&tmysql.SQLError{Code: 1236, Message: "The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires", State: tmysql.DefaultMySQLState}), false},
}

for _, tc := range testCases {
Expand Down
1 change: 1 addition & 0 deletions dm/pkg/retry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
// ReplicationErrMsgs list the error message of un-recoverable replication error.
ReplicationErrMsgs = []string{
"Could not find first log file name in binary log index file",
"The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires",
}

// ParseRelayLogErrMsgs list the error messages of some un-recoverable relay log parsing error, which is used in task auto recovery.
Expand Down
14 changes: 14 additions & 0 deletions dm/tests/new_relay/conf/source2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
source-id: mysql-replica-02
server-id: 123456
flavor: 'mysql'
enable-gtid: true
relay-binlog-name: ''
relay-binlog-gtid: ''
enable-relay: false
from:
host: 127.0.0.1
user: root
password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=
port: 3307
checker:
check-enable: false
79 changes: 78 additions & 1 deletion dm/tests/new_relay/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,82 @@ function test_cant_dail_downstream() {
cleanup_data $TEST_NAME
}

function test_restart_relay_status() {
cleanup_data $TEST_NAME
cleanup_process

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

dmctl_operate_source create $cur/conf/source1.yaml $SOURCE_ID1

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-relay -s $SOURCE_ID1 worker1"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -s $SOURCE_ID1" \
"\"result\": true" 2 \
"\"worker\": \"worker1\"" 1

run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

dmctl_operate_source create $cur/conf/source2.yaml $SOURCE_ID2

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-relay -s $SOURCE_ID2 worker2"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -s $SOURCE_ID2" \
"\"result\": true" 2 \
"\"worker\": \"worker2\"" 1

run_dm_worker $WORK_DIR/worker3 $WORKER3_PORT $cur/conf/dm-worker3.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-relay -s $SOURCE_ID2 worker3"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -s $SOURCE_ID2" \
"\"result\": true" 3 \
"\"worker\": \"worker2\"" 1 \
"\"worker\": \"worker3\"" 1

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member -n worker3" \
"relay" 1

kill_dm_worker
kill_dm_master

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT

run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
run_dm_worker $WORK_DIR/worker3 $WORKER3_PORT $cur/conf/dm-worker3.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT

run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -s $SOURCE_ID1" \
"\"result\": true" 2 \
"\"worker\": \"worker1\"" 1

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -s $SOURCE_ID2" \
"\"result\": true" 3 \
"\"worker\": \"worker2\"" 1 \
"\"worker\": \"worker3\"" 1

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member --worker" \
"relay" 1 \
"bound" 2
}

function test_kill_dump_connection() {
cleanup_data $TEST_NAME
cleanup_process
Expand All @@ -108,7 +184,7 @@ function test_kill_dump_connection() {
"\"worker\": \"worker1\"" 1
run_sql_source1 "show processlist"

# kill dumop connection to test wheather relay will auto reconnect db
# kill dump connection to test whether relay will auto reconnect db
dump_conn_id=$(cat $TEST_DIR/sql_res.$TEST_NAME.txt | grep Binlog -B 4 | grep Id | cut -d : -f2)
run_sql_source1 "kill ${dump_conn_id}"

Expand All @@ -123,6 +199,7 @@ function test_kill_dump_connection() {
}

function run() {
test_restart_relay_status
test_cant_dail_downstream
test_cant_dail_upstream

Expand Down
2 changes: 1 addition & 1 deletion dm/tests/shardddl1/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ function DM_RemoveLock_CASE() {
run_sql_source1 "alter table ${shardddl1}.${tb1} add column c double;"
run_sql_source2 "alter table ${shardddl1}.${tb1} add column c double;"
run_sql_source2 "alter table ${shardddl1}.${tb2} add column c double;"
check_log_contain_with_retry "wait new ddl info putted into etcd" $WORK_DIR/master/log/dm-master.log
check_log_contain_with_retry "wait new ddl info putted into etcd in ${1}" $WORK_DIR/master/log/dm-master.log
check_metric_not_contains $MASTER_PORT "dm_master_shard_ddl_error" 3
run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;"

Expand Down

0 comments on commit 8fbe603

Please sign in to comment.