Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

sink(ticdc): convert values of pre-columns properly (#8421) #8438

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions cdc/sinkv2/eventsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,15 @@ func convert2RowChanges(
return res
}

func convertBinaryToString(row *model.RowChangedEvent) {
for i, col := range row.Columns {
func convertBinaryToString(cols []*model.Column) {
for i, col := range cols {
if col == nil {
continue
}
if col.Charset != "" && col.Charset != charset.CharsetBin {
colValBytes, ok := col.Value.([]byte)
if ok {
row.Columns[i].Value = string(colValBytes)
cols[i].Value = string(colValBytes)
}
}
}
Expand All @@ -289,7 +289,8 @@ func (s *mysqlBackend) groupRowsByType(
deleteRow := make([]*sqlmodel.RowChange, 0, preAllocateSize)

for _, row := range event.Event.Rows {
convertBinaryToString(row)
convertBinaryToString(row.Columns)
convertBinaryToString(row.PreColumns)

if row.IsInsert() {
insertRow = append(
Expand Down
129 changes: 79 additions & 50 deletions cdc/sinkv2/eventsink/txn/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/charset"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -1222,10 +1223,12 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("你好"),
}},
IndexColumns: [][]int{{1, 2}},
},
Expand All @@ -1239,18 +1242,20 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("世界"),
}},
IndexColumns: [][]int{{1, 2}},
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813514},
sqls: []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))"},
values: [][]interface{}{{1, 1, 2, 2}},
values: [][]interface{}{{1, "你好", 2, "世界"}},
rowCount: 2,
},
},
Expand All @@ -1266,10 +1271,11 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: 1,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: 1,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: "你好",
}},
IndexColumns: [][]int{{1, 1}},
},
Expand All @@ -1283,18 +1289,20 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.HandleKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.HandleKeyFlag,
Value: 2,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.HandleKeyFlag,
Value: "世界",
}},
IndexColumns: [][]int{{2, 2}},
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813516},
sqls: []string{"INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?),(?,?)"},
values: [][]interface{}{{1, 1, 2, 2}},
values: [][]interface{}{{1, "你好", 2, "世界"}},
rowCount: 2,
},
},
Expand All @@ -1311,21 +1319,25 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("开发"),
}},
Columns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("测试"),
}},
IndexColumns: [][]int{{1, 2}},
},
Expand All @@ -1339,29 +1351,40 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 3,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 3,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("纽约"),
}},
Columns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 4,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 4,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("北京"),
}},
IndexColumns: [][]int{{1, 2}},
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813516},
sqls: []string{"UPDATE `common_1`.`uk_without_pk` SET `a1`=CASE WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? END, `a3`=CASE WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? END WHERE ROW(`a1`,`a3`) IN (ROW(?,?),ROW(?,?))"},
values: [][]interface{}{{1, 1, 2, 3, 3, 4, 1, 1, 2, 3, 3, 4, 1, 1, 3, 3}},
startTs: []model.Ts{418658114257813516},
sqls: []string{"UPDATE `common_1`.`uk_without_pk` SET `a1`=CASE " +
"WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? WHEN ROW(`a1`,`a3`)=ROW(?,?) " +
"THEN ? END, `a3`=CASE WHEN ROW(`a1`,`a3`)=ROW(?,?) " +
"THEN ? WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? END WHERE " +
"ROW(`a1`,`a3`) IN (ROW(?,?),ROW(?,?))"},
values: [][]interface{}{{
1, "开发", 2, 3, "纽约", 4, 1, "开发", "测试", 3,
"纽约", "北京", 1, "开发", 3, "纽约",
}},
rowCount: 2,
},
},
Expand All @@ -1378,10 +1401,12 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("你好"),
}},

IndexColumns: [][]int{{1, 2}},
Expand All @@ -1396,10 +1421,12 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("世界"),
}},
IndexColumns: [][]int{{1, 2}},
},
Expand All @@ -1413,10 +1440,12 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: "你好",
}},
IndexColumns: [][]int{{1, 2}},
},
Expand All @@ -1427,7 +1456,7 @@ func TestPrepareBatchDMLs(t *testing.T) {
"DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))",
"INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?)",
},
values: [][]interface{}{{1, 1, 2, 2}, {2, 2}},
values: [][]interface{}{{1, "世界", 2, "你好"}, {2, "你好"}},
rowCount: 3,
},
},
Expand Down
38 changes: 32 additions & 6 deletions tests/integration_tests/charset_gbk/data/test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ CREATE DATABASE `charset_gbk_test0` CHARACTER SET utf8mb4;

USE `charset_gbk_test0`;

/* this is a test for columns which charset is gbk*/
/* this is a test for columns whose charset is gbk, with pk */
CREATE TABLE t0 (
id INT,
name varchar(128) CHARACTER SET gbk,
Expand All @@ -30,8 +30,34 @@ WHERE name = '测试';
DELETE FROM t0
WHERE name = '部署';

/* this is a test for table which charset is gbk*/
/* this is a test for columns whose charset is gbk, in a table without pk but with uk */
CREATE TABLE t1 (
id INT NOT NULL,
name varchar(128) CHARACTER SET gbk NOT NULL,
country char(32) CHARACTER SET gbk,
city varchar(64),
description text CHARACTER SET gbk,
image tinyblob,
UNIQUE KEY (id, name)
) ENGINE = InnoDB CHARSET = utf8mb4;

INSERT INTO t1
VALUES (1, '测试', "中国", "上海", "你好,世界"
, 0xC4E3BAC3CAC0BDE7);

INSERT INTO t1
VALUES (2, '部署', "美国", "纽约", "世界,你好"
, 0xCAC0BDE7C4E3BAC3);

UPDATE t1
SET name = '开发'
WHERE name = '测试';

DELETE FROM t1
WHERE name = '部署';

/* this is a test for a table whose charset is gbk */
CREATE TABLE t2 (
id INT,
name varchar(128),
country char(32),
Expand All @@ -41,19 +67,19 @@ CREATE TABLE t1 (
PRIMARY KEY (id)
) ENGINE = InnoDB CHARSET = gbk;

INSERT INTO t1
INSERT INTO t2
VALUES (1, '测试', "中国", "上海", "你好,世界"
, 0xC4E3BAC3CAC0BDE7);

INSERT INTO t1
INSERT INTO t2
VALUES (2, '部署', "美国", "纽约", "世界,你好"
, 0xCAC0BDE7C4E3BAC3);

UPDATE t1
UPDATE t2
SET name = '开发'
WHERE name = '测试';

DELETE FROM t1
DELETE FROM t2
WHERE name = '部署';

/* this is a test for db which charset is gbk*/
Expand Down