Commit 7178975 (fix tests)
lidezhu committed Jan 31, 2024
1 parent 8c4221d
Showing 16 changed files with 78 additions and 41 deletions.
4 changes: 2 additions & 2 deletions cdc/model/schema_storage.go
@@ -369,7 +369,7 @@ func (ti *TableInfo) GetRowColInfos() ([]int64, map[int64]*types.FieldType, []ro
return ti.handleColID, ti.rowColFieldTps, ti.rowColInfos
}

// GetExtraColInfosWithoutVirtualCols return column infos for non-virtual columns
// GetColInfosForRowChangedEvent return column infos for non-virtual columns
// The column order in the result is the same as the order in its corresponding RowChangedEvent
func (ti *TableInfo) GetColInfosForRowChangedEvent() []rowcodec.ColInfo {
return *ti.rowColInfosWithoutVirtualCols
@@ -389,7 +389,7 @@ func (ti *TableInfo) HasUniqueColumn() bool {
return ti.hasUniqueColumn
}

// HasVirtualColumn returns whether the table has virtual columns
// HasVirtualColumns returns whether the table has virtual columns
func (ti *TableInfo) HasVirtualColumns() bool {
return ti.virtualColumnCount > 0
}
26 changes: 15 additions & 11 deletions cdc/model/sink.go
@@ -698,18 +698,22 @@ func AddExtraColumnInfo(tableInfo *model.TableInfo, extraColInfos []rowcodec.Col
}
}

// GetHandleIndexOffsets4Test is used to get the offset of handle column in test
func GetHandleIndexOffsets4Test(cols []*Column) [][]int {
// GetHandleAndUniqueIndexOffsets4Test is used to get the offsets of handle columns and other unique index columns in test
func GetHandleAndUniqueIndexOffsets4Test(cols []*Column) [][]int {
result := make([][]int, 0)
handleColumns := make([]int, 0)
for i, col := range cols {
if col.Flag.IsHandleKey() {
handleColumns = append(handleColumns, i)
} else if col.Flag.IsUniqueKey() {
// TODO: add more comment why this is needed
result = append(result, []int{i})
}
}
if len(handleColumns) == 0 {
return nil
if len(handleColumns) != 0 {
result = append(result, handleColumns)
}
return [][]int{handleColumns}
return result
}

// BuildTiDBTableInfoWithoutVirtualColumns builds a TableInfo without virtual columns from the source table info
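A hedged illustration of the renamed helper above (test-only, per its 4Test suffix): with a hypothetical column set, and assuming only the GetHandleAndUniqueIndexOffsets4Test behavior and flag constants visible in this diff, each non-handle unique column now yields its own offset group, with the handle columns appended as the final group. Tables whose only unique index is the handle keep their previous result.

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/cdc/model"
)

func main() {
	// Hypothetical columns: "id" is the handle key, "uid" is a separate
	// unique key, "name" carries no key flag.
	cols := []*model.Column{
		{Name: "id", Flag: model.HandleKeyFlag},
		{Name: "uid", Flag: model.UniqueKeyFlag},
		{Name: "name"},
	}
	offsets := model.GetHandleAndUniqueIndexOffsets4Test(cols)
	fmt.Println(offsets)
	// Prints [[1] [0]]: the unique column forms its own group, the handle
	// columns come last.
}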
@@ -779,12 +783,7 @@ func BuildTiDBTableInfoImpl(
columnInfo.ID = columnIDAllocator.GetColumnID(col.Name)
columnInfo.Name = model.NewCIStr(col.Name)
columnInfo.SetType(col.Type)
if col.Charset != "" {
columnInfo.SetCharset(col.Charset)
} else {
// charset is not stored, give it a default value
columnInfo.SetCharset(mysql.UTF8MB4Charset)
}

if col.Collation != "" {
columnInfo.SetCollate(col.Collation)
} else {
@@ -796,6 +795,11 @@ func BuildTiDBTableInfoImpl(
flag := col.Flag
if flag.IsBinary() {
columnInfo.SetCharset("binary")
} else if col.Charset != "" {
columnInfo.SetCharset(col.Charset)
} else {
// charset is not stored, give it a default value
columnInfo.SetCharset(mysql.UTF8MB4Charset)
}
if flag.IsGeneratedColumn() {
// we do not use this field, so we set it to any non-empty string
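A minimal standalone sketch of the charset selection after the reordering in BuildTiDBTableInfoImpl above (resolveCharset is a hypothetical name; the diff inlines this logic on columnInfo). The resulting charset is unchanged, but the decision now lives in a single if/else chain instead of an assignment that was later overwritten for binary columns.

// resolveCharset mirrors the reordered logic: the binary flag is checked
// first, then any stored charset, and utf8mb4 remains the fallback when no
// charset was stored on the column.
func resolveCharset(isBinary bool, stored string) string {
	if isBinary {
		return "binary"
	}
	if stored != "" {
		return stored
	}
	return "utf8mb4" // mysql.UTF8MB4Charset in the actual code
}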
2 changes: 2 additions & 0 deletions cdc/sink/dmlsink/event_appender.go
@@ -118,6 +118,8 @@ func (t *TxnEventAppender) createSingleTableTxn(
PhysicalTableID: row.PhysicalTableID,
TableInfo: row.TableInfo,
}
log.Info("createSingleTableTxn", zap.Any("txn", txn))

if row.TableInfo != nil {
txn.TableInfoVersion = row.TableInfo.Version
}
3 changes: 2 additions & 1 deletion cdc/sink/dmlsink/txn/event.go
@@ -94,6 +94,7 @@ func genTxnKeys(txn *model.SingleTableTxn) []uint64 {
log.Panic("transaction key hash fail")
}
hashRes[uint64(hasher.Sum32())] = struct{}{}
log.Info("genTxnKeys", zap.Any("row", row), zap.Uint32("hash", hasher.Sum32()))
hasher.Reset()
}
}
@@ -127,7 +128,7 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte {
if len(keys) == 0 {
// use table ID as key if no key generated (no PK/UK),
// no concurrence for rows in the same table.
log.Debug("Use table id as the key", zap.Int64("tableID", row.PhysicalTableID))
log.Info("Use table id as the key", zap.Int64("tableID", row.PhysicalTableID), zap.String("tableName", row.TableInfo.GetTableName()))
tableKey := make([]byte, 8)
binary.BigEndian.PutUint64(tableKey, uint64(row.PhysicalTableID))
keys = [][]byte{tableKey}
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/txn/mysql/dml.go
@@ -149,7 +149,7 @@ func prepareDelete(quoteTable string, cols []*model.Column, forceReplicate bool)
}
builder.WriteString(" LIMIT 1")
sql := builder.String()
log.Info("prepareDelete", zap.String("sql", sql))
log.Info("prepareDelete", zap.String("sql", sql), zap.Any("args", args))
return sql, args
}

1 change: 1 addition & 0 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
@@ -553,6 +553,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs {
zap.String("changefeed", s.changefeed),
zap.Bool("translateToInsert", translateToInsert),
zap.Uint64("firstRowCommitTs", firstRow.CommitTs),
zap.String("tableName", firstRow.TableInfo.GetSchemaName()),
zap.Uint64("firstRowReplicatingTs", firstRow.ReplicatingTs),
zap.Bool("safeMode", s.cfg.SafeMode))

9 changes: 5 additions & 4 deletions cmd/kafka-consumer/main.go
@@ -610,12 +610,11 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
log.Error("add key value to the decoder failed", zap.Error(err))
return cerror.Trace(err)
}

log.Info("AddKeyValue", zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))

counter := 0
for {
tp, hasNext, err := decoder.HasNext()
log.Info("decoder hasNext", zap.Bool("hasNext", hasNext), zap.Any("tp", tp))
if err != nil {
log.Panic("decode message key failed", zap.Error(err))
}
@@ -652,6 +651,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
session.MarkMessage(message, "")
case model.MessageTypeRow:
row, err := decoder.NextRowChangedEvent()
log.Info("RowChangedEvent received", zap.ByteString("value", message.Value), zap.Any("row", row), zap.Any("err", err))
if err != nil {
log.Panic("decode message value failed",
zap.ByteString("value", message.Value),
@@ -746,6 +746,9 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
prometheus.NewCounter(prometheus.CounterOpts{}),
))
}
for _, e := range events {
log.Info("row changed event ready to be flushed", zap.Any("event", e))
}
s, _ := sink.tableSinksMap.Load(tableID)
s.(tablesink.TableSink).AppendRowChangedEvents(events...)
commitTs := events[len(events)-1].CommitTs
@@ -754,8 +757,6 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
sink.tablesCommitTsMap.Store(tableID, commitTs)
}
}
log.Debug("update partition resolved ts",
zap.Uint64("ts", ts), zap.Int32("partition", partition))
atomic.StoreUint64(&sink.resolvedTs, ts)
// todo: mark the offset after the DDL is fully synced to the downstream mysql.
session.MarkMessage(message, "")
2 changes: 1 addition & 1 deletion go.mod
@@ -294,7 +294,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/ncw/directio v1.0.5 // indirect
github.com/ngaut/log v0.0.0-20210830112240-0124ec040aeb // indirect
github.com/ngaut/log v0.0.0-20210830112240-0124ec040aeb
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 // indirect
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
19 changes: 18 additions & 1 deletion pkg/sink/codec/canal/canal_json_message.go
@@ -159,7 +159,8 @@ func canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*model.RowChange
result.CommitTs = msg.getCommitTs()
log.Info("canalJSONMessage2RowChange meet event",
zap.String("table", *msg.getTable()),
zap.Any("type", msg.eventType()))
zap.Any("type", msg.eventType()),
zap.Any("pkNames", msg.pkNameSet()))
mysqlType := msg.getMySQLType()
var err error
if msg.eventType() == canal.EventType_DELETE {
@@ -187,11 +188,27 @@ func canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*model.RowChange
// for `UPDATE`, `old` contain old data, set it as the `PreColumns`
if msg.eventType() == canal.EventType_UPDATE {
preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getOld(), mysqlType)
// TODO: add a unit test
if len(preCols) < len(cols) {
newPreCols := make([]*model.Column, 0, len(preCols))
j := 0
for _, col := range cols {
if j < len(preCols) && col.Name == preCols[j].Name {
newPreCols = append(newPreCols, preCols[j])
j += 1
} else {
newPreCols = append(newPreCols, col)
}
}
preCols = newPreCols
}
result.PreColumns = model.Columns2ColumnDatas(preCols, result.TableInfo)
if err != nil {
return nil, err
}
}
log.Info("canalJSONMessage2RowChange return event",
zap.Any("result", result))

return result, nil
}
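A hedged, self-contained sketch of the padding added for canal-json UPDATE events above (alignPreColumns is a hypothetical name and the import is assumed; the diff inlines this loop in canalJSONMessage2RowChange): when the `old` section carries only the changed columns, the missing entries are borrowed from the post-image so PreColumns lines up one-to-one with Columns.

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/cdc/model"
)

// alignPreColumns pads a shorter pre-image with columns from the post-image,
// matching by name and preserving the post-image order.
func alignPreColumns(cols, preCols []*model.Column) []*model.Column {
	if len(preCols) >= len(cols) {
		return preCols
	}
	aligned := make([]*model.Column, 0, len(cols))
	j := 0
	for _, col := range cols {
		if j < len(preCols) && col.Name == preCols[j].Name {
			aligned = append(aligned, preCols[j])
			j++
		} else {
			aligned = append(aligned, col)
		}
	}
	return aligned
}

func main() {
	cols := []*model.Column{{Name: "id", Value: int64(1)}, {Name: "name", Value: "new"}}
	preCols := []*model.Column{{Name: "name", Value: "old"}} // only the changed column
	for _, c := range alignPreColumns(cols, preCols) {
		fmt.Println(c.Name, c.Value)
	}
	// Prints:
	//   id 1      (borrowed from the post-image)
	//   name old
}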
11 changes: 11 additions & 0 deletions pkg/sink/codec/open/open_protocol_decoder.go
@@ -126,6 +126,7 @@ func (b *BatchDecoder) decodeNextKey() error {

// HasNext implements the RowEventDecoder interface
func (b *BatchDecoder) HasNext() (model.MessageType, bool, error) {
log.Info("BatchDecoder HasNext begin", zap.Bool("hasNext", b.hasNext()))
if !b.hasNext() {
return 0, false, nil
}
@@ -134,6 +135,8 @@ func (b *BatchDecoder) HasNext() (model.MessageType, bool, error) {
}

if b.nextKey.Type == model.MessageTypeRow {
log.Info("BatchDecoder HasNext row")

valueLen := binary.BigEndian.Uint64(b.valueBytes[:8])
value := b.valueBytes[8 : valueLen+8]
b.valueBytes = b.valueBytes[valueLen+8:]
@@ -198,6 +201,7 @@ func (b *BatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
if b.nextKey.Type != model.MessageTypeRow {
return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found row event message")
}
log.Info("BatchDecoder NextRowChangedEvent", zap.Bool("OnlyHandleKey", b.nextKey.OnlyHandleKey), zap.String("ClaimCheckLocation", b.nextKey.ClaimCheckLocation))

ctx := context.Background()
// claim-check message found
@@ -260,6 +264,7 @@ func (b *BatchDecoder) assembleHandleKeyOnlyEvent(
)

tableInfo := handleKeyOnlyEvent.TableInfo
log.Info("assembleHandleKeyOnlyEvent", zap.Any("handleKeyOnlyEvent", handleKeyOnlyEvent))
if handleKeyOnlyEvent.IsInsert() {
conditions := make(map[string]interface{}, len(handleKeyOnlyEvent.Columns))
for _, col := range handleKeyOnlyEvent.Columns {
@@ -271,6 +276,8 @@ func (b *BatchDecoder) assembleHandleKeyOnlyEvent(
return nil, err
}
columns := b.buildColumns(holder, conditions)
indexColumns := model.GetHandleAndUniqueIndexOffsets4Test(columns)
handleKeyOnlyEvent.TableInfo = model.BuildTableInfo(schema, table, columns, indexColumns)
handleKeyOnlyEvent.Columns = model.Columns2ColumnDatas(columns, handleKeyOnlyEvent.TableInfo)
} else if handleKeyOnlyEvent.IsDelete() {
conditions := make(map[string]interface{}, len(handleKeyOnlyEvent.PreColumns))
@@ -283,6 +290,8 @@ func (b *BatchDecoder) assembleHandleKeyOnlyEvent(
return nil, err
}
preColumns := b.buildColumns(holder, conditions)
indexColumns := model.GetHandleAndUniqueIndexOffsets4Test(preColumns)
handleKeyOnlyEvent.TableInfo = model.BuildTableInfo(schema, table, preColumns, indexColumns)
handleKeyOnlyEvent.PreColumns = model.Columns2ColumnDatas(preColumns, handleKeyOnlyEvent.TableInfo)
} else if handleKeyOnlyEvent.IsUpdate() {
conditions := make(map[string]interface{}, len(handleKeyOnlyEvent.Columns))
@@ -295,6 +304,8 @@ func (b *BatchDecoder) assembleHandleKeyOnlyEvent(
return nil, err
}
columns := b.buildColumns(holder, conditions)
indexColumns := model.GetHandleAndUniqueIndexOffsets4Test(columns)
handleKeyOnlyEvent.TableInfo = model.BuildTableInfo(schema, table, columns, indexColumns)
handleKeyOnlyEvent.Columns = model.Columns2ColumnDatas(columns, handleKeyOnlyEvent.TableInfo)

conditions = make(map[string]interface{}, len(handleKeyOnlyEvent.PreColumns))
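For context on the slicing in BatchDecoder.HasNext above: each row value in an open-protocol batch sits behind its length encoded as a big-endian uint64. A minimal sketch of that framing (nextValue is a hypothetical helper; only the slicing shown in the diff is assumed):

package main

import (
	"encoding/binary"
	"fmt"
)

// nextValue reads the 8-byte big-endian length prefix, returns that many
// bytes as the value, and hands back the rest of the buffer.
func nextValue(buf []byte) (value, rest []byte) {
	valueLen := binary.BigEndian.Uint64(buf[:8])
	return buf[8 : valueLen+8], buf[valueLen+8:]
}

func main() {
	// Two framed values, "foo" and "ab".
	buf := []byte{0, 0, 0, 0, 0, 0, 0, 3, 'f', 'o', 'o', 0, 0, 0, 0, 0, 0, 0, 2, 'a', 'b'}
	v1, rest := nextValue(buf)
	v2, _ := nextValue(rest)
	fmt.Printf("%s %s\n", v1, v2) // foo ab
}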
10 changes: 1 addition & 9 deletions pkg/sink/codec/open/open_protocol_encoder_test.go
@@ -69,7 +69,7 @@ var (
{
Name: "col1",
Type: mysql.TypeVarchar,
Flag: model.HandleKeyFlag | model.PrimaryKeyFlag,
Flag: model.HandleKeyFlag | model.UniqueKeyFlag,
},
{
Name: "col2",
@@ -127,10 +127,6 @@ var (
Name: "col15",
Type: mysql.TypeBlob,
},
{
Name: "col16",
Type: mysql.TypeBlob,
},
}, [][]int{{0}})
largeTestEvent = &model.RowChangedEvent{
CommitTs: 1,
@@ -196,10 +192,6 @@ var (
Name: "col15",
Value: []byte("12345678910"),
},
{
Name: "col16",
Value: []byte("12345678910"),
},
}, tableInfoWithManyCols),
}

9 changes: 7 additions & 2 deletions pkg/sink/codec/open/open_protocol_message.go
@@ -19,12 +19,14 @@ import (
"sort"
"strings"

"github.com/pingcap/log"
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/codec/internal"
"go.uber.org/zap"
)

type messageRow struct {
@@ -150,15 +152,17 @@ func msgToRowChange(key *internal.MessageKey, value *messageRow) *model.RowChang
if len(value.Delete) != 0 {
preCols := codecColumns2RowChangeColumns(value.Delete)
internal.SortColumnArrays(preCols)
indexColumns := model.GetHandleIndexOffsets4Test(preCols)
indexColumns := model.GetHandleAndUniqueIndexOffsets4Test(preCols)
log.Info("msgToRowChange", zap.Any("indexColumns", indexColumns), zap.Any("preCols", preCols))
e.TableInfo = model.BuildTableInfo(key.Schema, key.Table, preCols, indexColumns)
e.PreColumns = model.Columns2ColumnDatas(preCols, e.TableInfo)
} else {
cols := codecColumns2RowChangeColumns(value.Update)
preCols := codecColumns2RowChangeColumns(value.PreColumns)
internal.SortColumnArrays(cols)
internal.SortColumnArrays(preCols)
indexColumns := model.GetHandleIndexOffsets4Test(cols)
indexColumns := model.GetHandleAndUniqueIndexOffsets4Test(cols)
log.Info("msgToRowChange", zap.Any("indexColumns", indexColumns), zap.Any("cols", cols))
e.TableInfo = model.BuildTableInfo(key.Schema, key.Table, cols, indexColumns)
e.Columns = model.Columns2ColumnDatas(cols, e.TableInfo)
e.PreColumns = model.Columns2ColumnDatas(preCols, e.TableInfo)
@@ -169,6 +173,7 @@ func msgToRowChange(key *internal.MessageKey, value *messageRow) *model.RowChang
e.PhysicalTableID = *key.Partition
e.TableInfo.TableName.IsPartition = true
}
log.Info("msgToRowChange", zap.Any("event", e))

return e
}
9 changes: 4 additions & 5 deletions pkg/sqlmodel/multirow_test.go
@@ -16,7 +16,6 @@ package sqlmodel
import (
"testing"

"github.com/pingcap/tiflow/cdc/model"
cdcmodel "github.com/pingcap/tiflow/cdc/model"
"github.com/stretchr/testify/require"
)
@@ -128,7 +127,7 @@ func testGenUpdateMultiRowsWithVirtualGeneratedColumn(t *testing.T, genUpdate ge
sourceTI := mockTableInfo(t, "CREATE TABLE tb1 (c INT, c1 int as (c+100) virtual not null, c2 INT, c3 INT, PRIMARY KEY (c))")
targetTI := mockTableInfo(t, "CREATE TABLE tb (c INT, c1 int as (c+100) virtual not null, c2 INT, c3 INT, PRIMARY KEY (c))")

sourceTI = model.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI)
sourceTI = cdcmodel.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI)

change1 := NewRowChange(source, target, []interface{}{1, 2, 3}, []interface{}{10, 20, 30}, sourceTI, targetTI, nil)
change2 := NewRowChange(source, target, []interface{}{4, 5, 6}, []interface{}{40, 50, 60}, sourceTI, targetTI, nil)
@@ -161,7 +160,7 @@ func testGenUpdateMultiRowsWithVirtualGeneratedColumns(t *testing.T, genUpdate g
targetTI := mockTableInfo(t, `CREATE TABLE tb (c0 int as (c4*c4) virtual not null,
c1 int as (c+100) virtual not null, c2 INT, c3 INT, c4 INT, PRIMARY KEY (c4))`)

sourceTI = model.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI)
sourceTI = cdcmodel.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI)

change1 := NewRowChange(source, target, []interface{}{2, 3, 1}, []interface{}{20, 30, 10}, sourceTI, targetTI, nil)
change2 := NewRowChange(source, target, []interface{}{5, 6, 4}, []interface{}{50, 60, 40}, sourceTI, targetTI, nil)
@@ -223,8 +222,8 @@ func TestGenInsertMultiRows(t *testing.T) {
sourceTI2 := mockTableInfo(t, "CREATE TABLE tb2 (gen INT AS (c+1), c INT PRIMARY KEY, c2 INT)")
targetTI := mockTableInfo(t, "CREATE TABLE tb (gen INT AS (c+1), c INT PRIMARY KEY, c2 INT)")

sourceTI1 = model.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI1)
sourceTI2 = model.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI2)
sourceTI1 = cdcmodel.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI1)
sourceTI2 = cdcmodel.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI2)

change1 := NewRowChange(source1, target, nil, []interface{}{1, 2}, sourceTI1, targetTI, nil)
change2 := NewRowChange(source2, target, nil, []interface{}{3, 4}, sourceTI2, targetTI, nil)
3 changes: 1 addition & 2 deletions pkg/sqlmodel/row_change_test.go
@@ -22,7 +22,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/charset"
timodel "github.com/pingcap/tidb/pkg/parser/model"
timock "github.com/pingcap/tidb/pkg/util/mock"
"github.com/pingcap/tiflow/cdc/model"
cdcmodel "github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/utils"
@@ -353,7 +352,7 @@ func TestGenInsert(t *testing.T) {
for _, c := range cases {
sourceTI := mockTableInfo(t, c.sourceCreateSQL)
targetTI := mockTableInfo(t, c.targetCreateSQL)
sourceTI = model.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI)
sourceTI = cdcmodel.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI)
change := NewRowChange(source, target, nil, c.postValues, sourceTI, targetTI, nil)
sql, args := change.GenSQL(DMLInsert)
require.Equal(t, c.expectedInsertSQL, sql)
