diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index c6081656078..d4f072cee70 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -14,6 +14,7 @@ package entry import ( + "bytes" "context" "strings" "testing" @@ -21,13 +22,19 @@ import ( "github.com/pingcap/log" ticonfig "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/executor" tidbkv "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/ast" timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" pfilter "github.com/pingcap/tiflow/pkg/filter" @@ -1087,3 +1094,72 @@ func TestDecodeEventIgnoreRow(t *testing.T) { decodeAndCheckRowInTable(tableInfo.ID, toRawKV) } } + +func TestBuildTableInfo(t *testing.T) { + cases := []struct { + origin string + recovered string + }{ + { + "CREATE TABLE t1 (c INT PRIMARY KEY)", + "CREATE TABLE `BuildTiDBTableInfo` (\n" + + " `c` int(0) NOT NULL,\n" + + " PRIMARY KEY (`c`(0)) /*T![clustered_index] CLUSTERED */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + }, + { + "CREATE TABLE t1 (" + + " c INT UNSIGNED," + + " c2 VARCHAR(10) NOT NULL," + + " c3 BIT(10) NOT NULL," + + " UNIQUE KEY (c2, c3)" + + ")", + // CDC discards field length. + "CREATE TABLE `BuildTiDBTableInfo` (\n" + + " `c` int(0) unsigned DEFAULT NULL,\n" + + " `c2` varchar(0) NOT NULL,\n" + + " `c3` bit(0) NOT NULL,\n" + + " UNIQUE KEY `idx_0` (`c2`(0),`c3`(0))\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + }, + { + "CREATE TABLE t1 (" + + " c INT UNSIGNED," + + " gen INT AS (c+1) VIRTUAL," + + " c2 VARCHAR(10) NOT NULL," + + " gen2 INT AS (c+2) STORED," + + " c3 BIT(10) NOT NULL," + + " PRIMARY KEY (c, c2)" + + ")", + // CDC discards virtual generated column, and generating expression of stored generated column. + "CREATE TABLE `BuildTiDBTableInfo` (\n" + + " `c` int(0) unsigned NOT NULL,\n" + + " `c2` varchar(0) NOT NULL,\n" + + " `gen2` int(0) GENERATED ALWAYS AS (pass_generated_check) STORED,\n" + + " `c3` bit(0) NOT NULL,\n" + + " PRIMARY KEY (`c`(0),`c2`(0)) /*T![clustered_index] CLUSTERED */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + }, + } + p := parser.New() + for _, c := range cases { + stmt, err := p.ParseOneStmt(c.origin, "", "") + require.NoError(t, err) + originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt)) + require.NoError(t, err) + cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI) + cols, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true) + require.NoError(t, err) + recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset) + require.Equal(t, c.recovered, showCreateTable(t, recoveredTI)) + } +} + +var tiCtx = mock.NewContext() + +func showCreateTable(t *testing.T, ti *timodel.TableInfo) string { + result := bytes.NewBuffer(make([]byte, 0, 512)) + err := executor.ConstructResultOfShowCreateTable(tiCtx, ti, autoid.Allocators{}, result) + require.NoError(t, err) + return result.String() +} diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 0a29f0486a2..1a66a05d18f 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -207,7 +207,7 @@ func (ti *TableInfo) initColumnsFlag() { // See https://dev.mysql.com/doc/refman/5.7/en/show-columns.html. // Yet if an index has multiple columns, we would like to easily determine that all those columns are indexed, // which is crucial for the completeness of the information we pass to the downstream. - // Therefore, instead of using the MySql standard, + // Therefore, instead of using the MySQL standard, // we made our own decision to mark all columns in an index with the appropriate flag(s). for _, idxInfo := range ti.Indices { for _, idxCol := range idxInfo.Columns { diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 24bd1318ec7..291a464e1fa 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tiflow/pkg/quotes" "github.com/pingcap/tiflow/pkg/util" @@ -442,6 +443,97 @@ type RedoColumn struct { Flag uint64 `msg:"flag"` } +// BuildTiDBTableInfo builds a TiDB TableInfo from given information. +func BuildTiDBTableInfo(columns []*Column, indexColumns [][]int) *model.TableInfo { + ret := &model.TableInfo{} + // nowhere will use this field, so we set a debug message + ret.Name = model.NewCIStr("BuildTiDBTableInfo") + + for i, col := range columns { + columnInfo := &model.ColumnInfo{ + Offset: i, + State: model.StatePublic, + } + if col == nil { + // by referring to datum2Column, nil is happened when + // - !IsColCDCVisible, which means the column is a virtual generated + // column + // - !exist && !fillWithDefaultValue, which means upstream does not + // send the column value + // just mock for the first case + columnInfo.Name = model.NewCIStr("omitted") + columnInfo.GeneratedExprString = "pass_generated_check" + columnInfo.GeneratedStored = false + ret.Columns = append(ret.Columns, columnInfo) + continue + } + columnInfo.Name = model.NewCIStr(col.Name) + columnInfo.SetType(col.Type) + // TiKV always use utf8mb4 to store, and collation is not recorded by CDC + columnInfo.SetCharset(mysql.UTF8MB4Charset) + columnInfo.SetCollate(mysql.UTF8MB4DefaultCollation) + + // inverse initColumnsFlag + flag := col.Flag + if flag.IsBinary() { + columnInfo.SetCharset("binary") + } + if flag.IsGeneratedColumn() { + // we do not use this field, so we set it to any non-empty string + columnInfo.GeneratedExprString = "pass_generated_check" + columnInfo.GeneratedStored = true + } + if flag.IsHandleKey() { + columnInfo.AddFlag(mysql.PriKeyFlag) + ret.IsCommonHandle = true + } else if flag.IsPrimaryKey() { + columnInfo.AddFlag(mysql.PriKeyFlag) + } + if flag.IsUniqueKey() { + columnInfo.AddFlag(mysql.UniqueKeyFlag) + } + if !flag.IsNullable() { + columnInfo.AddFlag(mysql.NotNullFlag) + } + if flag.IsMultipleKey() { + columnInfo.AddFlag(mysql.MultipleKeyFlag) + } + if flag.IsUnsigned() { + columnInfo.AddFlag(mysql.UnsignedFlag) + } + ret.Columns = append(ret.Columns, columnInfo) + } + + for i, colOffsets := range indexColumns { + indexInfo := &model.IndexInfo{ + Name: model.NewCIStr(fmt.Sprintf("idx_%d", i)), + State: model.StatePublic, + } + firstCol := columns[colOffsets[0]] + if firstCol.Flag.IsPrimaryKey() { + indexInfo.Primary = true + } + if firstCol.Flag.IsUniqueKey() { + indexInfo.Unique = true + } + + for _, offset := range colOffsets { + col := ret.Columns[offset] + + indexCol := &model.IndexColumn{} + indexCol.Name = col.Name + indexCol.Offset = offset + indexInfo.Columns = append(indexInfo.Columns, indexCol) + } + + // TODO: revert the "all column set index related flag" to "only the + // first column set index related flag" if needed + + ret.Indices = append(ret.Indices, indexInfo) + } + return ret +} + // ColumnValueString returns the string representation of the column value func ColumnValueString(c interface{}) string { var data string