From 7178975a06b0c127f02957c08eafc71d3914b661 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 31 Jan 2024 21:31:52 +0800 Subject: [PATCH] fix tests --- cdc/model/schema_storage.go | 4 +-- cdc/model/sink.go | 26 +++++++++++-------- cdc/sink/dmlsink/event_appender.go | 2 ++ cdc/sink/dmlsink/txn/event.go | 3 ++- cdc/sink/dmlsink/txn/mysql/dml.go | 2 +- cdc/sink/dmlsink/txn/mysql/mysql.go | 1 + cmd/kafka-consumer/main.go | 9 ++++--- go.mod | 2 +- pkg/sink/codec/canal/canal_json_message.go | 19 +++++++++++++- pkg/sink/codec/open/open_protocol_decoder.go | 11 ++++++++ .../codec/open/open_protocol_encoder_test.go | 10 +------ pkg/sink/codec/open/open_protocol_message.go | 9 +++++-- pkg/sqlmodel/multirow_test.go | 9 +++---- pkg/sqlmodel/row_change_test.go | 3 +-- tests/integration_tests/cdc/dailytest/case.go | 3 +++ tests/integration_tests/cdc/run.sh | 6 +++-- 16 files changed, 78 insertions(+), 41 deletions(-) diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 5e745ddb209..ce36ed6263a 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -369,7 +369,7 @@ func (ti *TableInfo) GetRowColInfos() ([]int64, map[int64]*types.FieldType, []ro return ti.handleColID, ti.rowColFieldTps, ti.rowColInfos } -// GetExtraColInfosWithoutVirtualCols return column infos for non-virtual columns +// GetColInfosForRowChangedEvent return column infos for non-virtual columns // The column order in the result is the same as the order in its corresponding RowChangedEvent func (ti *TableInfo) GetColInfosForRowChangedEvent() []rowcodec.ColInfo { return *ti.rowColInfosWithoutVirtualCols @@ -389,7 +389,7 @@ func (ti *TableInfo) HasUniqueColumn() bool { return ti.hasUniqueColumn } -// HasVirtualColumn returns whether the table has virtual columns +// HasVirtualColumns returns whether the table has virtual columns func (ti *TableInfo) HasVirtualColumns() bool { return ti.virtualColumnCount > 0 } diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 5b9c386476e..ed59c0bf997 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -698,18 +698,22 @@ func AddExtraColumnInfo(tableInfo *model.TableInfo, extraColInfos []rowcodec.Col } } -// GetHandleIndexOffsets4Test is used to get the offset of handle column in test -func GetHandleIndexOffsets4Test(cols []*Column) [][]int { +// GetHandleAndUniqueIndexOffsets4Test is used to get the offsets of handle columns and other unique index columns in test +func GetHandleAndUniqueIndexOffsets4Test(cols []*Column) [][]int { + result := make([][]int, 0) handleColumns := make([]int, 0) for i, col := range cols { if col.Flag.IsHandleKey() { handleColumns = append(handleColumns, i) + } else if col.Flag.IsUniqueKey() { + // TODO: add more comment why this is needed + result = append(result, []int{i}) } } - if len(handleColumns) == 0 { - return nil + if len(handleColumns) != 0 { + result = append(result, handleColumns) } - return [][]int{handleColumns} + return result } // BuildTiDBTableInfoWithoutVirtualColumns build a TableInfo without virual columns from the source table info @@ -779,12 +783,7 @@ func BuildTiDBTableInfoImpl( columnInfo.ID = columnIDAllocator.GetColumnID(col.Name) columnInfo.Name = model.NewCIStr(col.Name) columnInfo.SetType(col.Type) - if col.Charset != "" { - columnInfo.SetCharset(col.Charset) - } else { - // charset is not stored, give it a default value - columnInfo.SetCharset(mysql.UTF8MB4Charset) - } + if col.Collation != "" { columnInfo.SetCollate(col.Collation) } else { @@ -796,6 +795,11 @@ func BuildTiDBTableInfoImpl( flag := col.Flag if flag.IsBinary() { columnInfo.SetCharset("binary") + } else if col.Charset != "" { + columnInfo.SetCharset(col.Charset) + } else { + // charset is not stored, give it a default value + columnInfo.SetCharset(mysql.UTF8MB4Charset) } if flag.IsGeneratedColumn() { // we do not use this field, so we set it to any non-empty string diff --git a/cdc/sink/dmlsink/event_appender.go b/cdc/sink/dmlsink/event_appender.go index d2537e9ea3b..a758fe0c951 100644 --- a/cdc/sink/dmlsink/event_appender.go +++ b/cdc/sink/dmlsink/event_appender.go @@ -118,6 +118,8 @@ func (t *TxnEventAppender) createSingleTableTxn( PhysicalTableID: row.PhysicalTableID, TableInfo: row.TableInfo, } + log.Info("createSingleTableTxn", zap.Any("txn", txn)) + if row.TableInfo != nil { txn.TableInfoVersion = row.TableInfo.Version } diff --git a/cdc/sink/dmlsink/txn/event.go b/cdc/sink/dmlsink/txn/event.go index 7203d5e98d8..a7690252e3c 100644 --- a/cdc/sink/dmlsink/txn/event.go +++ b/cdc/sink/dmlsink/txn/event.go @@ -94,6 +94,7 @@ func genTxnKeys(txn *model.SingleTableTxn) []uint64 { log.Panic("transaction key hash fail") } hashRes[uint64(hasher.Sum32())] = struct{}{} + log.Info("genTxnKeys", zap.Any("row", row), zap.Uint32("hash", hasher.Sum32())) hasher.Reset() } } @@ -127,7 +128,7 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte { if len(keys) == 0 { // use table ID as key if no key generated (no PK/UK), // no concurrence for rows in the same table. - log.Debug("Use table id as the key", zap.Int64("tableID", row.PhysicalTableID)) + log.Info("Use table id as the key", zap.Int64("tableID", row.PhysicalTableID), zap.String("tableName", row.TableInfo.GetTableName())) tableKey := make([]byte, 8) binary.BigEndian.PutUint64(tableKey, uint64(row.PhysicalTableID)) keys = [][]byte{tableKey} diff --git a/cdc/sink/dmlsink/txn/mysql/dml.go b/cdc/sink/dmlsink/txn/mysql/dml.go index 4504e7c7aff..83c2c9f9e3e 100644 --- a/cdc/sink/dmlsink/txn/mysql/dml.go +++ b/cdc/sink/dmlsink/txn/mysql/dml.go @@ -149,7 +149,7 @@ func prepareDelete(quoteTable string, cols []*model.Column, forceReplicate bool) } builder.WriteString(" LIMIT 1") sql := builder.String() - log.Info("prepareDelete", zap.String("sql", sql)) + log.Info("prepareDelete", zap.String("sql", sql), zap.Any("args", args)) return sql, args } diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index 61a2f5e327b..0a75db56572 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -553,6 +553,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { zap.String("changefeed", s.changefeed), zap.Bool("translateToInsert", translateToInsert), zap.Uint64("firstRowCommitTs", firstRow.CommitTs), + zap.String("tableName", firstRow.TableInfo.GetSchemaName()), zap.Uint64("firstRowReplicatingTs", firstRow.ReplicatingTs), zap.Bool("safeMode", s.cfg.SafeMode)) diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 4e21ed4a834..47714e8ad7a 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -610,12 +610,11 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram log.Error("add key value to the decoder failed", zap.Error(err)) return cerror.Trace(err) } - log.Info("AddKeyValue", zap.ByteString("key", message.Key), zap.ByteString("value", message.Value)) + counter := 0 for { tp, hasNext, err := decoder.HasNext() - log.Info("decoder hasNext", zap.Bool("hasNext", hasNext), zap.Any("tp", tp)) if err != nil { log.Panic("decode message key failed", zap.Error(err)) } @@ -652,6 +651,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram session.MarkMessage(message, "") case model.MessageTypeRow: row, err := decoder.NextRowChangedEvent() + log.Info("RowChangedEvent received", zap.ByteString("value", message.Value), zap.Any("row", row), zap.Any("err", err)) if err != nil { log.Panic("decode message value failed", zap.ByteString("value", message.Value), @@ -746,6 +746,9 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram prometheus.NewCounter(prometheus.CounterOpts{}), )) } + for _, e := range events { + log.Info("row changed event ready to be flushed", zap.Any("event", e)) + } s, _ := sink.tableSinksMap.Load(tableID) s.(tablesink.TableSink).AppendRowChangedEvents(events...) commitTs := events[len(events)-1].CommitTs @@ -754,8 +757,6 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram sink.tablesCommitTsMap.Store(tableID, commitTs) } } - log.Debug("update partition resolved ts", - zap.Uint64("ts", ts), zap.Int32("partition", partition)) atomic.StoreUint64(&sink.resolvedTs, ts) // todo: mark the offset after the DDL is fully synced to the downstream mysql. session.MarkMessage(message, "") diff --git a/go.mod b/go.mod index 1d280d82217..484e3c53ba1 100644 --- a/go.mod +++ b/go.mod @@ -294,7 +294,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/mtibben/percent v0.2.1 // indirect github.com/ncw/directio v1.0.5 // indirect - github.com/ngaut/log v0.0.0-20210830112240-0124ec040aeb // indirect + github.com/ngaut/log v0.0.0-20210830112240-0124ec040aeb github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 // indirect github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index e998a6a8acd..ddba78fea55 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -159,7 +159,8 @@ func canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*model.RowChange result.CommitTs = msg.getCommitTs() log.Info("canalJSONMessage2RowChange meet event", zap.String("table", *msg.getTable()), - zap.Any("type", msg.eventType())) + zap.Any("type", msg.eventType()), + zap.Any("pkNames", msg.pkNameSet())) mysqlType := msg.getMySQLType() var err error if msg.eventType() == canal.EventType_DELETE { @@ -187,11 +188,27 @@ func canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*model.RowChange // for `UPDATE`, `old` contain old data, set it as the `PreColumns` if msg.eventType() == canal.EventType_UPDATE { preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getOld(), mysqlType) + // TODO: add a unit test + if len(preCols) < len(cols) { + newPreCols := make([]*model.Column, 0, len(preCols)) + j := 0 + for _, col := range cols { + if j < len(preCols) && col.Name == preCols[j].Name { + newPreCols = append(newPreCols, preCols[j]) + j += 1 + } else { + newPreCols = append(newPreCols, col) + } + } + preCols = newPreCols + } result.PreColumns = model.Columns2ColumnDatas(preCols, result.TableInfo) if err != nil { return nil, err } } + log.Info("canalJSONMessage2RowChange return event", + zap.Any("result", result)) return result, nil } diff --git a/pkg/sink/codec/open/open_protocol_decoder.go b/pkg/sink/codec/open/open_protocol_decoder.go index 47d17951ef1..fd740c7d23a 100644 --- a/pkg/sink/codec/open/open_protocol_decoder.go +++ b/pkg/sink/codec/open/open_protocol_decoder.go @@ -126,6 +126,7 @@ func (b *BatchDecoder) decodeNextKey() error { // HasNext implements the RowEventDecoder interface func (b *BatchDecoder) HasNext() (model.MessageType, bool, error) { + log.Info("BatchDecoder HasNext begin", zap.Bool("hasNext", b.hasNext())) if !b.hasNext() { return 0, false, nil } @@ -134,6 +135,8 @@ func (b *BatchDecoder) HasNext() (model.MessageType, bool, error) { } if b.nextKey.Type == model.MessageTypeRow { + log.Info("BatchDecoder HasNext row") + valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) value := b.valueBytes[8 : valueLen+8] b.valueBytes = b.valueBytes[valueLen+8:] @@ -198,6 +201,7 @@ func (b *BatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { if b.nextKey.Type != model.MessageTypeRow { return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found row event message") } + log.Info("BatchDecoder NextRowChangedEvent", zap.Bool("OnlyHandleKey", b.nextKey.OnlyHandleKey), zap.String("ClaimCheckLocation", b.nextKey.ClaimCheckLocation)) ctx := context.Background() // claim-check message found @@ -260,6 +264,7 @@ func (b *BatchDecoder) assembleHandleKeyOnlyEvent( ) tableInfo := handleKeyOnlyEvent.TableInfo + log.Info("assembleHandleKeyOnlyEvent", zap.Any("handleKeyOnlyEvent", handleKeyOnlyEvent)) if handleKeyOnlyEvent.IsInsert() { conditions := make(map[string]interface{}, len(handleKeyOnlyEvent.Columns)) for _, col := range handleKeyOnlyEvent.Columns { @@ -271,6 +276,8 @@ func (b *BatchDecoder) assembleHandleKeyOnlyEvent( return nil, err } columns := b.buildColumns(holder, conditions) + indexColumns := model.GetHandleAndUniqueIndexOffsets4Test(columns) + handleKeyOnlyEvent.TableInfo = model.BuildTableInfo(schema, table, columns, indexColumns) handleKeyOnlyEvent.Columns = model.Columns2ColumnDatas(columns, handleKeyOnlyEvent.TableInfo) } else if handleKeyOnlyEvent.IsDelete() { conditions := make(map[string]interface{}, len(handleKeyOnlyEvent.PreColumns)) @@ -283,6 +290,8 @@ func (b *BatchDecoder) assembleHandleKeyOnlyEvent( return nil, err } preColumns := b.buildColumns(holder, conditions) + indexColumns := model.GetHandleAndUniqueIndexOffsets4Test(preColumns) + handleKeyOnlyEvent.TableInfo = model.BuildTableInfo(schema, table, preColumns, indexColumns) handleKeyOnlyEvent.PreColumns = model.Columns2ColumnDatas(preColumns, handleKeyOnlyEvent.TableInfo) } else if handleKeyOnlyEvent.IsUpdate() { conditions := make(map[string]interface{}, len(handleKeyOnlyEvent.Columns)) @@ -295,6 +304,8 @@ func (b *BatchDecoder) assembleHandleKeyOnlyEvent( return nil, err } columns := b.buildColumns(holder, conditions) + indexColumns := model.GetHandleAndUniqueIndexOffsets4Test(columns) + handleKeyOnlyEvent.TableInfo = model.BuildTableInfo(schema, table, columns, indexColumns) handleKeyOnlyEvent.Columns = model.Columns2ColumnDatas(columns, handleKeyOnlyEvent.TableInfo) conditions = make(map[string]interface{}, len(handleKeyOnlyEvent.PreColumns)) diff --git a/pkg/sink/codec/open/open_protocol_encoder_test.go b/pkg/sink/codec/open/open_protocol_encoder_test.go index b249160feb4..55bbf4c6614 100644 --- a/pkg/sink/codec/open/open_protocol_encoder_test.go +++ b/pkg/sink/codec/open/open_protocol_encoder_test.go @@ -69,7 +69,7 @@ var ( { Name: "col1", Type: mysql.TypeVarchar, - Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, + Flag: model.HandleKeyFlag | model.UniqueKeyFlag, }, { Name: "col2", @@ -127,10 +127,6 @@ var ( Name: "col15", Type: mysql.TypeBlob, }, - { - Name: "col16", - Type: mysql.TypeBlob, - }, }, [][]int{{0}}) largeTestEvent = &model.RowChangedEvent{ CommitTs: 1, @@ -196,10 +192,6 @@ var ( Name: "col15", Value: []byte("12345678910"), }, - { - Name: "col16", - Value: []byte("12345678910"), - }, }, tableInfoWithManyCols), } diff --git a/pkg/sink/codec/open/open_protocol_message.go b/pkg/sink/codec/open/open_protocol_message.go index 8e7e9ec9db2..8257909e8aa 100644 --- a/pkg/sink/codec/open/open_protocol_message.go +++ b/pkg/sink/codec/open/open_protocol_message.go @@ -19,12 +19,14 @@ import ( "sort" "strings" + "github.com/pingcap/log" timodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/codec/internal" + "go.uber.org/zap" ) type messageRow struct { @@ -150,7 +152,8 @@ func msgToRowChange(key *internal.MessageKey, value *messageRow) *model.RowChang if len(value.Delete) != 0 { preCols := codecColumns2RowChangeColumns(value.Delete) internal.SortColumnArrays(preCols) - indexColumns := model.GetHandleIndexOffsets4Test(preCols) + indexColumns := model.GetHandleAndUniqueIndexOffsets4Test(preCols) + log.Info("msgToRowChange", zap.Any("indexColumns", indexColumns), zap.Any("preCols", preCols)) e.TableInfo = model.BuildTableInfo(key.Schema, key.Table, preCols, indexColumns) e.PreColumns = model.Columns2ColumnDatas(preCols, e.TableInfo) } else { @@ -158,7 +161,8 @@ func msgToRowChange(key *internal.MessageKey, value *messageRow) *model.RowChang preCols := codecColumns2RowChangeColumns(value.PreColumns) internal.SortColumnArrays(cols) internal.SortColumnArrays(preCols) - indexColumns := model.GetHandleIndexOffsets4Test(cols) + indexColumns := model.GetHandleAndUniqueIndexOffsets4Test(cols) + log.Info("msgToRowChange", zap.Any("indexColumns", indexColumns), zap.Any("cols", cols)) e.TableInfo = model.BuildTableInfo(key.Schema, key.Table, cols, indexColumns) e.Columns = model.Columns2ColumnDatas(cols, e.TableInfo) e.PreColumns = model.Columns2ColumnDatas(preCols, e.TableInfo) @@ -169,6 +173,7 @@ func msgToRowChange(key *internal.MessageKey, value *messageRow) *model.RowChang e.PhysicalTableID = *key.Partition e.TableInfo.TableName.IsPartition = true } + log.Info("msgToRowChange", zap.Any("event", e)) return e } diff --git a/pkg/sqlmodel/multirow_test.go b/pkg/sqlmodel/multirow_test.go index 23e6f3f92e2..890523c6910 100644 --- a/pkg/sqlmodel/multirow_test.go +++ b/pkg/sqlmodel/multirow_test.go @@ -16,7 +16,6 @@ package sqlmodel import ( "testing" - "github.com/pingcap/tiflow/cdc/model" cdcmodel "github.com/pingcap/tiflow/cdc/model" "github.com/stretchr/testify/require" ) @@ -128,7 +127,7 @@ func testGenUpdateMultiRowsWithVirtualGeneratedColumn(t *testing.T, genUpdate ge sourceTI := mockTableInfo(t, "CREATE TABLE tb1 (c INT, c1 int as (c+100) virtual not null, c2 INT, c3 INT, PRIMARY KEY (c))") targetTI := mockTableInfo(t, "CREATE TABLE tb (c INT, c1 int as (c+100) virtual not null, c2 INT, c3 INT, PRIMARY KEY (c))") - sourceTI = model.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI) + sourceTI = cdcmodel.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI) change1 := NewRowChange(source, target, []interface{}{1, 2, 3}, []interface{}{10, 20, 30}, sourceTI, targetTI, nil) change2 := NewRowChange(source, target, []interface{}{4, 5, 6}, []interface{}{40, 50, 60}, sourceTI, targetTI, nil) @@ -161,7 +160,7 @@ func testGenUpdateMultiRowsWithVirtualGeneratedColumns(t *testing.T, genUpdate g targetTI := mockTableInfo(t, `CREATE TABLE tb (c0 int as (c4*c4) virtual not null, c1 int as (c+100) virtual not null, c2 INT, c3 INT, c4 INT, PRIMARY KEY (c4))`) - sourceTI = model.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI) + sourceTI = cdcmodel.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI) change1 := NewRowChange(source, target, []interface{}{2, 3, 1}, []interface{}{20, 30, 10}, sourceTI, targetTI, nil) change2 := NewRowChange(source, target, []interface{}{5, 6, 4}, []interface{}{50, 60, 40}, sourceTI, targetTI, nil) @@ -223,8 +222,8 @@ func TestGenInsertMultiRows(t *testing.T) { sourceTI2 := mockTableInfo(t, "CREATE TABLE tb2 (gen INT AS (c+1), c INT PRIMARY KEY, c2 INT)") targetTI := mockTableInfo(t, "CREATE TABLE tb (gen INT AS (c+1), c INT PRIMARY KEY, c2 INT)") - sourceTI1 = model.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI1) - sourceTI2 = model.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI2) + sourceTI1 = cdcmodel.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI1) + sourceTI2 = cdcmodel.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI2) change1 := NewRowChange(source1, target, nil, []interface{}{1, 2}, sourceTI1, targetTI, nil) change2 := NewRowChange(source2, target, nil, []interface{}{3, 4}, sourceTI2, targetTI, nil) diff --git a/pkg/sqlmodel/row_change_test.go b/pkg/sqlmodel/row_change_test.go index 8c4d6ad16d2..47bf0ebfe87 100644 --- a/pkg/sqlmodel/row_change_test.go +++ b/pkg/sqlmodel/row_change_test.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/charset" timodel "github.com/pingcap/tidb/pkg/parser/model" timock "github.com/pingcap/tidb/pkg/util/mock" - "github.com/pingcap/tiflow/cdc/model" cdcmodel "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/pkg/utils" @@ -353,7 +352,7 @@ func TestGenInsert(t *testing.T) { for _, c := range cases { sourceTI := mockTableInfo(t, c.sourceCreateSQL) targetTI := mockTableInfo(t, c.targetCreateSQL) - sourceTI = model.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI) + sourceTI = cdcmodel.BuildTiDBTableInfoWithoutVirtualColumns(sourceTI) change := NewRowChange(source, target, nil, c.postValues, sourceTI, targetTI, nil) sql, args := change.GenSQL(DMLInsert) require.Equal(t, c.expectedInsertSQL, sql) diff --git a/tests/integration_tests/cdc/dailytest/case.go b/tests/integration_tests/cdc/dailytest/case.go index 3dd44770ad7..22d2ed8528f 100644 --- a/tests/integration_tests/cdc/dailytest/case.go +++ b/tests/integration_tests/cdc/dailytest/case.go @@ -437,6 +437,7 @@ func updatePKUK(db *sql.DB, opNum int) error { uk := rand.Intn(maxKey) v := rand.Intn(10000) sql = fmt.Sprintf("insert into pkuk(pk, uk, v) values(%d,%d,%d)", pk, uk, v) + log.S().Info(sql) case 1: if len(pks) == 0 || len(pks) == maxKey { continue @@ -446,12 +447,14 @@ func updatePKUK(db *sql.DB, opNum int) error { uk := rand.Intn(maxKey) v := rand.Intn(10000) sql = fmt.Sprintf("update pkuk set pk = %d, uk = %d, v = %d where pk = %d", pk, uk, v, oldPK) + log.S().Info(sql) case 2: if len(pks) == 0 { continue } oldPK = genOldPk() sql = fmt.Sprintf("delete from pkuk where pk = %d", oldPK) + log.S().Info(sql) } _, err := db.Exec(sql) diff --git a/tests/integration_tests/cdc/run.sh b/tests/integration_tests/cdc/run.sh index 8077d309dda..6a97eca8863 100755 --- a/tests/integration_tests/cdc/run.sh +++ b/tests/integration_tests/cdc/run.sh @@ -20,14 +20,16 @@ function prepare() { TOPIC_NAME="ticdc-cdc-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + # kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=1&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --sink-uri="$SINK_URI" case $SINK_TYPE in - kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + # kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=1&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;; esac