Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Handle SQL NULL and JSON 'null' correctly for JSON columns #13944

Merged
merged 16 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions go/mysql/binlog/binlog_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func TestMarshalJSONToSQL(t *testing.T) {
{
name: "null",
data: []byte{},
expected: "CAST(null as JSON)",
expected: "CAST(_utf8mb4'null' as JSON)",
},
{
name: `object {"a": "b"}`,
Expand Down Expand Up @@ -330,17 +330,17 @@ func TestMarshalJSONToSQL(t *testing.T) {
{
name: `true`,
data: []byte{4, 1},
expected: `CAST(true as JSON)`,
expected: `CAST(_utf8mb4'true' as JSON)`,
},
{
name: `false`,
data: []byte{4, 2},
expected: `CAST(false as JSON)`,
expected: `CAST(_utf8mb4'false' as JSON)`,
},
{
name: `null`,
data: []byte{4, 0},
expected: `CAST(null as JSON)`,
expected: `CAST(_utf8mb4'null' as JSON)`,
},
{
name: `-1`,
Expand Down
8 changes: 4 additions & 4 deletions go/mysql/json/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,24 +137,24 @@ func (v *Value) marshalSQLInternal(top bool, dst []byte) []byte {
return dst
case TypeBoolean:
if top {
dst = append(dst, "CAST("...)
dst = append(dst, "CAST(_utf8mb4'"...)
}
if v == ValueTrue {
dst = append(dst, "true"...)
} else {
dst = append(dst, "false"...)
}
if top {
dst = append(dst, " as JSON)"...)
dst = append(dst, "' as JSON)"...)
}
return dst
case TypeNull:
if top {
dst = append(dst, "CAST("...)
dst = append(dst, "CAST(_utf8mb4'"...)
}
dst = append(dst, "null"...)
if top {
dst = append(dst, " as JSON)"...)
dst = append(dst, "' as JSON)"...)
}
return dst
default:
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ create table _vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431(id int, v
create table db_order_test (c_uuid varchar(64) not null default '', created_at datetime not null, dstuff varchar(128), dtstuff text, dbstuff blob, cstuff char(32), primary key (c_uuid,created_at), key (dstuff)) CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
create table vdiff_order (order_id varchar(50) collate utf8mb4_unicode_ci not null, primary key (order_id), key (order_id)) charset=utf8mb4 COLLATE=utf8mb4_unicode_ci;
create table datze (id int, dt1 datetime not null default current_timestamp, dt2 datetime not null, ts1 timestamp default current_timestamp, primary key (id), key (dt1));
create table json_tbl (id int, j1 json, j2 json, primary key(id));
create table json_tbl (id int, j1 json, j2 json, j3 json not null, primary key(id));
create table geom_tbl (id int, g geometry, p point, ls linestring, pg polygon, mp multipoint, mls multilinestring, mpg multipolygon, gc geometrycollection, primary key(id));
create table ` + "`blüb_tbl`" + ` (id int, val1 varchar(20), ` + "`blöb1`" + ` blob, val2 varbinary(20), ` + "`bl@b2`" + ` longblob, txt1 text, blb3 tinyblob, txt2 longtext, blb4 mediumblob, primary key(id));
create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
Expand Down
14 changes: 9 additions & 5 deletions go/test/endtoend/vreplication/initial_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,16 @@ const NumJSONRows = 100

func insertJSONValues(t *testing.T) {
// insert null value combinations
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id) values(1)")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1) values(2, \"{}\")")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j2) values(3, \"{}\")")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(1, \"{}\")")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j3) values(2, \"{}\", \"{}\")")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j2, j3) values(3, \"{}\", \"{}\")")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j2, j3) values(4, NULL, 'null', '\"null\"')")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(5, JSON_QUOTE('null'))")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(6, '{}')")

id := 4
q := "insert into json_tbl(id, j1, j2) values(%d, '%s', '%s')"
id := 8 // 6 inserted above and one after copy phase is done

q := "insert into json_tbl(id, j1, j2, j3) values(%d, '%s', '%s', '{}')"
numJsonValues := len(jsonValues)
for id <= NumJSONRows {
id++
Expand Down
2 changes: 2 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,8 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
dec80Replicated := false
execVtgateQuery(t, vtgateConn, sourceKs, "update customer set dec80 = 0")
execVtgateQuery(t, vtgateConn, sourceKs, "update customer set blb = \"new blob data\" where cid=3")
execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j1 = null, j2 = 'null', j3 = '\"null\"'")
execVtgateQuery(t, vtgateConn, sourceKs, "insert into json_tbl(id, j1, j2, j3) values (7, null, 'null', '\"null\"')")
waitForNoWorkflowLag(t, vc, targetKs, workflow)
for _, shard := range []string{"-80", "80-"} {
shardTarget := fmt.Sprintf("%s:%s", targetKs, shard)
Expand Down
17 changes: 9 additions & 8 deletions go/vt/sqlparser/parsed_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,16 @@ func (pq *ParsedQuery) AppendFromRow(buf *bytes2.Buffer, fields []*querypb.Field
case querypb.Type_TUPLE:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected Type_TUPLE for value %d", i)
case querypb.Type_JSON:
buf2 := sqltypes.NullBytes
if col.length >= 0 {
buf2 = row.Values[col.offset : col.offset+col.length]
}
vv, err := vjson.MarshalSQLValue(buf2)
if err != nil {
return err
if col.length < 0 { // An SQL NULL and not an actual JSON value
buf.WriteString(sqltypes.NullStr)
} else { // A JSON value (which may be a JSON null literal value)
buf2 := row.Values[col.offset : col.offset+col.length]
vv, err := vjson.MarshalSQLValue(buf2)
if err != nil {
return err
}
buf.WriteString(vv.RawStr())
}
buf.WriteString(vv.RawStr())
default:
if col.length < 0 {
// -1 means a null variable; serialize it directly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ func customExpectData(t *testing.T, table string, values [][]string, exec func(c
if err == nil {
return
}
log.Errorf("data mismatch: %v, retrying", err)
time.Sleep(tick)
}
}
Expand All @@ -730,7 +731,7 @@ func compareQueryResults(t *testing.T, query string, values [][]string,
}
for j, val := range row {
if got := qr.Rows[i][j].ToString(); got != val {
return fmt.Errorf("mismatch at (%d, %d): %v, want %s", i, j, qr.Rows[i][j], val)
return fmt.Errorf("mismatch at (%d, %d): got '%s', want '%s'", i, j, qr.Rows[i][j].ToString(), val)
}
}
}
Expand Down
10 changes: 7 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,9 +378,13 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
var newVal *sqltypes.Value
var err error
if field.Type == querypb.Type_JSON {
newVal, err = vjson.MarshalSQLValue(vals[i].Raw())
if err != nil {
return nil, err
if vals[i].IsNull() { // An SQL NULL and not an actual JSON value
newVal = &sqltypes.NULL
} else { // A JSON value (which may be a JSON null literal value)
newVal, err = vjson.MarshalSQLValue(vals[i].Raw())
if err != nil {
return nil, err
}
}
bindVar, err = tp.bindFieldVal(field, newVal)
} else {
Expand Down
11 changes: 7 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ func testPlayerCopyTables(t *testing.T) {

execStatements(t, []string{
"create table src1(id int, val varbinary(128), d decimal(8,0), j json, primary key(id))",
"insert into src1 values(2, 'bbb', 1, '{\"foo\": \"bar\"}'), (1, 'aaa', 0, JSON_ARRAY(123456789012345678901234567890, \"abcd\"))",
"insert into src1 values(2, 'bbb', 1, '{\"foo\": \"bar\"}'), (1, 'aaa', 0, JSON_ARRAY(123456789012345678901234567890, \"abcd\")), (3, 'ccc', 2, 'null'), (4, 'ddd', 3, '{\"name\": \"matt\", \"size\": null}'), (5, 'eee', 4, null)",
fmt.Sprintf("create table %s.dst1(id int, val varbinary(128), val2 varbinary(128), d decimal(8,0), j json, primary key(id))", vrepldb),
"create table yes(id int, val varbinary(128), primary key(id))",
fmt.Sprintf("create table %s.yes(id int, val varbinary(128), primary key(id))", vrepldb),
Expand Down Expand Up @@ -617,8 +617,8 @@ func testPlayerCopyTables(t *testing.T) {
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set pos=",
"begin",
"insert into dst1(id,val,val2,d,j) values (1,'aaa','aaa',0,JSON_ARRAY(123456789012345678901234567890, _utf8mb4'abcd')), (2,'bbb','bbb',1,JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar'))",
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"id\\" type:INT32 charset:63 flags:53251} rows:{lengths:1 values:\\"2\\"}'.*`,
"insert into dst1(id,val,val2,d,j) values (1,'aaa','aaa',0,JSON_ARRAY(123456789012345678901234567890, _utf8mb4'abcd')), (2,'bbb','bbb',1,JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar')), (3,'ccc','ccc',2,CAST(_utf8mb4'null' as JSON)), (4,'ddd','ddd',3,JSON_OBJECT(_utf8mb4'name', _utf8mb4'matt', _utf8mb4'size', null)), (5,'eee','eee',4,null)",
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"id\\" type:INT32 charset:63 flags:53251} rows:{lengths:1 values:\\"5\\"}'.*`,
"commit",
// copy of dst1 is done: delete from copy_state.
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst1",
Expand All @@ -634,9 +634,12 @@ func testPlayerCopyTables(t *testing.T) {
expectData(t, "dst1", [][]string{
{"1", "aaa", "aaa", "0", "[123456789012345678901234567890, \"abcd\"]"},
{"2", "bbb", "bbb", "1", "{\"foo\": \"bar\"}"},
{"3", "ccc", "ccc", "2", "null"},
{"4", "ddd", "ddd", "3", "{\"name\": \"matt\", \"size\": null}"},
{"5", "eee", "eee", "4", ""},
})
expectData(t, "yes", [][]string{})
validateCopyRowCountStat(t, 2)
validateCopyRowCountStat(t, 5)
ctx, cancel := context.WithCancel(context.Background())

type logTestCase struct {
Expand Down
13 changes: 11 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1641,17 +1641,26 @@ func TestPlayerTypes(t *testing.T) {
},
}, {
input: "insert into vitess_json(val1,val2,val3,val4,val5) values (null,'{}','123','{\"a\":[42,100]}','{\"foo\": \"bar\"}')",
output: "insert into vitess_json(id,val1,val2,val3,val4,val5) values (1,CAST(null as JSON),JSON_OBJECT(),CAST(123 as JSON),JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(42, 100)),JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar'))",
output: "insert into vitess_json(id,val1,val2,val3,val4,val5) values (1,null,JSON_OBJECT(),CAST(123 as JSON),JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(42, 100)),JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar'))",
table: "vitess_json",
data: [][]string{
{"1", "", "{}", "123", `{"a": [42, 100]}`, `{"foo": "bar"}`},
},
}, {
input: "update vitess_json set val1 = '{\"bar\": \"foo\"}', val4 = '{\"a\": [98, 123]}', val5 = convert(x'7b7d' using utf8mb4)",
input: "insert into vitess_json(val1,val2,val3,val4,val5) values ('null', '{\"name\":null}','123','{\"a\":[42,100]}','{\"foo\": \"bar\"}')",
output: "insert into vitess_json(id,val1,val2,val3,val4,val5) values (2,CAST(_utf8mb4'null' as JSON),JSON_OBJECT(_utf8mb4'name', null),CAST(123 as JSON),JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(42, 100)),JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar'))",
table: "vitess_json",
data: [][]string{
{"1", "", "{}", "123", `{"a": [42, 100]}`, `{"foo": "bar"}`},
{"2", "null", `{"name": null}`, "123", `{"a": [42, 100]}`, `{"foo": "bar"}`},
},
}, {
input: "update vitess_json set val1 = '{\"bar\": \"foo\"}', val4 = '{\"a\": [98, 123]}', val5 = convert(x'7b7d' using utf8mb4) where id=1",
output: "update vitess_json set val1=JSON_OBJECT(_utf8mb4'bar', _utf8mb4'foo'), val2=JSON_OBJECT(), val3=CAST(123 as JSON), val4=JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(98, 123)), val5=JSON_OBJECT() where id=1",
table: "vitess_json",
data: [][]string{
{"1", `{"bar": "foo"}`, "{}", "123", `{"a": [98, 123]}`, `{}`},
{"2", "null", `{"name": null}`, "123", `{"a": [42, 100]}`, `{"foo": "bar"}`},
},
}}

Expand Down