Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

model(cdc): add a function to recover TiDB TableInfo #7480

Merged
merged 4 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,27 @@
package entry

import (
"bytes"
"context"
"strings"
"testing"
"time"

"github.com/pingcap/log"
ticonfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/executor"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
pfilter "github.com/pingcap/tiflow/pkg/filter"
Expand Down Expand Up @@ -1087,3 +1094,72 @@ func TestDecodeEventIgnoreRow(t *testing.T) {
decodeAndCheckRowInTable(tableInfo.ID, toRawKV)
}
}

// TestBuildTableInfo parses each CREATE TABLE statement, converts the resulting
// TableInfo into CDC columns via datum2Column, rebuilds a TiDB TableInfo with
// model.BuildTiDBTableInfo, and compares the rendered SHOW CREATE TABLE text
// with the expected one.
func TestBuildTableInfo(t *testing.T) {
	type roundTripCase struct {
		// origin is the upstream DDL fed to the parser.
		origin string
		// recovered is the SHOW CREATE TABLE text expected from the rebuilt table.
		recovered string
	}

	testCases := []roundTripCase{
		{
			origin: "CREATE TABLE t1 (c INT PRIMARY KEY)",
			recovered: "CREATE TABLE `BuildTiDBTableInfo` (\n" +
				" `c` int(0) NOT NULL,\n" +
				" PRIMARY KEY (`c`(0)) /*T![clustered_index] CLUSTERED */\n" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
		},
		{
			origin: "CREATE TABLE t1 (" +
				" c INT UNSIGNED," +
				" c2 VARCHAR(10) NOT NULL," +
				" c3 BIT(10) NOT NULL," +
				" UNIQUE KEY (c2, c3)" +
				")",
			// CDC discards field length.
			recovered: "CREATE TABLE `BuildTiDBTableInfo` (\n" +
				" `c` int(0) unsigned DEFAULT NULL,\n" +
				" `c2` varchar(0) NOT NULL,\n" +
				" `c3` bit(0) NOT NULL,\n" +
				" UNIQUE KEY `idx_0` (`c2`(0),`c3`(0))\n" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
		},
		{
			origin: "CREATE TABLE t1 (" +
				" c INT UNSIGNED," +
				" gen INT AS (c+1) VIRTUAL," +
				" c2 VARCHAR(10) NOT NULL," +
				" gen2 INT AS (c+2) STORED," +
				" c3 BIT(10) NOT NULL," +
				" PRIMARY KEY (c, c2)" +
				")",
			// CDC discards virtual generated column, and generating expression of stored generated column.
			recovered: "CREATE TABLE `BuildTiDBTableInfo` (\n" +
				" `c` int(0) unsigned NOT NULL,\n" +
				" `c2` varchar(0) NOT NULL,\n" +
				" `gen2` int(0) GENERATED ALWAYS AS (pass_generated_check) STORED,\n" +
				" `c3` bit(0) NOT NULL,\n" +
				" PRIMARY KEY (`c`(0),`c2`(0)) /*T![clustered_index] CLUSTERED */\n" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
		},
	}

	sqlParser := parser.New()
	for i := range testCases {
		tc := testCases[i]

		parsed, err := sqlParser.ParseOneStmt(tc.origin, "", "")
		require.NoError(t, err)

		createStmt := parsed.(*ast.CreateTableStmt)
		upstreamInfo, err := ddl.BuildTableInfoFromAST(createStmt)
		require.NoError(t, err)

		wrapped := model.WrapTableInfo(0, "test", 0, upstreamInfo)
		// No row data is supplied; the trailing `true` argument is passed
		// through to datum2Column unchanged from the original test.
		columns, _, err := datum2Column(wrapped, map[int64]types.Datum{}, true)
		require.NoError(t, err)

		rebuilt := model.BuildTiDBTableInfo(columns, wrapped.IndexColumnsOffset)
		require.Equal(t, tc.recovered, showCreateTable(t, rebuilt))
	}
}

// tiCtx is the mock context handed to executor.ConstructResultOfShowCreateTable
// by showCreateTable.
var tiCtx = mock.NewContext()

// showCreateTable renders ti as SHOW CREATE TABLE text, failing the test
// immediately if the executor reports an error.
func showCreateTable(t *testing.T, ti *timodel.TableInfo) string {
	out := new(bytes.Buffer)
	out.Grow(512)

	require.NoError(t, executor.ConstructResultOfShowCreateTable(tiCtx, ti, autoid.Allocators{}, out))
	return out.String()
}
2 changes: 1 addition & 1 deletion cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (ti *TableInfo) initColumnsFlag() {
// See https://dev.mysql.com/doc/refman/5.7/en/show-columns.html.
// Yet if an index has multiple columns, we would like to easily determine that all those columns are indexed,
// which is crucial for the completeness of the information we pass to the downstream.
// Therefore, instead of using the MySql standard,
// Therefore, instead of using the MySQL standard,
// we made our own decision to mark all columns in an index with the appropriate flag(s).
for _, idxInfo := range ti.Indices {
for _, idxCol := range idxInfo.Columns {
Expand Down
92 changes: 92 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/pkg/quotes"
"github.com/pingcap/tiflow/pkg/util"
Expand Down Expand Up @@ -442,6 +443,97 @@ type RedoColumn struct {
Flag uint64 `msg:"flag"`
}

// BuildTiDBTableInfo builds a TiDB TableInfo from given information.
func BuildTiDBTableInfo(columns []*Column, indexColumns [][]int) *model.TableInfo {
ret := &model.TableInfo{}
// nowhere will use this field, so we set a debug message
ret.Name = model.NewCIStr("BuildTiDBTableInfo")

for i, col := range columns {
columnInfo := &model.ColumnInfo{
Offset: i,
State: model.StatePublic,
}
if col == nil {
// by referring to datum2Column, nil is happened when
// - !IsColCDCVisible, which means the column is a virtual generated
// column
// - !exist && !fillWithDefaultValue, which means upstream does not
// send the column value
// just mock for the first case
columnInfo.Name = model.NewCIStr("omitted")
columnInfo.GeneratedExprString = "pass_generated_check"
columnInfo.GeneratedStored = false
ret.Columns = append(ret.Columns, columnInfo)
continue
}
columnInfo.Name = model.NewCIStr(col.Name)
columnInfo.SetType(col.Type)
// TiKV always use utf8mb4 to store, and collation is not recorded by CDC
columnInfo.SetCharset(mysql.UTF8MB4Charset)
columnInfo.SetCollate(mysql.UTF8MB4DefaultCollation)

// inverse initColumnsFlag
flag := col.Flag
if flag.IsBinary() {
columnInfo.SetCharset("binary")
}
if flag.IsGeneratedColumn() {
// we do not use this field, so we set it to any non-empty string
columnInfo.GeneratedExprString = "pass_generated_check"
columnInfo.GeneratedStored = true
}
if flag.IsHandleKey() {
columnInfo.AddFlag(mysql.PriKeyFlag)
ret.IsCommonHandle = true
} else if flag.IsPrimaryKey() {
columnInfo.AddFlag(mysql.PriKeyFlag)
}
if flag.IsUniqueKey() {
columnInfo.AddFlag(mysql.UniqueKeyFlag)
}
if !flag.IsNullable() {
columnInfo.AddFlag(mysql.NotNullFlag)
}
if flag.IsMultipleKey() {
columnInfo.AddFlag(mysql.MultipleKeyFlag)
}
if flag.IsUnsigned() {
columnInfo.AddFlag(mysql.UnsignedFlag)
}
ret.Columns = append(ret.Columns, columnInfo)
}

for i, colOffsets := range indexColumns {
indexInfo := &model.IndexInfo{
Name: model.NewCIStr(fmt.Sprintf("idx_%d", i)),
State: model.StatePublic,
}
firstCol := columns[colOffsets[0]]
if firstCol.Flag.IsPrimaryKey() {
indexInfo.Primary = true
}
if firstCol.Flag.IsUniqueKey() {
indexInfo.Unique = true
}

for _, offset := range colOffsets {
col := ret.Columns[offset]

indexCol := &model.IndexColumn{}
indexCol.Name = col.Name
indexCol.Offset = offset
indexInfo.Columns = append(indexInfo.Columns, indexCol)
}

// TODO: revert the "all column set index related flag" to "only the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this TODO item need to be done in this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I'll test if this reversion is really needed in future PR

// first column set index related flag" if needed

ret.Indices = append(ret.Indices, indexInfo)
}
return ret
}

// ColumnValueString returns the string representation of the column value
func ColumnValueString(c interface{}) string {
var data string
Expand Down