Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
loader: support percent escape in dump files (#980) (#991)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Sep 4, 2020
1 parent b13adb9 commit c8287f1
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 3 deletions.
36 changes: 34 additions & 2 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package loader

import (
"bufio"
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -153,7 +155,7 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
return
}
sqls := make([]string, 0, 3)
sqls = append(sqls, fmt.Sprintf("USE `%s`;", job.schema))
sqls = append(sqls, fmt.Sprintf("USE `%s`;", unescapePercent(job.schema, w.tctx.L())))
sqls = append(sqls, job.sql)

offsetSQL := w.checkPoint.GenSQL(job.file, job.offset)
Expand Down Expand Up @@ -547,11 +549,41 @@ func (l *Loader) closeFileJobQueue() {
l.fileJobQueueClosed.Set(true)
}

// align with https://github.com/pingcap/dumpling/pull/140
// if input is malformed, return original string and print log
func unescapePercent(input string, logger log.Logger) string {
buf := bytes.Buffer{}
buf.Grow(len(input))
i := 0
for i < len(input) {
if input[i] != '%' {
buf.WriteByte(input[i])
i++
} else {
if i+2 >= len(input) {
logger.Error("malformed filename while unescapePercent", zap.String("filename", input))
return input
}
ascii, err := hex.DecodeString(input[i+1 : i+3])
if err != nil {
logger.Error("malformed filename while unescapePercent", zap.String("filename", input))
return input
}
buf.Write(ascii)
i = i + 3
}
}
return buf.String()
}

func (l *Loader) skipSchemaAndTable(table *filter.Table) bool {
if filter.IsSystemSchema(table.Schema) {
return true
}

table.Schema = unescapePercent(table.Schema, l.logCtx.L())
table.Name = unescapePercent(table.Name, l.logCtx.L())

tbs := []*filter.Table{table}
tbs = l.baList.ApplyOn(tbs)
return len(tbs) == 0
Expand Down Expand Up @@ -1060,7 +1092,7 @@ func (l *Loader) restoreStructure(ctx context.Context, conn *DBConn, sqlFile str
dstSchema, dstTable := fetchMatchedLiteral(tctx, l.tableRouter, schema, table)
// for table
if table != "" {
sqls = append(sqls, fmt.Sprintf("USE `%s`;", dstSchema))
sqls = append(sqls, fmt.Sprintf("USE `%s`;", unescapePercent(dstSchema, l.logCtx.L())))
query = renameShardingTable(query, table, dstTable)
} else {
query = renameShardingSchema(query, schema, dstSchema)
Expand Down
2 changes: 1 addition & 1 deletion tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ function cleanup_data() {
rm -rf $WORK_DIR
mkdir $WORK_DIR
for target_db in "$@"; do
run_sql "drop database if exists ${target_db}" $TIDB_PORT $TIDB_PASSWORD
run_sql "drop database if exists \`${target_db}\`" $TIDB_PORT $TIDB_PASSWORD
done
run_sql "drop database if exists dm_meta" $TIDB_PORT $TIDB_PASSWORD
}
Expand Down
48 changes: 48 additions & 0 deletions tests/full_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,54 @@ function fail_acquire_global_lock() {
cleanup_process $*
}

function escape_schema() {
cp $cur/data/db1.prepare.sql $WORK_DIR/db1.prepare.sql
cp $cur/data/db2.prepare.sql $WORK_DIR/db2.prepare.sql
cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml
cp $cur/conf/diff_config.toml $WORK_DIR/diff_config.toml
sed -i "s/full_mode/full\/mode/g" $WORK_DIR/db1.prepare.sql $WORK_DIR/db2.prepare.sql $WORK_DIR/dm-task.yaml $WORK_DIR/diff_config.toml

run_sql_file $WORK_DIR/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $WORK_DIR/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'

# test load data with `/` in the table name
run_sql_source1 "create table \`full/mode\`.\`tb\/1\` (id int, name varchar(10), primary key(\`id\`));"
run_sql_source1 "insert into \`full/mode\`.\`tb\/1\` values(1,'haha');"
run_sql_source1 "insert into \`full/mode\`.\`tb\/1\` values(2,'hihi');"

run_sql_file $cur/data/db1.prepare.user.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_count 'Query OK, 0 rows affected' 7
run_sql_file $cur/data/db2.prepare.user.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_count 'Query OK, 0 rows affected' 7

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
# operate mysql config to worker
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

# start DM task only
dmctl_start_task "$WORK_DIR/dm-task.yaml" "--remove-meta"
check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml

echo "check dump files have been cleaned"
ls $WORK_DIR/worker1/dumped_data.test && exit 1 || echo "worker1 auto removed dump files"
ls $WORK_DIR/worker2/dumped_data.test && exit 1 || echo "worker2 auto removed dump files"

cleanup_data full/mode
cleanup_process $*
}

function run() {
fail_acquire_global_lock

Expand Down

0 comments on commit c8287f1

Please sign in to comment.