Skip to content

Commit

Permalink
Add JiT mapping support
Browse files Browse the repository at this point in the history
Also remove some noisy debug logging, and remove the OnlineDDL changes for
now, as those changes were premature.

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Apr 17, 2024
1 parent 8da07ec commit 1f10e2a
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var (
)

const (
testDataPath = "/tmp/onlineddltests"
testDataPath = "testdata"
)

func TestMain(m *testing.M) {
Expand Down
22 changes: 10 additions & 12 deletions go/vt/vttablet/onlineddl/vrepl.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,19 +476,17 @@ func (v *VRepl) analyzeTables(ctx context.Context, conn *dbconnpool.DBConnection
return err
}

/*
for _, sourcePKColumn := range sharedPKColumns.Columns() {
mappedColumn := v.targetSharedColumns.GetColumn(sourcePKColumn.Name)
if sourcePKColumn.Type == vrepl.EnumColumnType && mappedColumn.Type == vrepl.EnumColumnType {
// An ENUM as part of PRIMARY KEY. We must convert it to text because OMG that's complicated.
// There's a scenario where a query may modify the enum value (and it's bad practice, seeing
// that it's part of the PK, but it's still valid), and in that case we must have the string value
// to be able to DELETE the old row
v.targetSharedColumns.SetEnumToTextConversion(mappedColumn.Name, sourcePKColumn.EnumValues)
v.enumToTextMap[sourcePKColumn.Name] = sourcePKColumn.EnumValues
}
for _, sourcePKColumn := range sharedPKColumns.Columns() {
mappedColumn := v.targetSharedColumns.GetColumn(sourcePKColumn.Name)
if sourcePKColumn.Type == vrepl.EnumColumnType && mappedColumn.Type == vrepl.EnumColumnType {
// An ENUM as part of PRIMARY KEY. We must convert it to text because OMG that's complicated.
// There's a scenario where a query may modify the enum value (and it's bad practice, seeing
// that it's part of the PK, but it's still valid), and in that case we must have the string value
// to be able to DELETE the old row
v.targetSharedColumns.SetEnumToTextConversion(mappedColumn.Name, sourcePKColumn.EnumValues)
v.enumToTextMap[sourcePKColumn.Name] = sourcePKColumn.EnumValues
}
*/
}

for i := range v.sourceSharedColumns.Columns() {
sourceColumn := v.sourceSharedColumns.Columns()[i]
Expand Down
68 changes: 30 additions & 38 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,27 +751,13 @@ func (vs *vstreamer) buildTablePlan(id uint64, tm *mysql.TableMap) (*binlogdatap
vs.plans[id] = nil
return nil, nil
}
if err := addEnumAndSetMappingstoPlan(plan, cols); err != nil {
return nil, fmt.Errorf("failed to build ENUM and SET column integer to string mappings: %v", err)
}
vs.plans[id] = &streamerPlan{
Plan: plan,
TableMap: tm,
}
// Add any necessary ENUM and SET integer position to string mappings.
for i, col := range cols {
if col.Type == querypb.Type_ENUM || col.Type == querypb.Type_SET {
if plan.EnumSetValuesMap == nil {
plan.EnumSetValuesMap = make(map[int]map[int]string)
}
// Strip the enum() / set() parts out.
begin := strings.Index(col.ColumnType, "(")
end := strings.LastIndex(col.ColumnType, ")")
if begin == -1 || end == -1 {
return nil, fmt.Errorf("enum or set column %s does not have valid string values: %s",
col.Name, col.ColumnType)
}
plan.EnumSetValuesMap[i] = schemautils.ParseEnumOrSetTokensMap(col.ColumnType[begin+1 : end])
log.Errorf("DEBUG: enum values for %s: %v", col.Name, plan.EnumSetValuesMap[i])
}
}
return &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_FIELD,
FieldEvent: &binlogdatapb.FieldEvent{
Expand Down Expand Up @@ -963,12 +949,6 @@ nextrow:
}

func (vs *vstreamer) processRowEvent(vevents []*binlogdatapb.VEvent, plan *streamerPlan, rows mysql.Rows) ([]*binlogdatapb.VEvent, error) {
defer func() {
if r := recover(); r != nil {
log.Errorf("DEBUG: caught panic: %v", r)
log.Flush()
}
}()
rowChanges := make([]*binlogdatapb.RowChange, 0, len(rows.Rows))
for _, row := range rows.Rows {
beforeOK, beforeValues, _, err := vs.extractRowAndFilter(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns)
Expand Down Expand Up @@ -1041,12 +1021,6 @@ func (vs *vstreamer) rebuildPlans() error {
// - data values, array of one value per column
// - true, if the row image was partial (i.e. binlog_row_image=noblob and dml doesn't update one or more blob/text columns)
func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataColumns, nullColumns mysql.Bitmap) (bool, []sqltypes.Value, bool, error) {
defer func() {
if r := recover(); r != nil {
log.Errorf("DEBUG: caught panic: %v", r)
log.Flush()
}
}()
if len(data) == 0 {
return false, nil, false, nil
}
Expand Down Expand Up @@ -1076,6 +1050,11 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo
}
pos += l

if plan.EnumSetValuesMap == nil {
if err := addEnumAndSetMappingstoPlan(plan.Plan, plan.Table.Fields); err != nil {
return false, nil, false, fmt.Errorf("failed to build ENUM and SET column integer to string mappings: %v", err)
}
}
// Convert the integer values in the binlog event for SET and ENUM fields into their
// string representations.
if plan.Table.Fields[colNum].Type == querypb.Type_ENUM {
Expand All @@ -1086,14 +1065,12 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo
}
strVal, ok := plan.EnumSetValuesMap[colNum][int(iv)]
if !ok {
return false, nil, false, fmt.Errorf("no string value found for ENUM column %s in table %s using the found integer value: %d",
plan.Table.Fields[colNum].Name, plan.Table.Name, iv)
return false, nil, false, fmt.Errorf("no string value found for ENUM column %s in table %s -- with available values being: %v -- using the found integer value: %d",
plan.Table.Fields[colNum].Name, plan.Table.Name, plan.EnumSetValuesMap[colNum], iv)
}
value = sqltypes.MakeTrusted(plan.Table.Fields[colNum].Type, []byte(strVal))
log.Errorf("DEBUG: extractRowAndFilter: mapped string value for col %d: %v", colNum, strVal)
}
if plan.Table.Fields[colNum].Type == querypb.Type_SET {
log.Errorf("DEBUG: column %s is a SET column", plan.Table.Name)
val := bytes.Buffer{}
// A SET column can have 64 unique values: https://dev.mysql.com/doc/refman/en/set.html
// For this reason the binlog event contains the values encoded as an unsigned 64-bit
Expand All @@ -1108,22 +1085,19 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo
// See what bits are set in the uint64 using bitmasks.
for b := uint64(1); b < 1<<63; b <<= 1 {
if iv&b > 0 {
log.Errorf("DEBUG: bit at position %d is set", idx)
strVal, ok := plan.EnumSetValuesMap[colNum][idx]
if !ok {
return false, nil, false, fmt.Errorf("no string value found for SET column %s in table %s using the found bit map: %b",
plan.Table.Fields[colNum].Name, plan.Table.Name, iv)
return false, nil, false, fmt.Errorf("no string value found for SET column %s in table %s -- with available values being: %v -- using the found bit map: %b",
plan.Table.Fields[colNum].Name, plan.Table.Name, plan.EnumSetValuesMap[colNum], iv)
}
if val.Len() > 0 {
val.WriteByte(',')
}
val.WriteString(strVal)
log.Errorf("DEBUG: extractRowAndFilter: mapped string value for col %s: %v", plan.Table.Name, strVal)
}
idx++
}
value = sqltypes.MakeTrusted(plan.Table.Fields[colNum].Type, val.Bytes())
log.Errorf("DEBUG: extractRowAndFilter: mapped string value for col %d: %v", colNum, val.String())
}

charsets[colNum] = collations.ID(plan.Table.Fields[colNum].Charset)
Expand All @@ -1135,6 +1109,24 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo
return ok, filtered, partial, err
}

// addEnumAndSetMappingstoPlan populates plan.EnumSetValuesMap with the integer
// position to string value mappings for every ENUM and SET column in cols.
//
// The outer map is keyed by the column's index in cols and is always
// (re)initialized here, so it is non-nil on return even when cols contains no
// ENUM or SET columns. The inner map for each such column is whatever
// schemautils.ParseEnumOrSetTokensMap returns for the text between the first
// "(" and the last ")" of the column's ColumnType.
//
// An error is returned if an ENUM or SET column's ColumnType does not contain
// a "(" that is followed, somewhere later, by a ")".
func addEnumAndSetMappingstoPlan(plan *Plan, cols []*querypb.Field) error {
	plan.EnumSetValuesMap = make(map[int]map[int]string)
	for i, col := range cols {
		if col.Type != querypb.Type_ENUM && col.Type != querypb.Type_SET {
			continue
		}
		// Strip the enum() / set() parts out.
		begin := strings.Index(col.ColumnType, "(")
		end := strings.LastIndex(col.ColumnType, ")")
		// end < begin covers both a missing ")" (end == -1) and a ")" that only
		// appears before the first "(". Either one would otherwise make the
		// slice expression below panic rather than surface as an error.
		if begin == -1 || end < begin {
			return fmt.Errorf("enum or set column %s does not have valid string values: %s",
				col.Name, col.ColumnType)
		}
		plan.EnumSetValuesMap[i] = schemautils.ParseEnumOrSetTokensMap(col.ColumnType[begin+1 : end])
	}
	return nil
}

func wrapError(err error, stopPos replication.Position, vse *Engine) error {
if err != nil {
vse.vstreamersEndedWithErrors.Add(1)
Expand Down

0 comments on commit 1f10e2a

Please sign in to comment.