Skip to content

Commit

Permalink
bugfix: change column name and type to json (#14093)
Browse files Browse the repository at this point in the history
Signed-off-by: Olga Shestopalova <oshestopalova@hubspot.com>
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Co-authored-by: Olga Shestopalova <oshestopalova@hubspot.com>
Co-authored-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
3 people committed Sep 28, 2023
1 parent 00d2521 commit 2eda916
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
id, c1j
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
change column c1 c1j json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
id, c1
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
drop table if exists onlineddl_test;
create table onlineddl_test (
id int auto_increment,
c1 int not null,
primary key (id)
) auto_increment=1;

insert into onlineddl_test values (1, 11);
insert into onlineddl_test values (2, 13);

drop event if exists onlineddl_test;
delimiter ;;
create event onlineddl_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into onlineddl_test values (null, 17);
end ;;
49 changes: 49 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,55 @@ func TestBuildPlayerPlan(t *testing.T) {
},
},
},
}, {
input: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select c1, convert(c using utf8mb4) as c2 from t1",
}},
},
plan: &TestReplicatorPlan{
VStreamFilter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select c1, convert(c using utf8mb4) as c2 from t1",
}},
},
TargetTables: []string{"t1"},
TablePlans: map[string]*TestTablePlan{
"t1": {
TargetName: "t1",
SendRule: "t1",
PKReferences: []string{"c1"},
InsertFront: "insert into t1(c1,c2)",
InsertValues: "(:a_c1,convert(:a_c using utf8mb4))",
Insert: "insert into t1(c1,c2) values (:a_c1,convert(:a_c using utf8mb4))",
Update: "update t1 set c2=convert(:a_c using utf8mb4) where c1=:b_c1",
Delete: "delete from t1 where c1=:b_c1",
},
},
},
planpk: &TestReplicatorPlan{
VStreamFilter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select c1, convert(c using utf8mb4) as c2, pk1, pk2 from t1",
}},
},
TargetTables: []string{"t1"},
TablePlans: map[string]*TestTablePlan{
"t1": {
TargetName: "t1",
SendRule: "t1",
PKReferences: []string{"c1", "pk1", "pk2"},
InsertFront: "insert into t1(c1,c2)",
InsertValues: "(:a_c1,convert(:a_c using utf8mb4))",
Insert: "insert into t1(c1,c2) select :a_c1, convert(:a_c using utf8mb4) from dual where (:a_pk1,:a_pk2) <= (1,'aaa')",
Update: "update t1 set c2=convert(:a_c using utf8mb4) where c1=:b_c1 and (:b_pk1,:b_pk2) <= (1,'aaa')",
Delete: "delete from t1 where c1=:b_c1 and (:b_pk1,:b_pk2) <= (1,'aaa')",
},
},
},
}, {
// Keywords as names.
input: &binlogdatapb.Filter{
Expand Down
21 changes: 20 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,28 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr
references: make(map[string]bool),
}
if expr, ok := aliased.Expr.(*sqlparser.ConvertUsingExpr); ok {
// Here we find the actual column name in the convert, in case
// this is a column rename and the AS is the new column.
// For example, in convert(c1 using utf8mb4) as c2, we want to find
// c1, because c1 exists in the current table whereas c2 is the renamed column
// in the desired table.
var colName sqlparser.IdentifierCI
err := sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
switch node := node.(type) {
case *sqlparser.ColName:
if !node.Qualifier.IsEmpty() {
return false, fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(node))
}
colName = node.Name
}
return true, nil
}, aliased.Expr)
if err != nil {
return nil, fmt.Errorf("failed to find column name for convert using expression: %v, %v", sqlparser.String(aliased.Expr), err)
}
selExpr := &sqlparser.ConvertUsingExpr{
Type: "utf8mb4",
Expr: &sqlparser.ColName{Name: as},
Expr: &sqlparser.ColName{Name: colName},
}
cexpr.expr = expr
cexpr.operation = opExpr
Expand Down
21 changes: 20 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,26 @@ func (plan *Plan) analyzeExpr(vschema *localVSchema, selExpr sqlparser.SelectExp
FixedValue: sqltypes.NewInt64(num),
}, nil
case *sqlparser.ConvertUsingExpr:
colnum, err := findColumn(plan.Table, aliased.As)
// Here we find the actual column name in the convert, in case
// this is a column rename and the AS is the new column.
// For example, in convert(c1 using utf8mb4) as c2, we want to find
// c1, because c1 exists in the current table whereas c2 is the renamed column
// in the desired table.
var colName sqlparser.IdentifierCI
err := sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
switch node := node.(type) {
case *sqlparser.ColName:
if !node.Qualifier.IsEmpty() {
return false, fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(node))
}
colName = node.Name
}
return true, nil
}, aliased.Expr)
if err != nil {
return ColExpr{}, fmt.Errorf("failed to find column name for convert using expression: %v, %v", sqlparser.String(aliased.Expr), err)
}
colnum, err := findColumn(plan.Table, colName)
if err != nil {
return ColExpr{}, err
}
Expand Down
23 changes: 23 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,29 @@ func TestPlanBuilder(t *testing.T) {
KeyRange: nil,
}},
},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select convert(val using utf8mb4) as val2, id as id from t1"},
outPlan: &Plan{
ColExprs: []ColExpr{{
ColNum: 1,
Field: &querypb.Field{
Name: "val",
Type: sqltypes.VarBinary,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_BINARY_FLAG),
},
}, {
ColNum: 0,
Field: &querypb.Field{
Name: "id",
Type: sqltypes.Int64,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_NUM_FLAG),
},
}},
convertUsingUTF8Columns: map[string]bool{"val": true},
},
}, {
inTable: regional,
inRule: &binlogdatapb.Rule{Match: "regional", Filter: "select id, keyspace_id() from regional"},
Expand Down

0 comments on commit 2eda916

Please sign in to comment.