pkg(both): a separate library for DML row change #4376

Merged
merged 13 commits on Feb 8, 2022
4 changes: 3 additions & 1 deletion dm/pkg/log/log.go
@@ -104,6 +104,7 @@ var (

// InitLogger initializes DM's and also the TiDB library's loggers.
func InitLogger(cfg *Config) error {
inDev := strings.ToLower(cfg.Level) == "debug"
// init DM logger
logger, props, err := pclog.InitLogger(&pclog.Config{
Level: cfg.Level,
@@ -114,6 +115,7 @@ func InitLogger(cfg *Config) error {
MaxDays: cfg.FileMaxDays,
MaxBackups: cfg.FileMaxBackups,
},
Development: inDev,
})
if err != nil {
return terror.ErrInitLoggerFail.Delegate(err)
@@ -125,7 +127,7 @@ func InitLogger(cfg *Config) error {
appLevel = props.Level
appProps = props
// init and set tidb slow query logger to stdout if log level is debug
if cfg.Level == "debug" {
if inDev {
slowQueryLogger := zap.NewExample()
slowQueryLogger = slowQueryLogger.With(zap.String("component", "slow query logger"))
logutil.SlowQueryLogger = slowQueryLogger
25 changes: 15 additions & 10 deletions dm/pkg/schema/tracker.go
@@ -403,7 +403,7 @@ func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string
return nil, err
}

dti = GetDownStreamTi(ti, originTi)
dti = GetDownStreamTI(ti, originTi)
tr.dsTracker.tableInfos[tableID] = dti
}
return dti, nil
@@ -412,17 +412,22 @@ func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string
// GetAvailableDownStreamUKIndexInfo gets available downstream UK whose data is not null.
// note. this function will not init downstreamTrack.
func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []interface{}) *model.IndexInfo {
dti, ok := tr.dsTracker.tableInfos[tableID]
dti := tr.dsTracker.tableInfos[tableID]

return GetIdentityUKByData(dti, data)
}

if !ok || len(dti.AvailableUKIndexList) == 0 {
// GetIdentityUKByData gets available downstream UK whose data is not null.
func GetIdentityUKByData(downstreamTI *DownstreamTableInfo, data []interface{}) *model.IndexInfo {
if downstreamTI == nil || len(downstreamTI.AvailableUKIndexList) == 0 {
return nil
}
// func for check data is not null
fn := func(i int) bool {
return data[i] != nil
}

for _, uk := range dti.AvailableUKIndexList {
for _, uk := range downstreamTI.AvailableUKIndexList {
// check uk's column data is not null
if isSpecifiedIndexColumn(uk, fn) {
return uk
@@ -499,8 +504,8 @@ func (tr *Tracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error
return nil
}

// GetDownStreamTi constructs downstreamTable index cache by tableinfo.
func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo {
// GetDownStreamTI constructs downstreamTable index cache by tableinfo.
func GetDownStreamTI(downstreamTI *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo {
var (
absoluteUKIndexInfo *model.IndexInfo
availableUKIndexList = []*model.IndexInfo{}
@@ -510,10 +515,10 @@ func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *Downstream

// func for check not null constraint
fn := func(i int) bool {
return mysql.HasNotNullFlag(ti.Columns[i].Flag)
return mysql.HasNotNullFlag(downstreamTI.Columns[i].Flag)
}

for i, idx := range ti.Indices {
for i, idx := range downstreamTI.Indices {
if !idx.Primary && !idx.Unique {
continue
}
@@ -536,7 +541,7 @@ func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *Downstream
// handle pk exceptional case.
// e.g. "create table t(a int primary key, b int)".
if !hasPk {
exPk := redirectIndexKeys(handlePkExCase(ti), originTi)
exPk := redirectIndexKeys(handlePkExCase(downstreamTI), originTi)
if exPk != nil {
absoluteUKIndexInfo = exPk
absoluteUKPosition = len(availableUKIndexList)
@@ -550,7 +555,7 @@ func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *Downstream
}

return &DownstreamTableInfo{
TableInfo: ti,
TableInfo: downstreamTI,
AbsoluteUKIndexInfo: absoluteUKIndexInfo,
AvailableUKIndexList: availableUKIndexList,
}
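For reference, the selection that the new GetIdentityUKByData performs (together with isSpecifiedIndexColumn) is simply "return the first available PK/UK whose columns are all non-NULL in this row". A minimal standalone sketch of that logic, using simplified stand-in types rather than the real model.IndexInfo and DownstreamTableInfo:

package main

import "fmt"

// index is a simplified stand-in for model.IndexInfo: it only records the
// offsets of the columns that form a unique key.
type index struct {
	name    string
	offsets []int
}

// firstNonNullUK mirrors the check done by GetIdentityUKByData and
// isSpecifiedIndexColumn: scan the available UK list in order and return the
// first index whose columns are all non-NULL in the given row.
func firstNonNullUK(uks []index, row []interface{}) *index {
	for i := range uks {
		allNotNull := true
		for _, off := range uks[i].offsets {
			if row[off] == nil {
				allNotNull = false
				break
			}
		}
		if allNotNull {
			return &uks[i]
		}
	}
	return nil
}

func main() {
	uks := []index{
		{name: "uk_a", offsets: []int{0}},
		{name: "uk_b_c", offsets: []int{1, 2}},
	}
	row := []interface{}{nil, "x", 3} // column a is NULL in this row
	if uk := firstNonNullUK(uks, row); uk != nil {
		fmt.Println("identity UK:", uk.name) // prints "identity UK: uk_b_c"
	}
}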
7 changes: 7 additions & 0 deletions dm/pkg/utils/common.go
@@ -37,6 +37,10 @@ import (
"github.com/pingcap/tiflow/dm/pkg/terror"
)

func init() {
ZeroSessionCtx = NewSessionCtx(nil)
}

// TrimCtrlChars returns a slice of the string s with all leading
// and trailing control characters removed.
func TrimCtrlChars(s string) string {
@@ -322,6 +326,9 @@ func (se *session) GetBuiltinFunctionUsage() map[string]uint32 {
return se.builtinFunctionUsage
}

// ZeroSessionCtx is used when the session variables are not important.
var ZeroSessionCtx sessionctx.Context

// NewSessionCtx return a session context with specified session variables.
func NewSessionCtx(vars map[string]string) sessionctx.Context {
variables := variable.NewSessionVars()
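The init-time package-level default spares callers that do not care about session variables from constructing a fresh context each time. A tiny self-contained sketch of the same pattern; the types here are hypothetical stand-ins for sessionctx.Context and NewSessionCtx:

package main

import "fmt"

// sessionCtx is a hypothetical stand-in for sessionctx.Context.
type sessionCtx struct{ vars map[string]string }

func newSessionCtx(vars map[string]string) *sessionCtx {
	return &sessionCtx{vars: vars}
}

// zeroSessionCtx plays the role of ZeroSessionCtx: a shared instance built
// once in init(), for callers that do not care about session variables.
var zeroSessionCtx *sessionCtx

func init() {
	zeroSessionCtx = newSessionCtx(nil)
}

func main() {
	fmt.Println(zeroSessionCtx.vars == nil) // true: no session variables set
}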
4 changes: 2 additions & 2 deletions dm/syncer/causality_test.go
@@ -83,7 +83,7 @@ func (s *testSyncerSuite) TestCasuality(c *C) {
Length: types.UnspecifiedLength,
}},
}
downTi := schema.GetDownStreamTi(ti, ti)
downTi := schema.GetDownStreamTI(ti, ti)
c.Assert(downTi, NotNil)

jobCh := make(chan *job, 10)
@@ -152,7 +152,7 @@ func (s *testSyncerSuite) TestCasualityWithPrefixIndex(c *C) {
schemaStr := "create table t (c1 text, c2 int unique, unique key c1(c1(3)));"
ti, err := createTableInfo(p, se, int64(0), schemaStr)
c.Assert(err, IsNil)
downTi := schema.GetDownStreamTi(ti, ti)
downTi := schema.GetDownStreamTI(ti, ti)
c.Assert(downTi, NotNil)
c.Assert(len(downTi.AvailableUKIndexList) == 2, IsTrue)
tiIndex := downTi.AvailableUKIndexList[0]
4 changes: 2 additions & 2 deletions dm/syncer/compactor_test.go
@@ -91,7 +91,7 @@ func (s *testSyncerSuite) TestCompactJob(c *C) {
Length: types.UnspecifiedLength,
}},
}
downTi := schema.GetDownStreamTi(ti, ti)
downTi := schema.GetDownStreamTI(ti, ti)
c.Assert(downTi, NotNil)

var dml *DML
@@ -208,7 +208,7 @@ func (s *testSyncerSuite) TestCompactorSafeMode(c *C) {
Length: types.UnspecifiedLength,
}},
}
downTi := schema.GetDownStreamTi(ti, ti)
downTi := schema.GetDownStreamTI(ti, ti)
c.Assert(downTi, NotNil)

testCases := []struct {
4 changes: 2 additions & 2 deletions dm/syncer/dml_test.go
@@ -224,7 +224,7 @@ func (s *testSyncerSuite) TestGenMultipleKeys(c *C) {

ti, err := createTableInfo(p, se, int64(i+1), tc.schema)
assert(err, IsNil)
dti := schema.GetDownStreamTi(ti, ti)
dti := schema.GetDownStreamTI(ti, ti)
assert(dti, NotNil)
keys := genMultipleKeys(sessCtx, dti, ti, tc.values, "table")
assert(keys, DeepEquals, tc.keys)
@@ -619,7 +619,7 @@ func (s *testSyncerSuite) TestTruncateIndexValues(c *C) {
}
ti, err := createTableInfo(p, se, int64(i+1), tc.schema)
assert(err, IsNil)
dti := schema.GetDownStreamTi(ti, ti)
dti := schema.GetDownStreamTI(ti, ti)
assert(dti, NotNil)
assert(dti.AvailableUKIndexList, NotNil)
cols := make([]*model.ColumnInfo, 0, len(dti.AvailableUKIndexList[0].Columns))
4 changes: 3 additions & 1 deletion dm/syncer/syncer.go
@@ -871,7 +871,9 @@ func (s *Syncer) updateReplicationLagMetric() {
func (s *Syncer) saveTablePoint(table *filter.Table, location binlog.Location) {
ti, err := s.schemaTracker.GetTableInfo(table)
if err != nil && table.Name != "" {
s.tctx.L().DPanic("table info missing from schema tracker",
// TODO: if we RENAME tb1 TO tb2, the tracker will remove the TableInfo of tb1 but we still save the table
// checkpoint for tb1. We can delete that table checkpoint in the future.
s.tctx.L().Warn("table info missing from schema tracker",
zap.Stringer("table", table),
zap.Stringer("location", location),
zap.Error(err))
166 changes: 166 additions & 0 deletions pkg/sqlmodel/causality.go
@@ -0,0 +1,166 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sqlmodel

import (
"fmt"
"strconv"
"strings"

timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/tablecodec"
"go.uber.org/zap"

"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/utils"
)

// CausalityKeys returns the string representations of all causality keys. If two
// row changes have the same causality key, they must be replicated sequentially.
func (r *RowChange) CausalityKeys() []string {
r.lazyInitIdentityInfo()

ret := make([]string, 0, 1)
if r.preValues != nil {
ret = append(ret, r.getCausalityString(r.preValues)...)
}
if r.postValues != nil {
ret = append(ret, r.getCausalityString(r.postValues)...)
}
return ret
}

func columnValue2String(value interface{}) string {
var data string
switch v := value.(type) {
case nil:
data = "null"
case bool:
if v {
data = "1"
} else {
data = "0"
}
case int:
data = strconv.FormatInt(int64(v), 10)
case int8:
data = strconv.FormatInt(int64(v), 10)
case int16:
data = strconv.FormatInt(int64(v), 10)
case int32:
data = strconv.FormatInt(int64(v), 10)
case int64:
data = strconv.FormatInt(v, 10)
case uint8:
data = strconv.FormatUint(uint64(v), 10)
case uint16:
data = strconv.FormatUint(uint64(v), 10)
case uint32:
data = strconv.FormatUint(uint64(v), 10)
case uint64:
data = strconv.FormatUint(v, 10)
case float32:
data = strconv.FormatFloat(float64(v), 'f', -1, 32)
case float64:
data = strconv.FormatFloat(v, 'f', -1, 64)
case string:
data = v
case []byte:
data = string(v)
default:
data = fmt.Sprintf("%v", v)
}

return data
}

func genKeyString(
table string,
columns []*timodel.ColumnInfo,
values []interface{},
) string {
var buf strings.Builder
for i, data := range values {
if data == nil {
log.L().Debug("ignore null value",
zap.String("column", columns[i].Name.O),
zap.String("table", table))
continue // ignore `null` value.
}
// one column's key looks like: `column_val.column_name.`
buf.WriteString(columnValue2String(data))
buf.WriteString(".")
buf.WriteString(columns[i].Name.L)
buf.WriteString(".")
}
if buf.Len() == 0 {
log.L().Debug("all value are nil, no key generated",
zap.String("table", table))
return "" // all values are `null`.
}
buf.WriteString(table)
return buf.String()
}

// truncateIndexValues truncates the values of prefix-index columns in data.
func truncateIndexValues(
ctx sessionctx.Context,
ti *timodel.TableInfo,
indexColumns *timodel.IndexInfo,
tiColumns []*timodel.ColumnInfo,
data []interface{},
) []interface{} {
values := make([]interface{}, 0, len(indexColumns.Columns))
datums, err := utils.AdjustBinaryProtocolForDatum(ctx, data, tiColumns)
if err != nil {
log.L().Warn("adjust binary protocol for datum error", zap.Error(err))
return data
}
tablecodec.TruncateIndexValues(ti, indexColumns, datums)
for _, datum := range datums {
values = append(values, datum.GetValue())
}
return values
}

func (r *RowChange) getCausalityString(values []interface{}) []string {
pkAndUks := r.identityInfo.AvailableUKIndexList
if len(pkAndUks) == 0 {
// the table has no PK/UK; all values of the row constitute the causality key
return []string{genKeyString(r.sourceTable.String(), r.sourceTableInfo.Columns, values)}
}

ret := make([]string, 0, len(pkAndUks))

for _, indexCols := range pkAndUks {
cols, vals := getColsAndValuesOfIdx(r.sourceTableInfo.Columns, indexCols, values)
// handle prefix index
truncVals := truncateIndexValues(r.tiSessionCtx, r.sourceTableInfo, indexCols, cols, vals)
key := genKeyString(r.sourceTable.String(), cols, truncVals)
if len(key) > 0 { // ignore `null` value.
ret = append(ret, key)
} else {
log.L().Debug("ignore empty key", zap.String("table", r.sourceTable.String()))
}
}

if len(ret) == 0 {
// all UK values are NULL (the no-PK/UK case already returned above), so fall
// back to using all values of the row as the causality key
return []string{genKeyString(r.sourceTable.String(), r.sourceTableInfo.Columns, values)}
}

return ret
}
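To make the key layout concrete: genKeyString emits "value.column_name." for every non-NULL column of the index and appends the source table string at the end. A standalone sketch of that format, simplified in that it uses fmt.Sprintf instead of columnValue2String and skips prefix-index truncation, with a hypothetical table and column names:

package main

import (
	"fmt"
	"strings"
)

// simplifiedKey mirrors the layout produced by genKeyString above: each
// non-NULL column contributes "value.column_name." and the source table
// string is appended at the end.
func simplifiedKey(table string, columns []string, values []interface{}) string {
	var buf strings.Builder
	for i, v := range values {
		if v == nil {
			continue // NULL columns are skipped, as in genKeyString
		}
		buf.WriteString(fmt.Sprintf("%v", v)) // the real code uses columnValue2String
		buf.WriteString(".")
		buf.WriteString(columns[i])
		buf.WriteString(".")
	}
	if buf.Len() == 0 {
		return "" // all values NULL: no key for this index
	}
	buf.WriteString(table)
	return buf.String()
}

func main() {
	// A row (id=1, name="foo") with a UK over (id, name) yields the
	// causality key "1.id.foo.name.`db`.`tbl`".
	fmt.Println(simplifiedKey("`db`.`tbl`", []string{"id", "name"}, []interface{}{1, "foo"}))
}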