Skip to content

Commit

Permalink
dm/syncer : add integration test for dml using downstream schema (pin…
Browse files Browse the repository at this point in the history
  • Loading branch information
WizardXiao authored and GMHDBJD committed Dec 27, 2021
1 parent 70b52e9 commit 6bc06ad
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 0 deletions.
11 changes: 11 additions & 0 deletions dm/syncer/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package syncer

import (
"fmt"
"strconv"
"time"

"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -165,6 +167,15 @@ func (c *compactor) compactJob(j *job) {
}

key := j.dml.identifyKey()

failpoint.Inject("DownstreamIdentifyKeyCheckInCompact", func(v failpoint.Value) {
value, err := strconv.Atoi(key)
upper := v.(int)
if err != nil || value > upper {
panic(fmt.Sprintf("downstream identifyKey check failed. key value %v should less than %v", value, upper))
}
})

prevPos, ok := tableKeyMap[key]
// if no such key in the buffer, add it
if !ok {
Expand Down
21 changes: 21 additions & 0 deletions dm/tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,27 @@ function run_sql_tidb_with_retry() {
fi
}

# run_sql_tidb_with_retry_times <sql> <expected-substring> <times>
# Runs <sql> against TiDB up to <times> times, sleeping 2s between attempts,
# until the captured output contains <expected-substring>. Dumps the last
# result file and exits 1 if the substring never appears.
function run_sql_tidb_with_retry_times() {
	rc=0
	# BUGFIX: bound was `k<$3`, which attempted the statement only $3-1 times;
	# `k<=$3` makes the third argument the real attempt count.
	for ((k = 1; k <= $3; k++)); do
		run_sql_tidb "$1"
		if grep -Fq "$2" "$TEST_DIR/sql_res.$TEST_NAME.txt"; then
			rc=1
			break
		fi
		echo "run tidb sql failed $k-th time, retry later"
		sleep 2
	done
	if [[ $rc = 0 ]]; then
		echo "TEST FAILED: OUTPUT DOES NOT CONTAIN '$2'"
		echo "____________________________________"
		cat "$TEST_DIR/sql_res.$TEST_NAME.txt"
		echo "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
		exit 1
	fi
}

# shortcut for check log contain with retry
function check_log_contain_with_retry() {
text=$1
Expand Down
89 changes: 89 additions & 0 deletions dm/tests/shardddl1/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,54 @@ function DM_COMPACT() {
"clean_table" ""
}

# Generates a DML workload per row (insert/update/update/update/update/delete/insert)
# and verifies the compactor merged enough of it while keying on the DOWNSTREAM
# schema (downstream unique key 'b' rather than the upstream primary key 'a').
function DM_COMPACT_USE_DOWNSTREAM_SCHEMA_CASE() {
END=10
# Avoid no-op statements such as "update tb1 set c=1 where a=100" issued right
# after "insert into tb1(a,b,c) values(100,1,1)" — they would not produce a DML
# and would change the expected DML count.
for i in $(seq 0 $END); do
run_sql_source1 "insert into ${shardddl1}.${tb1}(a,b,c) values($((i + 100)),$i,$i)"
run_sql_source1 "update ${shardddl1}.${tb1} set c=20 where a=$((i + 100))"
run_sql_source1 "update ${shardddl1}.${tb1} set c=c+1 where a=$((i + 100))"
# Downstream uses uk 'b' as the identify key, so this statement, which modifies
# 'b', will be split into two jobs (delete + insert).
run_sql_source1 "update ${shardddl1}.${tb1} set b=b+1 where a=$((i + 100))"
run_sql_source1 "update ${shardddl1}.${tb1} set a=a+100 where a=$((i + 100))"
run_sql_source1 "delete from ${shardddl1}.${tb1} where a=$((i + 200))"
run_sql_source1 "insert into ${shardddl1}.${tb1}(a,b,c) values($((i + 100)),$i,$i)"
done
# 11 rows (i = 0..10) should remain downstream once replication catches up.
run_sql_tidb_with_retry_times "select count(1) from ${shardddl}.${tb};" "count(1): 11" 30
# Rebuild the downstream table with a schema matching upstream (pk 'a', uk 'b')
# so that sync_diff can compare the two sides directly.
run_sql_tidb "create table ${shardddl}.${tb}_temp (a int primary key auto_increment, b int unique not null, c int) auto_increment = 100;
insert into ${shardddl}.${tb}_temp (a, b, c) select a, b, c from ${shardddl}.${tb};
drop table ${shardddl}.${tb}; rename table ${shardddl}.${tb}_temp to ${shardddl}.${tb};"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 30
compactCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "finish to compact" | wc -l)
# Compaction is interrupted by flush jobs ("j.tp == flush"), so only require a
# lower bound of more than 50 compacted DMLs rather than an exact count.
if [[ "$compactCnt" -le 50 ]]; then
echo "compact $compactCnt dmls which is less than 50"
exit 1
fi
}

function DM_COMPACT_USE_DOWNSTREAM_SCHEMA() {
# Downstream pk/uk/columns are different from upstream; the compactor must use
# the downstream schema. Restart the workers so the failpoints take effect.
ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true
check_port_offline $WORKER1_PORT 20
check_port_offline $WORKER2_PORT 20
# DownstreamIdentifyKeyCheckInCompact=return(20) panics if a key value seen in
# compact is not less than 20, which checks that the compactor uses the
# downstream schema:
# - if the downstream schema is used, the key is 'b' with values less than 20;
# - if the upstream schema were used, the key would be 'a' with values >= 100.
export GO_FAILPOINTS='github.com/pingcap/ticdc/dm/syncer/SkipFlushCompactor=return();github.com/pingcap/ticdc/dm/syncer/DownstreamIdentifyKeyCheckInCompact=return(20)'
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

run_case COMPACT_USE_DOWNSTREAM_SCHEMA "single-source-no-sharding" \
"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int unique not null, c int);\";
run_sql_tidb \"drop database if exists ${shardddl}; create database ${shardddl}; create table ${shardddl}.${tb} (a int, b int unique not null, c int, d int primary key auto_increment) auto_increment = 100;\"" \
"clean_table" ""
}

function DM_MULTIPLE_ROWS_CASE() {
END=100
for i in $(seq 1 10 $END); do
Expand Down Expand Up @@ -668,6 +716,16 @@ function DM_MULTIPLE_ROWS_CASE() {
}

function DM_MULTIPLE_ROWS() {

ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true
check_port_offline $WORKER1_PORT 20
check_port_offline $WORKER2_PORT 20
export GO_FAILPOINTS='github.com/pingcap/ticdc/dm/syncer/BlockExecuteSQLs=return(1)'
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

run_case MULTIPLE_ROWS "single-source-no-sharding" \
"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int unique, c int);\"" \
"clean_table" ""
Expand Down Expand Up @@ -699,13 +757,43 @@ function DM_CAUSALITY() {
"clean_table" ""
}

# Runs a DML sequence that would conflict under the upstream schema and then
# asserts no causality conflict job was generated, i.e. causality keys were
# derived from the downstream schema.
function DM_CAUSALITY_USE_DOWNSTREAM_SCHEMA_CASE() {
run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,2)"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(2,3)"
run_sql_source1 "update ${shardddl1}.${tb1} set a=3, b=4 where b=3"
run_sql_source1 "delete from ${shardddl1}.${tb1} where a=1"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,3)"

# Wait until the final row (1,3) is replicated downstream.
run_sql_tidb_with_retry_times "select count(1) from ${shardddl}.${tb} where a =1 and b=3;" "count(1): 1" 30
# Rebuild the downstream table with a schema matching upstream (pk 'a', uk 'b')
# so that sync_diff can compare the two sides directly.
run_sql_tidb "create table ${shardddl}.${tb}_temp (a int primary key, b int unique);
insert into ${shardddl}.${tb}_temp (a, b) select a, b from ${shardddl}.${tb};
drop table ${shardddl}.${tb}; rename table ${shardddl}.${tb}_temp to ${shardddl}.${tb};"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

# With downstream-schema causality keys, none of the DMLs above should have
# triggered a conflict flush in either worker's log.
causalityCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "meet causality key, will generate a conflict job to flush all sqls" | wc -l)
if [[ "$causalityCnt" -ne 0 ]]; then
echo "causalityCnt is $causalityCnt, but it should be 0"
exit 1
fi
}

function DM_CAUSALITY_USE_DOWNSTREAM_SCHEMA() {
# Downstream pk/uk/columns are different from upstream; causality must use the
# downstream schema (upstream: pk 'a', uk 'b'; downstream: uk 'b', pk 'c').
run_case CAUSALITY_USE_DOWNSTREAM_SCHEMA "single-source-no-sharding" \
"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int unique);\";
run_sql_tidb \"drop database if exists ${shardddl}; create database ${shardddl}; create table ${shardddl}.${tb} (a int, b int unique, c int primary key auto_increment) auto_increment = 100;\"" \
"clean_table" ""
}

function run() {
init_cluster
init_database

DM_COMPACT
DM_COMPACT_USE_DOWNSTREAM_SCHEMA
DM_MULTIPLE_ROWS
DM_CAUSALITY
DM_CAUSALITY_USE_DOWNSTREAM_SCHEMA
DM_UpdateBARule
DM_RENAME_TABLE
DM_RENAME_COLUMN_OPTIMISTIC
Expand All @@ -719,6 +807,7 @@ function run() {
DM_${i}
sleep 1
done

}

cleanup_data $shardddl
Expand Down

0 comments on commit 6bc06ad

Please sign in to comment.