Skip to content

Commit

Permalink
Move enum and set mappings to vstreamer
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Apr 16, 2024
1 parent 791ca02 commit 8385a0c
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 53 deletions.
2 changes: 1 addition & 1 deletion examples/local/vstream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
*/
func main() {
ctx := context.Background()
streamCustomer := true
streamCustomer := false
var vgtid *binlogdatapb.VGtid
if streamCustomer {
vgtid = &binlogdatapb.VGtid{
Expand Down
10 changes: 5 additions & 5 deletions go/vt/schema/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package schema
import (
"fmt"
"regexp"
"strconv"
"strings"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -171,12 +170,13 @@ func parseEnumOrSetTokens(enumOrSetValues string) []string {
}

// ParseEnumOrSetTokensMap parses the comma delimited part of an enum column definition
// and returns a map where ["1"] is the first token, and ["<n>"] is th elast token
func ParseEnumOrSetTokensMap(enumOrSetValues string) map[string]string {
// and returns a map where [1] is the first token, and [<n>] is the last.
func ParseEnumOrSetTokensMap(enumOrSetValues string) map[int]string {
tokens := parseEnumOrSetTokens(enumOrSetValues)
tokensMap := map[string]string{}
tokensMap := map[int]string{}
for i, token := range tokens {
tokensMap[strconv.Itoa(i+1)] = token
// SET and ENUM values are 1 indexed.
tokensMap[i+1] = token
}
return tokensMap
}
14 changes: 7 additions & 7 deletions go/vt/schema/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,12 @@ func TestParseEnumTokensMap(t *testing.T) {
input := `'x-small','small','medium','large','x-large'`

enumTokensMap := ParseEnumOrSetTokensMap(input)
expect := map[string]string{
"1": "x-small",
"2": "small",
"3": "medium",
"4": "large",
"5": "x-large",
expect := map[int]string{
1: "x-small",
2: "small",
3: "medium",
4: "large",
5: "x-large",
}
assert.Equal(t, expect, enumTokensMap)
}
Expand All @@ -183,7 +183,7 @@ func TestParseEnumTokensMap(t *testing.T) {
}
for _, input := range inputs {
enumTokensMap := ParseEnumOrSetTokensMap(input)
expect := map[string]string{}
expect := map[int]string{}
assert.Equal(t, expect, enumTokensMap)
}
}
Expand Down
29 changes: 0 additions & 29 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ type TablePlan struct {
Delete *sqlparser.ParsedQuery
MultiDelete *sqlparser.ParsedQuery
Fields []*querypb.Field
EnumValuesMap map[string](map[string]string)
ConvertIntToEnum map[string]bool
// PKReferences is used to check if an event changed
// a primary key column (row move).
Expand Down Expand Up @@ -335,34 +334,6 @@ func (tp *TablePlan) bindFieldVal(field *querypb.Field, val *sqltypes.Value) (*q
// An integer converted to an enum. We must write the textual value of the int. i.e. 0 turns to '0'
return sqltypes.StringBindVariable(val.ToString()), nil
}
if enumValues, ok := tp.EnumValuesMap[field.Name]; ok && !val.IsNull() {
// The fact that this field has a EnumValuesMap entry, means we must
// use the enum's text value as opposed to the enum's numerical value.
// This may be needed in Online DDL, when the enum column could be modified:
// - Either from ENUM to a text type (VARCHAR/TEXT)
// - Or from ENUM to another ENUM with different value ordering,
// e.g. from `('red', 'green', 'blue')` to `('red', 'blue')`.
// By applying the textual value of an enum we eliminate the ordering concern.
// In non-Online DDL this shouldn't be a concern because the schema is static,
// and so passing the enum's numerical value is sufficient.
enumValue, enumValueOK := enumValues[val.ToString()]
if !enumValueOK {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Invalid enum value: %v for field %s", val, field.Name)
}
// get the enum text for this val
return sqltypes.StringBindVariable(enumValue), nil
}
if field.Type == querypb.Type_ENUM {
// This is an ENUM w/o a values map, which means that we are most likely using
// the index value -- what is stored and binlogged vs. the list of strings
// defined in the table schema -- and we must use an int bindvar or we'll have
// invalid/incorrect predicates like WHERE enumcol='2'.
// This will be the case when applying binlog events.
enumIndexVal := sqltypes.MakeTrusted(querypb.Type_UINT64, val.Raw())
if enumIndex, err := enumIndexVal.ToUint64(); err == nil {
return sqltypes.Uint64BindVariable(enumIndex), nil
}
}
return sqltypes.ValueBindVariable(*val), nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet"
Expand Down Expand Up @@ -231,12 +230,6 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum
Match: fromTable,
}

enumValuesMap := map[string](map[string]string){}
for k, v := range rule.ConvertEnumToText {
tokensMap := schema.ParseEnumOrSetTokensMap(v)
enumValuesMap[k] = tokensMap
}

if expr, ok := sel.SelectExprs[0].(*sqlparser.StarExpr); ok {
// If it's a "select *", we return a partial plan, and complete
// it when we get back field info from the stream.
Expand All @@ -252,7 +245,6 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum
SendRule: sendRule,
Lastpk: lastpk,
Stats: stats,
EnumValuesMap: enumValuesMap,
ConvertCharset: rule.ConvertCharset,
ConvertIntToEnum: rule.ConvertIntToEnum,
CollationEnv: collationEnv,
Expand Down Expand Up @@ -335,7 +327,6 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum

tablePlan := tpb.generate()
tablePlan.SendRule = sendRule
tablePlan.EnumValuesMap = enumValuesMap
tablePlan.ConvertCharset = rule.ConvertCharset
tablePlan.ConvertIntToEnum = rule.ConvertIntToEnum
return tablePlan, nil
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ type Plan struct {
// of the table.
Filters []Filter

// Convert any ordinal values seen in the binlog events for ENUM
// or SET columns to the string value. The map is keyed on the
// column number, with the value being the map of ordinal to string.
EnumValuesMap map[int](map[int]string)

env *vtenv.Environment
}

Expand Down
66 changes: 64 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"context"
"fmt"
"io"
"strconv"
"strings"
"time"

"google.golang.org/protobuf/encoding/prototext"
Expand All @@ -35,6 +37,7 @@ import (
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
schemautils "vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet"
Expand Down Expand Up @@ -753,6 +756,23 @@ func (vs *vstreamer) buildTablePlan(id uint64, tm *mysql.TableMap) (*binlogdatap
Plan: plan,
TableMap: tm,
}
// Add any necessary ENUM and SET ordinal to string mappings.
for i, col := range cols {
if col.Type == querypb.Type_ENUM || col.Type == querypb.Type_SET {
if plan.EnumValuesMap == nil {
log.Errorf("DEBUG: buildTablePlan initializing plan's EnumValuesMap")
plan.EnumValuesMap = make(map[int]map[int]string)
}
// Strip the enum()/set() parts out.
begin := strings.Index(col.ColumnType, "(")
end := strings.LastIndex(col.ColumnType, ")")
if begin == -1 || end == -1 {
return nil, fmt.Errorf("enum or set column does not have valid string values: %s", col.ColumnType)
}
plan.EnumValuesMap[i] = schemautils.ParseEnumOrSetTokensMap(col.ColumnType[begin+1 : end])
log.Errorf("DEBUG: buildTablePlan plan EnumValuesMap for col %d: %v", i, plan.EnumValuesMap[i])
}
}
return &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_FIELD,
FieldEvent: &binlogdatapb.FieldEvent{
Expand Down Expand Up @@ -827,15 +847,14 @@ func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, er
// initially using collations for the column types based on the *connection
// collation* and not the actual *column collation*.
// But because we now get the correct collation for the actual column from
// mysqld in getExtColsInfo we know this is the correct one for the vstream
// mysqld in getExtColInfos we know this is the correct one for the vstream
// target and we use that rather than any that were in the binlog events,
// which were for the source and which can be using a different collation
// than the target.
fieldsCopy, err := getFields(vs.ctx, vs.cp, vs.se, tm.Name, tm.Database, st.Fields[:len(tm.Types)])
if err != nil {
return nil, err
}

return fieldsCopy, nil
}

Expand Down Expand Up @@ -1046,6 +1065,49 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo
}
pos += l

// Convert the ordinal values in the binlog event for SET and ENUM fields into their
// string representations.
if plan.Table.Fields[colNum].Type == querypb.Type_ENUM {
ordinalValue, err := value.ToInt()
if err != nil {
log.Errorf("extractRowAndFilter: %s, table: %s, colNum: %d, fields: %+v, current values: %+v",
err, plan.Table.Name, colNum, plan.Table.Fields, values)
return false, nil, false, err
}
strVal := plan.EnumValuesMap[colNum][ordinalValue]
value = sqltypes.MakeTrusted(plan.Table.Fields[colNum].Type, []byte(strVal))
log.Errorf("DEBUG: extractRowAndFilter: mapped string value for col %d: %v", colNum, strVal)
}
if plan.Table.Fields[colNum].Type == querypb.Type_SET {
val := bytes.Buffer{}
// A SET column can have 64 unique values: https://dev.mysql.com/doc/refman/en/set.html
// For this reason the binlog event contains the values encoded as a 64-bit integer.
// This value can then be converted to a binary / base 2 integer where it becomes
// a bitmap of the values specified.
bv, err := value.ToUint64()
if err != nil {
log.Errorf("extractRowAndFilter: %s, table: %s, colNum: %d, fields: %+v, current values: %+v",
err, plan.Table.Name, colNum, plan.Table.Fields, values)
return false, nil, false, err
}
// Convert it to a base2 integer / binary value. Finally, strconv converts this to a
// string of 1s and 0s and we can then loop through the bytes. Note that this map is
// in reverse order as this was a little endian integer.
valMap := strconv.FormatUint(bv, 2)
valLen := len(valMap)
for i := 0; i < valLen; i++ {
if valMap[i] == '1' {
strVal := plan.EnumValuesMap[colNum][valLen-i]
if val.Len() > 0 {
val.WriteByte(',')
}
val.WriteString(strVal)
}
}
value = sqltypes.MakeTrusted(plan.Table.Fields[colNum].Type, val.Bytes())
log.Errorf("DEBUG: extractRowAndFilter: mapped string value for col %d: %v", colNum, val.String())
}

charsets[colNum] = collations.ID(plan.Table.Fields[colNum].Charset)
values[colNum] = value
valueIndex++
Expand Down

0 comments on commit 8385a0c

Please sign in to comment.