diff --git a/dm/pkg/utils/common.go b/dm/pkg/utils/common.go index 9c5ddfc7ada..25c4ecb348a 100644 --- a/dm/pkg/utils/common.go +++ b/dm/pkg/utils/common.go @@ -37,6 +37,10 @@ import ( "github.com/pingcap/tiflow/dm/pkg/terror" ) +func init() { + ZeroSessionCtx = NewSessionCtx(nil) +} + // TrimCtrlChars returns a slice of the string s with all leading // and trailing control characters removed. func TrimCtrlChars(s string) string { @@ -322,11 +326,12 @@ func (se *session) GetBuiltinFunctionUsage() map[string]uint32 { return se.builtinFunctionUsage } +// ZeroSessionCtx is used when the session variables is not important. +var ZeroSessionCtx sessionctx.Context + // NewSessionCtx return a session context with specified session variables. func NewSessionCtx(vars map[string]string) sessionctx.Context { variables := variable.NewSessionVars() - variables.Rng.SetSeed1(0) - variables.Rng.SetSeed2(0) for k, v := range vars { _ = variables.SetSystemVar(k, v) if strings.EqualFold(k, "time_zone") { diff --git a/pkg/sqlmodel/causality_test.go b/pkg/sqlmodel/causality_test.go index 2464856169a..4a49e95640e 100644 --- a/pkg/sqlmodel/causality_test.go +++ b/pkg/sqlmodel/causality_test.go @@ -14,6 +14,7 @@ package sqlmodel import ( + "sync" "testing" "github.com/stretchr/testify/require" @@ -53,3 +54,20 @@ func TestCausalityKeys(t *testing.T) { require.Equal(t, ca.causalityKeys, change.CausalityKeys()) } } + +func TestCausalityKeysNoRace(t *testing.T) { + t.Parallel() + + source := &cdcmodel.TableName{Schema: "db", Table: "tb1"} + ti := mockTableInfo(t, "CREATE TABLE tb1 (c INT PRIMARY KEY, c2 INT, c3 VARCHAR(10) UNIQUE)") + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + change := NewRowChange(source, nil, []interface{}{1, 2, "abc"}, []interface{}{3, 4, "abc"}, ti, nil, nil) + change.CausalityKeys() + wg.Done() + }() + } + wg.Wait() +} diff --git a/pkg/sqlmodel/multivalue.go b/pkg/sqlmodel/multivalue.go index 6d2046d632c..3789d56c7a6 100644 --- a/pkg/sqlmodel/multivalue.go +++ b/pkg/sqlmodel/multivalue.go @@ -31,10 +31,12 @@ func SameTypeTargetAndColumns(lhs *RowChange, rhs *RowChange) bool { if lhs.sourceTable.Schema == rhs.sourceTable.Schema && lhs.sourceTable.Table == rhs.sourceTable.Table { return true } - if lhs.targetTable.Schema != rhs.targetTable.Schema || lhs.targetTable.Table == rhs.targetTable.Table { + if lhs.targetTable.Schema != rhs.targetTable.Schema || lhs.targetTable.Table != rhs.targetTable.Table { return false } + // when the targets are the same and the sources are not the same (same group of shard tables), this piece of code + // is run. var lhsCols, rhsCols []string switch lhs.tp { case RowChangeDelete: diff --git a/pkg/sqlmodel/row_change.go b/pkg/sqlmodel/row_change.go index e1aaadde33b..f0c8d941fb9 100644 --- a/pkg/sqlmodel/row_change.go +++ b/pkg/sqlmodel/row_change.go @@ -124,7 +124,7 @@ func NewRowChange( if tiCtx != nil { ret.tiSessionCtx = tiCtx } else { - ret.tiSessionCtx = utils.NewSessionCtx(nil) + ret.tiSessionCtx = utils.ZeroSessionCtx } ret.calculateType() diff --git a/pkg/sqlmodel/row_change_test.go b/pkg/sqlmodel/row_change_test.go index 36da1e0e5b4..733c540cecd 100644 --- a/pkg/sqlmodel/row_change_test.go +++ b/pkg/sqlmodel/row_change_test.go @@ -85,7 +85,7 @@ func TestNewRowChange(t *testing.T) { expected.targetTable = expected.sourceTable expected.targetTableInfo = expected.sourceTableInfo - expected.tiSessionCtx = utils.NewSessionCtx(nil) + expected.tiSessionCtx = utils.ZeroSessionCtx expected.identityInfo = nil actual = NewRowChange(source, nil, []interface{}{1, 2}, []interface{}{1, 3}, sourceTI, nil, nil) require.Equal(t, expected, actual)