Skip to content

Commit

Permalink
dm/test: add a test to reproduce #3487
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Nov 17, 2021
1 parent faa070d commit cdfbdeb
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 0 deletions.
24 changes: 24 additions & 0 deletions dm/relay/relay_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package relay

import (
"math/rand"
"path/filepath"
"time"

gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/pingcap/failpoint"
"go.uber.org/atomic"
"go.uber.org/zap"

Expand Down Expand Up @@ -233,7 +235,29 @@ func (w *FileWriter) handleEventDefault(ev *replication.BinlogEvent) (WResult, e
}

// write the non-duplicate event
failpoint.Inject("SlowDownWriteDMLRelayLog", func(_ failpoint.Value) {
if rand.Int()%4 == 0 {
switch ev.Header.EventType {
case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2,
replication.DELETE_ROWS_EVENTv0, replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2,
replication.UPDATE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
mid := len(ev.RawData) / 2
first, second := ev.RawData[:mid], ev.RawData[mid:]
err2 := w.out.Write(first)
if err2 != nil {
w.logger.DPanic("error in failpoint SlowDownWriteDMLRelayLog", zap.Error(err2))
}
time.Sleep(time.Second)
err = w.out.Write(second)
failpoint.Goto("afterWrite")
}
}
})

err = w.out.Write(ev.RawData)

failpoint.Label("afterWrite")

return WResult{
Ignore: false,
}, terror.Annotatef(err, "write event %+v", ev.Header)
Expand Down
29 changes: 29 additions & 0 deletions dm/tests/slow_relay_writer/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/ticdc_dm_test/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["slow_relay_writer.?*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 3306
user = "root"
password = "123456"

[data-sources.tidb0]
host = "127.0.0.1"
port = 4000
user = "test"
password = "123456"
4 changes: 4 additions & 0 deletions dm/tests/slow_relay_writer/conf/dm-master.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Master Configuration.
master-addr = ":8261"
advertise-addr = "127.0.0.1:8261"
auto-compaction-retention = "3s"
42 changes: 42 additions & 0 deletions dm/tests/slow_relay_writer/conf/dm-task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
---
name: test
task-mode: all
is-sharding: false
meta-schema: "dm_meta"
# enable-heartbeat: true

target-database:
host: "127.0.0.1"
port: 4000
user: "root"
password: ""

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

block-allow-list:
instance:
do-dbs: ["slow_relay_writer"]

mydumpers:
global:
threads: 4
chunk-filesize: 0
skip-tz-utc: true
statement-size: 100
extra-args: ""

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
checkpoint-flush-interval: 5
2 changes: 2 additions & 0 deletions dm/tests/slow_relay_writer/conf/dm-worker1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name = "worker1"
join = "127.0.0.1:8261"
13 changes: 13 additions & 0 deletions dm/tests/slow_relay_writer/conf/source1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
source-id: mysql-replica-01
flavor: ''
enable-gtid: true
relay-binlog-name: ''
relay-binlog-gtid: ''
enable-relay: true
from:
host: 127.0.0.1
user: root
password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=
port: 3306
checker:
check-enable: false
66 changes: 66 additions & 0 deletions dm/tests/slow_relay_writer/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/bin/bash

set -eu

cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $cur/../_utils/test_prepare

WORK_DIR=$TEST_DIR/$TEST_NAME
TABLE_NUM=10

function prepare_data() {
run_sql 'DROP DATABASE if exists slow_relay_writer;' $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql 'CREATE DATABASE slow_relay_writer;' $MYSQL_PORT1 $MYSQL_PASSWORD1
for i in $(seq $TABLE_NUM); do
run_sql "CREATE TABLE slow_relay_writer.t$i(i TINYINT, j INT UNIQUE KEY);" $MYSQL_PORT1 $MYSQL_PASSWORD1
done
}

function incremental_data() {
for j in $(seq 2); do
for i in $(seq $TABLE_NUM); do
run_sql "BEGIN;INSERT INTO slow_relay_writer.t$i VALUES ($j,${j}000$j);INSERT INTO slow_relay_writer.t$i VALUES ($j,${j}001$j);COMMIT;" $MYSQL_PORT1 $MYSQL_PASSWORD1
done
done
}

function run() {
prepare_data

export GO_FAILPOINTS='github.com/pingcap/ticdc/dm/relay/SlowDownWriteDMLRelayLog=return();github.com/pingcap/ticdc/dm/relay/SetHeartbeatInterval=return(5)'

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
# operate mysql config to worker
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

dmctl_start_task_standalone
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

echo "start incremental_data"
incremental_data
echo "finish incremental_data"

sleep 20

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"synced\": true" 1

check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

export GO_FAILPOINTS=''
}

cleanup_data slow_relay_writer
cleanup_process

run $*

cleanup_process

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"

0 comments on commit cdfbdeb

Please sign in to comment.