diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/after_columns b/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/after_columns new file mode 100644 index 00000000000..99f86097862 --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/after_columns @@ -0,0 +1 @@ +id, c1j diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/alter b/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/alter new file mode 100644 index 00000000000..f2e64ff0894 --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/alter @@ -0,0 +1 @@ +change column c1 c1j json diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/before_columns b/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/before_columns new file mode 100644 index 00000000000..b791aa0d27a --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/before_columns @@ -0,0 +1 @@ +id, c1 diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/create.sql b/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/create.sql new file mode 100644 index 00000000000..5280498e9fd --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/create.sql @@ -0,0 +1,22 @@ +drop table if exists onlineddl_test; +create table onlineddl_test ( + id int auto_increment, + c1 int not null, + primary key (id) +) auto_increment=1; + +insert into onlineddl_test values (1, 11); +insert into onlineddl_test values (2, 13); + +drop event if exists onlineddl_test; +delimiter ;; +create event onlineddl_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert into onlineddl_test values (null, 17); +end ;; diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go index 8b97f02dc1e..780b1c0d064 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go @@ -414,6 +414,55 @@ func TestBuildPlayerPlan(t *testing.T) { }, }, }, + }, { + input: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select c1, convert(c using utf8mb4) as c2 from t1", + }}, + }, + plan: &TestReplicatorPlan{ + VStreamFilter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select c1, convert(c using utf8mb4) as c2 from t1", + }}, + }, + TargetTables: []string{"t1"}, + TablePlans: map[string]*TestTablePlan{ + "t1": { + TargetName: "t1", + SendRule: "t1", + PKReferences: []string{"c1"}, + InsertFront: "insert into t1(c1,c2)", + InsertValues: "(:a_c1,convert(:a_c using utf8mb4))", + Insert: "insert into t1(c1,c2) values (:a_c1,convert(:a_c using utf8mb4))", + Update: "update t1 set c2=convert(:a_c using utf8mb4) where c1=:b_c1", + Delete: "delete from t1 where c1=:b_c1", + }, + }, + }, + planpk: &TestReplicatorPlan{ + VStreamFilter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select c1, convert(c using utf8mb4) as c2, pk1, pk2 from t1", + }}, + }, + TargetTables: []string{"t1"}, + TablePlans: map[string]*TestTablePlan{ + "t1": { + TargetName: "t1", + SendRule: "t1", + PKReferences: []string{"c1", "pk1", "pk2"}, + InsertFront: "insert into t1(c1,c2)", + InsertValues: "(:a_c1,convert(:a_c using utf8mb4))", + Insert: "insert into t1(c1,c2) select :a_c1, convert(:a_c using utf8mb4) from dual where (:a_pk1,:a_pk2) <= (1,'aaa')", + Update: "update t1 set c2=convert(:a_c using utf8mb4) where c1=:b_c1 and (:b_pk1,:b_pk2) <= (1,'aaa')", + Delete: "delete from t1 where c1=:b_c1 and (:b_pk1,:b_pk2) <= (1,'aaa')", + }, + }, + }, }, { // Keywords as names. input: &binlogdatapb.Filter{ diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index d899be92fa7..4e8141f164f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -420,9 +420,28 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr references: make(map[string]bool), } if expr, ok := aliased.Expr.(*sqlparser.ConvertUsingExpr); ok { + // Here we find the actual column name in the convert, in case + // this is a column rename and the AS is the new column. + // For example, in convert(c1 using utf8mb4) as c2, we want to find + // c1, because c1 exists in the current table whereas c2 is the renamed column + // in the desired table. + var colName sqlparser.IdentifierCI + err := sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) { + switch node := node.(type) { + case *sqlparser.ColName: + if !node.Qualifier.IsEmpty() { + return false, fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(node)) + } + colName = node.Name + } + return true, nil + }, aliased.Expr) + if err != nil { + return nil, fmt.Errorf("failed to find column name for convert using expression: %v, %v", sqlparser.String(aliased.Expr), err) + } selExpr := &sqlparser.ConvertUsingExpr{ Type: "utf8mb4", - Expr: &sqlparser.ColName{Name: as}, + Expr: &sqlparser.ColName{Name: colName}, } cexpr.expr = expr cexpr.operation = opExpr diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index d45dceda1b5..0378d04c373 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -698,7 +698,26 @@ func (plan *Plan) analyzeExpr(vschema *localVSchema, selExpr sqlparser.SelectExp FixedValue: sqltypes.NewInt64(num), }, nil case *sqlparser.ConvertUsingExpr: - colnum, err := findColumn(plan.Table, aliased.As) + // Here we find the actual column name in the convert, in case + // this is a column rename and the AS is the new column. + // For example, in convert(c1 using utf8mb4) as c2, we want to find + // c1, because c1 exists in the current table whereas c2 is the renamed column + // in the desired table. + var colName sqlparser.IdentifierCI + err := sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) { + switch node := node.(type) { + case *sqlparser.ColName: + if !node.Qualifier.IsEmpty() { + return false, fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(node)) + } + colName = node.Name + } + return true, nil + }, aliased.Expr) + if err != nil { + return ColExpr{}, fmt.Errorf("failed to find column name for convert using expression: %v, %v", sqlparser.String(aliased.Expr), err) + } + colnum, err := findColumn(plan.Table, colName) if err != nil { return ColExpr{}, err } diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go index 4ec57b15b7d..441b45fc23f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go @@ -427,6 +427,29 @@ func TestPlanBuilder(t *testing.T) { KeyRange: nil, }}, }, + }, { + inTable: t1, + inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select convert(val using utf8mb4) as val2, id as id from t1"}, + outPlan: &Plan{ + ColExprs: []ColExpr{{ + ColNum: 1, + Field: &querypb.Field{ + Name: "val", + Type: sqltypes.VarBinary, + Charset: collations.CollationBinaryID, + Flags: uint32(querypb.MySqlFlag_BINARY_FLAG), + }, + }, { + ColNum: 0, + Field: &querypb.Field{ + Name: "id", + Type: sqltypes.Int64, + Charset: collations.CollationBinaryID, + Flags: uint32(querypb.MySqlFlag_NUM_FLAG), + }, + }}, + convertUsingUTF8Columns: map[string]bool{"val": true}, + }, }, { inTable: regional, inRule: &binlogdatapb.Rule{Match: "regional", Filter: "select id, keyspace_id() from regional"},