Skip to content

Commit

Permalink
fix UT
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Jan 18, 2022
1 parent a15e9ec commit 32afbc7
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 42 deletions.
25 changes: 15 additions & 10 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string
return nil, err
}

dti = GetDownStreamTi(ti, originTi)
dti = GetDownStreamTI(ti, originTi)
tr.dsTracker.tableInfos[tableID] = dti
}
return dti, nil
Expand All @@ -396,17 +396,22 @@ func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string
// GetAvailableDownStreamUKIndexInfo gets available downstream UK whose data is not null.
// note. this function will not init downstreamTrack.
func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []interface{}) *model.IndexInfo {
dti, ok := tr.dsTracker.tableInfos[tableID]
dti := tr.dsTracker.tableInfos[tableID]

return GetIdentityUKByData(dti, data)
}

if !ok || len(dti.AvailableUKIndexList) == 0 {
// GetIdentityUKByData gets available downstream UK whose data is not null.
func GetIdentityUKByData(downstreamTI *DownstreamTableInfo, data []interface{}) *model.IndexInfo {
if downstreamTI == nil || len(downstreamTI.AvailableUKIndexList) == 0 {
return nil
}
// func for check data is not null
fn := func(i int) bool {
return data[i] != nil
}

for _, uk := range dti.AvailableUKIndexList {
for _, uk := range downstreamTI.AvailableUKIndexList {
// check uk's column data is not null
if isSpecifiedIndexColumn(uk, fn) {
return uk
Expand Down Expand Up @@ -483,8 +488,8 @@ func (tr *Tracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error
return nil
}

// GetDownStreamTi constructs downstreamTable index cache by tableinfo.
func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo {
// GetDownStreamTI constructs downstreamTable index cache by tableinfo.
func GetDownStreamTI(downstreamTI *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo {
var (
absoluteUKIndexInfo *model.IndexInfo
availableUKIndexList = []*model.IndexInfo{}
Expand All @@ -494,10 +499,10 @@ func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *Downstream

// func for check not null constraint
fn := func(i int) bool {
return mysql.HasNotNullFlag(ti.Columns[i].Flag)
return mysql.HasNotNullFlag(downstreamTI.Columns[i].Flag)
}

for i, idx := range ti.Indices {
for i, idx := range downstreamTI.Indices {
if !idx.Primary && !idx.Unique {
continue
}
Expand All @@ -520,7 +525,7 @@ func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *Downstream
// handle pk exceptional case.
// e.g. "create table t(a int primary key, b int)".
if !hasPk {
exPk := redirectIndexKeys(handlePkExCase(ti), originTi)
exPk := redirectIndexKeys(handlePkExCase(downstreamTI), originTi)
if exPk != nil {
absoluteUKIndexInfo = exPk
absoluteUKPosition = len(availableUKIndexList)
Expand All @@ -534,7 +539,7 @@ func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *Downstream
}

return &DownstreamTableInfo{
TableInfo: ti,
TableInfo: downstreamTI,
AbsoluteUKIndexInfo: absoluteUKIndexInfo,
AvailableUKIndexList: availableUKIndexList,
}
Expand Down
2 changes: 2 additions & 0 deletions dm/pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ func (se *session) GetBuiltinFunctionUsage() map[string]uint32 {
// NewSessionCtx return a session context with specified session variables.
func NewSessionCtx(vars map[string]string) sessionctx.Context {
variables := variable.NewSessionVars()
variables.Rng.SetSeed1(0)
variables.Rng.SetSeed2(0)
for k, v := range vars {
_ = variables.SetSystemVar(k, v)
if strings.EqualFold(k, "time_zone") {
Expand Down
4 changes: 2 additions & 2 deletions dm/syncer/causality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *testSyncerSuite) TestCasuality(c *C) {
Length: types.UnspecifiedLength,
}},
}
downTi := schema.GetDownStreamTi(ti, ti)
downTi := schema.GetDownStreamTI(ti, ti)
c.Assert(downTi, NotNil)

jobCh := make(chan *job, 10)
Expand Down Expand Up @@ -152,7 +152,7 @@ func (s *testSyncerSuite) TestCasualityWithPrefixIndex(c *C) {
schemaStr := "create table t (c1 text, c2 int unique, unique key c1(c1(3)));"
ti, err := createTableInfo(p, se, int64(0), schemaStr)
c.Assert(err, IsNil)
downTi := schema.GetDownStreamTi(ti, ti)
downTi := schema.GetDownStreamTI(ti, ti)
c.Assert(downTi, NotNil)
c.Assert(len(downTi.AvailableUKIndexList) == 2, IsTrue)
tiIndex := downTi.AvailableUKIndexList[0]
Expand Down
4 changes: 2 additions & 2 deletions dm/syncer/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *testSyncerSuite) TestCompactJob(c *C) {
Length: types.UnspecifiedLength,
}},
}
downTi := schema.GetDownStreamTi(ti, ti)
downTi := schema.GetDownStreamTI(ti, ti)
c.Assert(downTi, NotNil)

var dml *DML
Expand Down Expand Up @@ -208,7 +208,7 @@ func (s *testSyncerSuite) TestCompactorSafeMode(c *C) {
Length: types.UnspecifiedLength,
}},
}
downTi := schema.GetDownStreamTi(ti, ti)
downTi := schema.GetDownStreamTI(ti, ti)
c.Assert(downTi, NotNil)

testCases := []struct {
Expand Down
4 changes: 2 additions & 2 deletions dm/syncer/dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (s *testSyncerSuite) TestGenMultipleKeys(c *C) {

ti, err := createTableInfo(p, se, int64(i+1), tc.schema)
assert(err, IsNil)
dti := schema.GetDownStreamTi(ti, ti)
dti := schema.GetDownStreamTI(ti, ti)
assert(dti, NotNil)
keys := genMultipleKeys(sessCtx, dti, ti, tc.values, "table")
assert(keys, DeepEquals, tc.keys)
Expand Down Expand Up @@ -619,7 +619,7 @@ func (s *testSyncerSuite) TestTruncateIndexValues(c *C) {
}
ti, err := createTableInfo(p, se, int64(i+1), tc.schema)
assert(err, IsNil)
dti := schema.GetDownStreamTi(ti, ti)
dti := schema.GetDownStreamTI(ti, ti)
assert(dti, NotNil)
assert(dti.AvailableUKIndexList, NotNil)
cols := make([]*model.ColumnInfo, 0, len(dti.AvailableUKIndexList[0].Columns))
Expand Down
2 changes: 1 addition & 1 deletion pkg/sqlmodel/multivalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func GenDeleteSQL(changes ...*RowChange) (string, []interface{}) {
zap.Int("len(whereValues)", len(whereValues)),
zap.Int("len(whereColumns)", len(whereColumns)),
zap.Any("whereValues", whereValues),
zap.Stringer("source table", change.sourceTable))
zap.Stringer("sourceTable", change.sourceTable))
return "", nil
}
args = append(args, whereValues...)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sqlmodel/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (r *RowChange) Reduce(preRowChange *RowChange) {
func (r *RowChange) Split() (*RowChange, *RowChange) {
if r.tp != RowChangeUpdate {
log.L().DPanic("Split should only be called on RowChangeUpdate",
zap.Stringer("row change", r))
zap.Stringer("rowChange", r))
return nil, nil
}

Expand Down
36 changes: 24 additions & 12 deletions pkg/sqlmodel/reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,39 @@ func TestReduce(t *testing.T) {
}{
// INSERT + UPDATE
{
nil, []interface{}{1, 2},
[]interface{}{1, 2}, []interface{}{3, 4},
nil, []interface{}{3, 4},
nil,
[]interface{}{1, 2},
[]interface{}{1, 2},
[]interface{}{3, 4},
nil,
[]interface{}{3, 4},
},
// INSERT + DELETE
{
nil, []interface{}{1, 2},
[]interface{}{1, 2}, nil,
[]interface{}{1, 2}, nil,
nil,
[]interface{}{1, 2},
[]interface{}{1, 2},
nil,
[]interface{}{1, 2},
nil,
},
// UPDATE + UPDATE
{
[]interface{}{1, 2}, []interface{}{1, 3},
[]interface{}{1, 3}, []interface{}{1, 4},
[]interface{}{1, 2}, []interface{}{1, 4},
[]interface{}{1, 2},
[]interface{}{1, 3},
[]interface{}{1, 3},
[]interface{}{1, 4},
[]interface{}{1, 2},
[]interface{}{1, 4},
},
// UPDATE + DELETE
{
[]interface{}{1, 2}, []interface{}{1, 3},
[]interface{}{1, 3}, nil,
[]interface{}{1, 2}, nil,
[]interface{}{1, 2},
[]interface{}{1, 3},
[]interface{}{1, 3},
nil,
[]interface{}{1, 2},
nil,
},
}

Expand Down
24 changes: 12 additions & 12 deletions pkg/sqlmodel/row_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@ func NewRowChange(

if preValues != nil && len(preValues) != len(sourceTableInfo.Columns) {
log.L().DPanic("preValues length not equal to sourceTableInfo columns",
zap.Int("preValues length", len(preValues)),
zap.Int("sourceTableInfo length", len(sourceTableInfo.Columns)),
zap.Stringer("source table", sourceTable))
zap.Int("preValues", len(preValues)),
zap.Int("sourceTableInfo", len(sourceTableInfo.Columns)),
zap.Stringer("sourceTable", sourceTable))
}
if postValues != nil && len(postValues) != len(sourceTableInfo.Columns) {
log.L().DPanic("postValues length not equal to sourceTableInfo columns",
zap.Int("postValues length", len(postValues)),
zap.Int("sourceTableInfo length", len(sourceTableInfo.Columns)),
zap.Stringer("source table", sourceTable))
zap.Int("postValues", len(postValues)),
zap.Int("sourceTableInfo", len(sourceTableInfo.Columns)),
zap.Stringer("sourceTable", sourceTable))
}

if targetTable != nil {
Expand Down Expand Up @@ -254,8 +254,8 @@ func valuesHolder(n int) string {
func (r *RowChange) genDeleteSQL() (string, []interface{}) {
if r.tp != RowChangeDelete && r.tp != RowChangeUpdate {
log.L().DPanic("illegal type for genDeleteSQL",
zap.String("source table", r.sourceTable.String()),
zap.Stringer("change type", r.tp))
zap.String("sourceTable", r.sourceTable.String()),
zap.Stringer("changeType", r.tp))
return "", nil
}

Expand All @@ -282,8 +282,8 @@ func isGenerated(columns []*timodel.ColumnInfo, name timodel.CIStr) bool {
func (r *RowChange) genUpdateSQL() (string, []interface{}) {
if r.tp != RowChangeUpdate {
log.L().DPanic("illegal type for genUpdateSQL",
zap.String("source table", r.sourceTable.String()),
zap.Stringer("change type", r.tp))
zap.String("sourceTable", r.sourceTable.String()),
zap.Stringer("changeType", r.tp))
return "", nil
}

Expand Down Expand Up @@ -361,7 +361,7 @@ func (r *RowChange) GenSQL(tp DMLType) (string, []interface{}) {
return r.genDeleteSQL()
}
log.L().DPanic("illegal type for GenSQL",
zap.String("source table", r.sourceTable.String()),
zap.Stringer("DML type", tp))
zap.String("sourceTable", r.sourceTable.String()),
zap.Stringer("DMLType", tp))
return "", nil
}

0 comments on commit 32afbc7

Please sign in to comment.