diff --git a/dm/pkg/schema/tracker.go b/dm/pkg/schema/tracker.go index 3c15c3eb05f..7c0fef17949 100644 --- a/dm/pkg/schema/tracker.go +++ b/dm/pkg/schema/tracker.go @@ -387,7 +387,7 @@ func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string return nil, err } - dti = GetDownStreamTi(ti, originTi) + dti = GetDownStreamTI(ti, originTi) tr.dsTracker.tableInfos[tableID] = dti } return dti, nil @@ -396,9 +396,14 @@ func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string // GetAvailableDownStreamUKIndexInfo gets available downstream UK whose data is not null. // note. this function will not init downstreamTrack. func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []interface{}) *model.IndexInfo { - dti, ok := tr.dsTracker.tableInfos[tableID] + dti := tr.dsTracker.tableInfos[tableID] + + return GetIdentityUKByData(dti, data) +} - if !ok || len(dti.AvailableUKIndexList) == 0 { +// GetIdentityUKByData gets available downstream UK whose data is not null. +func GetIdentityUKByData(downstreamTI *DownstreamTableInfo, data []interface{}) *model.IndexInfo { + if downstreamTI == nil || len(downstreamTI.AvailableUKIndexList) == 0 { return nil } // func for check data is not null @@ -406,7 +411,7 @@ func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []inte return data[i] != nil } - for _, uk := range dti.AvailableUKIndexList { + for _, uk := range downstreamTI.AvailableUKIndexList { // check uk's column data is not null if isSpecifiedIndexColumn(uk, fn) { return uk @@ -483,8 +488,8 @@ func (tr *Tracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error return nil } -// GetDownStreamTi constructs downstreamTable index cache by tableinfo. -func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo { +// GetDownStreamTI constructs downstreamTable index cache by tableinfo. +func GetDownStreamTI(downstreamTI *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo { var ( absoluteUKIndexInfo *model.IndexInfo availableUKIndexList = []*model.IndexInfo{} @@ -494,10 +499,10 @@ func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *Downstream // func for check not null constraint fn := func(i int) bool { - return mysql.HasNotNullFlag(ti.Columns[i].Flag) + return mysql.HasNotNullFlag(downstreamTI.Columns[i].Flag) } - for i, idx := range ti.Indices { + for i, idx := range downstreamTI.Indices { if !idx.Primary && !idx.Unique { continue } @@ -520,7 +525,7 @@ func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *Downstream // handle pk exceptional case. // e.g. "create table t(a int primary key, b int)". if !hasPk { - exPk := redirectIndexKeys(handlePkExCase(ti), originTi) + exPk := redirectIndexKeys(handlePkExCase(downstreamTI), originTi) if exPk != nil { absoluteUKIndexInfo = exPk absoluteUKPosition = len(availableUKIndexList) @@ -534,7 +539,7 @@ func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *Downstream } return &DownstreamTableInfo{ - TableInfo: ti, + TableInfo: downstreamTI, AbsoluteUKIndexInfo: absoluteUKIndexInfo, AvailableUKIndexList: availableUKIndexList, } diff --git a/dm/pkg/utils/common.go b/dm/pkg/utils/common.go index 1718f2d3fb6..9c5ddfc7ada 100644 --- a/dm/pkg/utils/common.go +++ b/dm/pkg/utils/common.go @@ -325,6 +325,8 @@ func (se *session) GetBuiltinFunctionUsage() map[string]uint32 { // NewSessionCtx return a session context with specified session variables. func NewSessionCtx(vars map[string]string) sessionctx.Context { variables := variable.NewSessionVars() + variables.Rng.SetSeed1(0) + variables.Rng.SetSeed2(0) for k, v := range vars { _ = variables.SetSystemVar(k, v) if strings.EqualFold(k, "time_zone") { diff --git a/dm/syncer/causality_test.go b/dm/syncer/causality_test.go index 1354ca7a979..1109b8c1b80 100644 --- a/dm/syncer/causality_test.go +++ b/dm/syncer/causality_test.go @@ -83,7 +83,7 @@ func (s *testSyncerSuite) TestCasuality(c *C) { Length: types.UnspecifiedLength, }}, } - downTi := schema.GetDownStreamTi(ti, ti) + downTi := schema.GetDownStreamTI(ti, ti) c.Assert(downTi, NotNil) jobCh := make(chan *job, 10) @@ -152,7 +152,7 @@ func (s *testSyncerSuite) TestCasualityWithPrefixIndex(c *C) { schemaStr := "create table t (c1 text, c2 int unique, unique key c1(c1(3)));" ti, err := createTableInfo(p, se, int64(0), schemaStr) c.Assert(err, IsNil) - downTi := schema.GetDownStreamTi(ti, ti) + downTi := schema.GetDownStreamTI(ti, ti) c.Assert(downTi, NotNil) c.Assert(len(downTi.AvailableUKIndexList) == 2, IsTrue) tiIndex := downTi.AvailableUKIndexList[0] diff --git a/dm/syncer/compactor_test.go b/dm/syncer/compactor_test.go index 06ad3993791..506f9581a7f 100644 --- a/dm/syncer/compactor_test.go +++ b/dm/syncer/compactor_test.go @@ -91,7 +91,7 @@ func (s *testSyncerSuite) TestCompactJob(c *C) { Length: types.UnspecifiedLength, }}, } - downTi := schema.GetDownStreamTi(ti, ti) + downTi := schema.GetDownStreamTI(ti, ti) c.Assert(downTi, NotNil) var dml *DML @@ -208,7 +208,7 @@ func (s *testSyncerSuite) TestCompactorSafeMode(c *C) { Length: types.UnspecifiedLength, }}, } - downTi := schema.GetDownStreamTi(ti, ti) + downTi := schema.GetDownStreamTI(ti, ti) c.Assert(downTi, NotNil) testCases := []struct { diff --git a/dm/syncer/dml_test.go b/dm/syncer/dml_test.go index a7748abb4bf..148c2538eff 100644 --- a/dm/syncer/dml_test.go +++ b/dm/syncer/dml_test.go @@ -224,7 +224,7 @@ func (s *testSyncerSuite) TestGenMultipleKeys(c *C) { ti, err := createTableInfo(p, se, int64(i+1), tc.schema) assert(err, IsNil) - dti := schema.GetDownStreamTi(ti, ti) + dti := schema.GetDownStreamTI(ti, ti) assert(dti, NotNil) keys := genMultipleKeys(sessCtx, dti, ti, tc.values, "table") assert(keys, DeepEquals, tc.keys) @@ -619,7 +619,7 @@ func (s *testSyncerSuite) TestTruncateIndexValues(c *C) { } ti, err := createTableInfo(p, se, int64(i+1), tc.schema) assert(err, IsNil) - dti := schema.GetDownStreamTi(ti, ti) + dti := schema.GetDownStreamTI(ti, ti) assert(dti, NotNil) assert(dti.AvailableUKIndexList, NotNil) cols := make([]*model.ColumnInfo, 0, len(dti.AvailableUKIndexList[0].Columns)) diff --git a/pkg/sqlmodel/multivalue.go b/pkg/sqlmodel/multivalue.go index 48a639c3f2b..6d2046d632c 100644 --- a/pkg/sqlmodel/multivalue.go +++ b/pkg/sqlmodel/multivalue.go @@ -104,7 +104,7 @@ func GenDeleteSQL(changes ...*RowChange) (string, []interface{}) { zap.Int("len(whereValues)", len(whereValues)), zap.Int("len(whereColumns)", len(whereColumns)), zap.Any("whereValues", whereValues), - zap.Stringer("source table", change.sourceTable)) + zap.Stringer("sourceTable", change.sourceTable)) return "", nil } args = append(args, whereValues...) diff --git a/pkg/sqlmodel/reduce.go b/pkg/sqlmodel/reduce.go index 891dee04588..efc65162c72 100644 --- a/pkg/sqlmodel/reduce.go +++ b/pkg/sqlmodel/reduce.go @@ -121,7 +121,7 @@ func (r *RowChange) Reduce(preRowChange *RowChange) { func (r *RowChange) Split() (*RowChange, *RowChange) { if r.tp != RowChangeUpdate { log.L().DPanic("Split should only be called on RowChangeUpdate", - zap.Stringer("row change", r)) + zap.Stringer("rowChange", r)) return nil, nil } diff --git a/pkg/sqlmodel/reduce_test.go b/pkg/sqlmodel/reduce_test.go index 21eae1395e5..d0e2b11f38e 100644 --- a/pkg/sqlmodel/reduce_test.go +++ b/pkg/sqlmodel/reduce_test.go @@ -78,27 +78,39 @@ func TestReduce(t *testing.T) { }{ // INSERT + UPDATE { - nil, []interface{}{1, 2}, - []interface{}{1, 2}, []interface{}{3, 4}, - nil, []interface{}{3, 4}, + nil, + []interface{}{1, 2}, + []interface{}{1, 2}, + []interface{}{3, 4}, + nil, + []interface{}{3, 4}, }, // INSERT + DELETE { - nil, []interface{}{1, 2}, - []interface{}{1, 2}, nil, - []interface{}{1, 2}, nil, + nil, + []interface{}{1, 2}, + []interface{}{1, 2}, + nil, + []interface{}{1, 2}, + nil, }, // UPDATE + UPDATE { - []interface{}{1, 2}, []interface{}{1, 3}, - []interface{}{1, 3}, []interface{}{1, 4}, - []interface{}{1, 2}, []interface{}{1, 4}, + []interface{}{1, 2}, + []interface{}{1, 3}, + []interface{}{1, 3}, + []interface{}{1, 4}, + []interface{}{1, 2}, + []interface{}{1, 4}, }, // UPDATE + DELETE { - []interface{}{1, 2}, []interface{}{1, 3}, - []interface{}{1, 3}, nil, - []interface{}{1, 2}, nil, + []interface{}{1, 2}, + []interface{}{1, 3}, + []interface{}{1, 3}, + nil, + []interface{}{1, 2}, + nil, }, } diff --git a/pkg/sqlmodel/row_change.go b/pkg/sqlmodel/row_change.go index c69974c5575..e1aaadde33b 100644 --- a/pkg/sqlmodel/row_change.go +++ b/pkg/sqlmodel/row_change.go @@ -98,15 +98,15 @@ func NewRowChange( if preValues != nil && len(preValues) != len(sourceTableInfo.Columns) { log.L().DPanic("preValues length not equal to sourceTableInfo columns", - zap.Int("preValues length", len(preValues)), - zap.Int("sourceTableInfo length", len(sourceTableInfo.Columns)), - zap.Stringer("source table", sourceTable)) + zap.Int("preValues", len(preValues)), + zap.Int("sourceTableInfo", len(sourceTableInfo.Columns)), + zap.Stringer("sourceTable", sourceTable)) } if postValues != nil && len(postValues) != len(sourceTableInfo.Columns) { log.L().DPanic("postValues length not equal to sourceTableInfo columns", - zap.Int("postValues length", len(postValues)), - zap.Int("sourceTableInfo length", len(sourceTableInfo.Columns)), - zap.Stringer("source table", sourceTable)) + zap.Int("postValues", len(postValues)), + zap.Int("sourceTableInfo", len(sourceTableInfo.Columns)), + zap.Stringer("sourceTable", sourceTable)) } if targetTable != nil { @@ -254,8 +254,8 @@ func valuesHolder(n int) string { func (r *RowChange) genDeleteSQL() (string, []interface{}) { if r.tp != RowChangeDelete && r.tp != RowChangeUpdate { log.L().DPanic("illegal type for genDeleteSQL", - zap.String("source table", r.sourceTable.String()), - zap.Stringer("change type", r.tp)) + zap.String("sourceTable", r.sourceTable.String()), + zap.Stringer("changeType", r.tp)) return "", nil } @@ -282,8 +282,8 @@ func isGenerated(columns []*timodel.ColumnInfo, name timodel.CIStr) bool { func (r *RowChange) genUpdateSQL() (string, []interface{}) { if r.tp != RowChangeUpdate { log.L().DPanic("illegal type for genUpdateSQL", - zap.String("source table", r.sourceTable.String()), - zap.Stringer("change type", r.tp)) + zap.String("sourceTable", r.sourceTable.String()), + zap.Stringer("changeType", r.tp)) return "", nil } @@ -361,7 +361,7 @@ func (r *RowChange) GenSQL(tp DMLType) (string, []interface{}) { return r.genDeleteSQL() } log.L().DPanic("illegal type for GenSQL", - zap.String("source table", r.sourceTable.String()), - zap.Stringer("DML type", tp)) + zap.String("sourceTable", r.sourceTable.String()), + zap.Stringer("DMLType", tp)) return "", nil }