This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: fix several bugs related to column permutations (#437)
* fix multi error in column permutation

* add unit test

* add an integration test

* rename test db

* change log
glorv authored Nov 4, 2020
1 parent 8013216 commit eb609ec
Showing 10 changed files with 287 additions and 10 deletions.
23 changes: 17 additions & 6 deletions lightning/backend/tidb.go
@@ -51,9 +51,10 @@ func (row tidbRows) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
}

type tidbEncoder struct {
- mode mysql.SQLMode
- tbl  table.Table
- se   *session
+ mode      mysql.SQLMode
+ tbl       table.Table
+ se        *session
+ columnIdx []int
}

type tidbBackend struct {
@@ -230,14 +231,24 @@ func (*tidbEncoder) Close() {}
func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, columnPermutation []int) (Row, error) {
cols := enc.tbl.Cols()

+ if len(enc.columnIdx) == 0 {
+     columnIdx := make([]int, len(columnPermutation))
+     for i, idx := range columnPermutation {
+         if idx >= 0 {
+             columnIdx[idx] = i
+         }
+     }
+     enc.columnIdx = columnIdx
+ }

var encoded strings.Builder
encoded.Grow(8 * len(row))
encoded.WriteByte('(')
for i, field := range row {
if i != 0 {
encoded.WriteByte(',')
}
- if err := enc.appendSQL(&encoded, &field, cols[columnPermutation[i]]); err != nil {
+ if err := enc.appendSQL(&encoded, &field, cols[enc.columnIdx[i]]); err != nil {
logger.Error("tidb encode failed",
zap.Array("original", rowArrayMarshaler(row)),
zap.Int("originalCol", i),
@@ -347,8 +358,8 @@ func (be *tidbBackend) WriteRows(ctx context.Context, _ uuid.UUID, tableName str
// Retry will be done externally, so we're not going to retry here.
_, err := be.db.ExecContext(ctx, insertStmt.String())
if err != nil {
log.L().Error("execute statement failed",
zap.Array("rows", rows), zap.String("stmt", insertStmt.String()), zap.Error(err))
log.L().Error("execute statement failed", zap.String("stmt", insertStmt.String()),
zap.Array("rows", rows), zap.Error(err))
}
failpoint.Inject("FailIfImportedSomeRows", func() {
panic("forcing failure due to FailIfImportedSomeRows, before saving checkpoint")
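
The heart of this fix: `columnPermutation` is indexed by table-column position and stores each column's position in the data file, but `Encode` iterates over the data fields, so the encoder now computes the inverse mapping once and caches it as `enc.columnIdx`. A minimal, self-contained sketch of that inversion (the helper name `invertPermutation` is mine, not part of the patch):

```go
package main

import "fmt"

// invertPermutation mirrors the columnIdx cache built in the patched Encode:
// perm is indexed by table-column position and holds that column's position in
// the data file (-1 if the column is absent from the file); the result is
// indexed by data-field position and holds the matching table-column index.
func invertPermutation(perm []int) []int {
	inv := make([]int, len(perm))
	for tableCol, filePos := range perm {
		if filePos >= 0 {
			inv[filePos] = tableCol
		}
	}
	return inv
}

func main() {
	// Table columns a, b, c; a CSV header "c,a,b" yields permutation {1, 2, 0}.
	fmt.Println(invertPermutation([]int{1, 2, 0})) // [2 0 1]: field 0 -> c, field 1 -> a, field 2 -> b
}
```

Caching the inverse on the encoder, as the hunk above does, avoids rebuilding it for every row of the same file.
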
3 changes: 2 additions & 1 deletion lightning/backend/tidb_test.go
@@ -197,8 +197,9 @@ func (s *mysqlSuite) TestStrictMode(c *C) {
c.Assert(err, ErrorMatches, `.*incorrect utf8 value .* for column s0`)

_, err = encoder.Encode(logger, []types.Datum{
types.NewStringDatum(""),
types.NewStringDatum("非 ASCII 字符串"),
}, 1, []int{1, 0, -1})
}, 1, []int{0, 1, -1})
c.Assert(err, ErrorMatches, ".*incorrect ascii value .* for column s1")
}

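
The test inputs change because the permutation argument is now read as table-column position → data-field position rather than the other way around. A small sketch of how the two datums map onto columns under the new `[]int{0, 1, -1}` (assuming, from the assertions, that the relevant table columns are named `s0` and `s1`, and that the `-1` slot is simply a column absent from the row):

```go
package main

import "fmt"

func main() {
	// Assumed column layout for this test; only s0 and s1 are fed data.
	tableCols := []string{"s0", "s1", "(absent)"}
	perm := []int{0, 1, -1}

	// Invert: data-field position -> table-column index (mirrors enc.columnIdx).
	columnIdx := make([]int, len(perm))
	for tableCol, filePos := range perm {
		if filePos >= 0 {
			columnIdx[filePos] = tableCol
		}
	}

	row := []string{"", "非 ASCII 字符串"}
	for i := range row {
		fmt.Printf("field %d -> column %s\n", i, tableCols[columnIdx[i]])
	}
	// field 0 -> column s0
	// field 1 -> column s1 (hence the "incorrect ascii value ... for column s1" assertion)
}
```
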
2 changes: 1 addition & 1 deletion lightning/checkpoints/checkpoints.go
@@ -1441,7 +1441,7 @@ func (cpdb *FileCheckpointsDB) DumpChunks(context.Context, io.Writer) error {
}

func intSlice2Int32Slice(s []int) []int32 {
- res := make([]int32, len(s))
+ res := make([]int32, 0, len(s))
for _, i := range s {
res = append(res, int32(i))
}
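
The one-character change above fixes a classic Go slice bug: `make([]int32, len(s))` allocates `len(s)` zero-valued elements, so the following `append` calls grow the slice instead of filling it, leaving a zero prefix and doubling its length — bad news for the values this helper serializes into file-based checkpoints. A standalone sketch of the difference:

```go
package main

import "fmt"

func main() {
	s := []int{1, 2, 3}

	// Buggy: the slice already holds three zeros, so append grows it to six elements.
	bad := make([]int32, len(s))
	for _, i := range s {
		bad = append(bad, int32(i))
	}
	fmt.Println(bad) // [0 0 0 1 2 3]

	// Fixed: zero length with pre-sized capacity, so append fills exactly three elements.
	good := make([]int32, 0, len(s))
	for _, i := range s {
		good = append(good, int32(i))
	}
	fmt.Println(good) // [1 2 3]
}
```
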
16 changes: 14 additions & 2 deletions lightning/restore/restore.go
@@ -1576,8 +1576,20 @@ func (t *TableRestore) parseColumnPermutations(columns []string) ([]int, error)
}

func getColumnNames(tableInfo *model.TableInfo, permutation []int) []string {
- names := make([]string, 0, len(permutation))
- for _, idx := range permutation {
+ colIndexes := make([]int, 0, len(permutation))
+ for i := 0; i < len(permutation); i++ {
+     colIndexes = append(colIndexes, -1)
+ }
+ colCnt := 0
+ for i, p := range permutation {
+     if p >= 0 {
+         colIndexes[p] = i
+         colCnt++
+     }
+ }
+
+ names := make([]string, 0, colCnt)
+ for _, idx := range colIndexes {
// skip columns with index -1
if idx >= 0 {
names = append(names, tableInfo.Columns[idx].Name.O)
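
With this hunk, `getColumnNames` returns the table's column names in the order they appear in the data file, instead of misreading the permutation values as table-column indexes. A standalone sketch of the same logic over plain strings (the helper `columnNamesInFileOrder` is hypothetical; the real function takes a `*model.TableInfo`):

```go
package main

import "fmt"

// columnNamesInFileOrder mirrors the patched getColumnNames over a plain slice
// of column names. perm is indexed by table-column position and holds that
// column's position in the data file (-1 if the file does not contain it).
func columnNamesInFileOrder(tableCols []string, perm []int) []string {
	colIndexes := make([]int, len(perm))
	for i := range colIndexes {
		colIndexes[i] = -1
	}
	colCnt := 0
	for tableCol, filePos := range perm {
		if filePos >= 0 {
			colIndexes[filePos] = tableCol
			colCnt++
		}
	}
	names := make([]string, 0, colCnt)
	for _, idx := range colIndexes {
		if idx >= 0 {
			names = append(names, tableCols[idx])
		}
	}
	return names
}

func main() {
	// Three real columns plus the implicit extra slot that is never in the file.
	cols := []string{"a", "b", "c", "_tidb_rowid"}
	fmt.Println(columnNamesInFileOrder(cols, []int{1, 0, 2, -1}))  // [b a c]
	fmt.Println(columnNamesInFileOrder(cols, []int{-1, 0, 1, -1})) // [b c]
}
```

These two calls line up with the expectations added in TestGetColumnsNames below.
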
203 changes: 203 additions & 0 deletions lightning/restore/restore_test.go
@@ -409,6 +409,209 @@ func (s *tableRestoreSuite) TestPopulateChunks(c *C) {
s.cfg.Mydumper.CSV.Header = false
}

func (s *tableRestoreSuite) TestPopulateChunksCSVHeader(c *C) {
fakeDataDir := c.MkDir()
store, err := storage.NewLocalStorage(fakeDataDir)
c.Assert(err, IsNil)

fakeDataFiles := make([]mydump.FileInfo, 0)

fakeCsvContents := []string{
// small full header
"a,b,c\r\n1,2,3\r\n",
// small partial header
"b,c\r\n2,3\r\n",
// big full header
"a,b,c\r\n90000,80000,700000\r\n1000,2000,3000\r\n11,22,33\r\n3,4,5\r\n",
// big full header unordered
"c,a,b\r\n,1000,2000,3000\r\n11,22,33\r\n1000,2000,404\r\n3,4,5\r\n90000,80000,700000\r\n7999999,89999999,9999999\r\n",
// big partial header
"b,c\r\n2000001,30000001\r\n35231616,462424626\r\n62432,434898934\r\n",
}
total := 0
for i, s := range fakeCsvContents {
csvName := fmt.Sprintf("db.table.%02d.csv", i)
err := ioutil.WriteFile(filepath.Join(fakeDataDir, csvName), []byte(s), 0644)
c.Assert(err, IsNil)
fakeDataFiles = append(fakeDataFiles, mydump.FileInfo{
TableName: filter.Table{"db", "table"},
FileMeta: mydump.SourceFileMeta{Path: csvName, Type: mydump.SourceTypeCSV, SortKey: fmt.Sprintf("%02d", i)},
Size: int64(len(s)),
})
total += len(s)
}
tableMeta := &mydump.MDTableMeta{
DB: "db",
Name: "table",
TotalSize: int64(total),
SchemaFile: mydump.FileInfo{TableName: filter.Table{Schema: "db", Name: "table"}, FileMeta: mydump.SourceFileMeta{Path: "db.table-schema.sql", Type: mydump.SourceTypeTableSchema}},
DataFiles: fakeDataFiles,
}

failpoint.Enable("github.com/pingcap/tidb-lightning/lightning/restore/PopulateChunkTimestamp", "return(1234567897)")
defer failpoint.Disable("github.com/pingcap/tidb-lightning/lightning/restore/PopulateChunkTimestamp")

cp := &TableCheckpoint{
Engines: make(map[int32]*EngineCheckpoint),
}

cfg := config.NewConfig()
cfg.Mydumper.BatchSize = 100
cfg.Mydumper.MaxRegionSize = 40

cfg.Mydumper.CSV.Header = true
cfg.Mydumper.StrictFormat = true
rc := &RestoreController{cfg: cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io"), store: store}

tr, err := NewTableRestore("`db`.`table`", tableMeta, s.dbInfo, s.tableInfo, &TableCheckpoint{})
c.Assert(err, IsNil)
c.Assert(tr.populateChunks(context.Background(), rc, cp), IsNil)

c.Assert(cp.Engines, DeepEquals, map[int32]*EngineCheckpoint{
-1: {
Status: CheckpointStatusLoaded,
},
0: {
Status: CheckpointStatusLoaded,
Chunks: []*ChunkCheckpoint{
{
Key: ChunkCheckpointKey{Path: tableMeta.DataFiles[0].FileMeta.Path, Offset: 0},
FileMeta: tableMeta.DataFiles[0].FileMeta,
Chunk: mydump.Chunk{
Offset: 0,
EndOffset: 14,
PrevRowIDMax: 0,
RowIDMax: 4, // 37 bytes with 3 columns can store at most 7 rows.
},
Timestamp: 1234567897,
},
{
Key: ChunkCheckpointKey{Path: tableMeta.DataFiles[1].FileMeta.Path, Offset: 0},
FileMeta: tableMeta.DataFiles[1].FileMeta,
Chunk: mydump.Chunk{
Offset: 0,
EndOffset: 10,
PrevRowIDMax: 4,
RowIDMax: 7,
},
Timestamp: 1234567897,
},
{
Key: ChunkCheckpointKey{Path: tableMeta.DataFiles[2].FileMeta.Path, Offset: 6},
FileMeta: tableMeta.DataFiles[2].FileMeta,
ColumnPermutation: []int{0, 1, 2, -1},
Chunk: mydump.Chunk{
Offset: 6,
EndOffset: 52,
PrevRowIDMax: 7,
RowIDMax: 20,
Columns: []string{"a", "b", "c"},
},

Timestamp: 1234567897,
},
{
Key: ChunkCheckpointKey{Path: tableMeta.DataFiles[2].FileMeta.Path, Offset: 52},
FileMeta: tableMeta.DataFiles[2].FileMeta,
ColumnPermutation: []int{0, 1, 2, -1},
Chunk: mydump.Chunk{
Offset: 52,
EndOffset: 60,
PrevRowIDMax: 20,
RowIDMax: 22,
Columns: []string{"a", "b", "c"},
},
Timestamp: 1234567897,
},
{
Key: ChunkCheckpointKey{Path: tableMeta.DataFiles[3].FileMeta.Path, Offset: 6},
FileMeta: tableMeta.DataFiles[3].FileMeta,
ColumnPermutation: []int{1, 2, 0, -1},
Chunk: mydump.Chunk{
Offset: 6,
EndOffset: 48,
PrevRowIDMax: 22,
RowIDMax: 35,
Columns: []string{"c", "a", "b"},
},
Timestamp: 1234567897,
},
},
},
1: {
Status: CheckpointStatusLoaded,
Chunks: []*ChunkCheckpoint{
{
Key: ChunkCheckpointKey{Path: tableMeta.DataFiles[3].FileMeta.Path, Offset: 48},
FileMeta: tableMeta.DataFiles[3].FileMeta,
ColumnPermutation: []int{1, 2, 0, -1},
Chunk: mydump.Chunk{
Offset: 48,
EndOffset: 101,
PrevRowIDMax: 35,
RowIDMax: 48,
Columns: []string{"c", "a", "b"},
},
Timestamp: 1234567897,
},
{
Key: ChunkCheckpointKey{Path: tableMeta.DataFiles[3].FileMeta.Path, Offset: 101},
FileMeta: tableMeta.DataFiles[3].FileMeta,
ColumnPermutation: []int{1, 2, 0, -1},
Chunk: mydump.Chunk{
Offset: 101,
EndOffset: 102,
PrevRowIDMax: 48,
RowIDMax: 48,
Columns: []string{"c", "a", "b"},
},
Timestamp: 1234567897,
},
{
Key: ChunkCheckpointKey{Path: tableMeta.DataFiles[4].FileMeta.Path, Offset: 4},
FileMeta: tableMeta.DataFiles[4].FileMeta,
ColumnPermutation: []int{-1, 0, 1, -1},
Chunk: mydump.Chunk{
Offset: 4,
EndOffset: 59,
PrevRowIDMax: 48,
RowIDMax: 61,
Columns: []string{"b", "c"},
},
Timestamp: 1234567897,
},
},
},
2: {
Status: CheckpointStatusLoaded,
Chunks: []*ChunkCheckpoint{
{
Key: ChunkCheckpointKey{Path: tableMeta.DataFiles[4].FileMeta.Path, Offset: 59},
FileMeta: tableMeta.DataFiles[4].FileMeta,
ColumnPermutation: []int{-1, 0, 1, -1},
Chunk: mydump.Chunk{
Offset: 59,
EndOffset: 60,
PrevRowIDMax: 61,
RowIDMax: 61,
Columns: []string{"b", "c"},
},
Timestamp: 1234567897,
},
},
},
})
}

func (s *tableRestoreSuite) TestGetColumnsNames(c *C) {
c.Assert(getColumnNames(s.tableInfo.Core, []int{0, 1, 2, -1}), DeepEquals, []string{"a", "b", "c"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{1, 0, 2, -1}), DeepEquals, []string{"b", "a", "c"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{-1, 0, 1, -1}), DeepEquals, []string{"b", "c"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{0, 1, -1, -1}), DeepEquals, []string{"a", "b"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{1, -1, 0, -1}), DeepEquals, []string{"c", "a"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{-1, 0, -1, -1}), DeepEquals, []string{"b"})
}

func (s *tableRestoreSuite) TestInitializeColumns(c *C) {
ccp := &ChunkCheckpoint{}
c.Assert(s.tr.initializeColumns(nil, ccp), IsNil)
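
The `ColumnPermutation` values asserted in the new test follow directly from each CSV header: for every table column (plus the trailing slot, which I read as the implicit `_tidb_rowid`) the permutation records that column's position in the header, or -1 if it is missing. A sketch of that derivation (the helper `headerPermutation` is mine, not the `parseColumnPermutations` used in restore.go):

```go
package main

import "fmt"

// headerPermutation records, for each table column, its position in the CSV
// header (or -1 if absent), then appends -1 for the implicit extra slot.
func headerPermutation(tableCols, header []string) []int {
	pos := make(map[string]int, len(header))
	for i, name := range header {
		pos[name] = i
	}
	perm := make([]int, 0, len(tableCols)+1)
	for _, col := range tableCols {
		if p, ok := pos[col]; ok {
			perm = append(perm, p)
		} else {
			perm = append(perm, -1)
		}
	}
	return append(perm, -1) // the extra slot never appears in the file
}

func main() {
	tableCols := []string{"a", "b", "c"}
	fmt.Println(headerPermutation(tableCols, []string{"c", "a", "b"})) // [1 2 0 -1]
	fmt.Println(headerPermutation(tableCols, []string{"b", "c"}))      // [-1 0 1 -1]
}
```
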
3 changes: 3 additions & 0 deletions tests/column_permutation/config.toml
@@ -0,0 +1,3 @@
[mydumper]
strict-format = true
max-region-size = 200
1 change: 1 addition & 0 deletions tests/column_permutation/data/perm-schema-create.sql
@@ -0,0 +1 @@
CREATE DATABASE `perm` IF NOT EXISTS;
22 changes: 22 additions & 0 deletions tests/column_permutation/data/perm.test_perm-schema.sql
@@ -0,0 +1,22 @@
CREATE TABLE `test` (
`id` int(11) NOT NULL,
`contract_no` varchar(64) DEFAULT NULL,
`fund_seq_no` varchar(64) DEFAULT NULL,
`term_no` int(11) DEFAULT NULL,
`contract_type` varchar(8) DEFAULT NULL,
`internal_transfer_tag` varchar(8) DEFAULT NULL,
`prin_amt` int(11) DEFAULT NULL,
`start_date` varchar(8) DEFAULT NULL,
`end_date` varchar(8) DEFAULT NULL,
`batch_date` varchar(8) DEFAULT NULL,
`crt_time` timestamp DEFAULT CURRENT_TIMESTAMP,
`region_code` varchar(8) DEFAULT NULL,
`credit_code` varchar(64) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
PARTITION BY RANGE COLUMNS(batch_date) (
PARTITION `P20200224` VALUES LESS THAN ("2020-02-05 00:00:00"),
PARTITION `P20200324` VALUES LESS THAN ("2020-03-05 00:00:00"),
PARTITION `P20200424` VALUES LESS THAN ("2020-04-05 00:00:00"),
PARTITION `P20200524` VALUES LESS THAN ("2020-05-05 00:00:00"),
PARTITION `P_MAXVALUE` VALUES LESS THAN MAXVALUE
);
6 changes: 6 additions & 0 deletions tests/column_permutation/data/perm.test_perm.0.csv
@@ -0,0 +1,6 @@
contract_no,fund_seq_no,term_no,contract_type,internal_transfer_tag,prin_amt,start_date,end_date,region_code,credit_code
2020061000019011020164030595,202006100001901102016403059520200629,1,01,N,356,20200210,20200720,000000,
2020061000019011020164030596,202006100001901102016403059520200628,1,01,N,3561,20200310,20200720,000001,
2020061000019011020164030597,202006100001901102016403059520200627,1,01,N,3562,20200410,20200720,000002,33
2020061000019011020164030598,108319xx0185-202006100001901102016403059520200626,12,02,Y,26368,20200510,20200620,000003,
2020061000019011020164030599,202006100001901102016403059520200625,1,01,N,3960,20200610,20200720,000005,999
18 changes: 18 additions & 0 deletions tests/column_permutation/run.sh
@@ -0,0 +1,18 @@
set -eu

for BACKEND in local importer tidb; do
if [ "$BACKEND" = 'local' ]; then
check_cluster_version 4 0 0 'local backend' || continue
fi
run_sql 'DROP DATABASE IF EXISTS perm'

run_lightning --backend $BACKEND

run_sql 'select count(*) from perm.test_perm;'
check_contains "count(*): 5"

run_sql "SELECT fund_seq_no, region_code, credit_code FROM perm.test_perm WHERE contract_no = '2020061000019011020164030597';"
check_contains "fund_seq_no: 202006100001901102016403059520200627"
check_contains "region_code: 000002"
check_contains "credit_code: 33"
done
