Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: refactor sync dml #2061

Merged
merged 62 commits into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
52e9f13
refine sync dml
GMHDBJD Aug 29, 2021
96008ac
fix bug
GMHDBJD Aug 29, 2021
b54a927
fix data race
GMHDBJD Aug 30, 2021
53de6ce
fix data race
GMHDBJD Aug 30, 2021
22dad39
fix test
GMHDBJD Aug 30, 2021
9606c90
update
GMHDBJD Aug 30, 2021
57145f9
update
GMHDBJD Aug 30, 2021
90f17bc
fix test
GMHDBJD Aug 30, 2021
34d4989
fix test
GMHDBJD Aug 30, 2021
687d4e0
fix ut
GMHDBJD Aug 30, 2021
0ae0f33
fix it
GMHDBJD Aug 30, 2021
3163999
save work
GMHDBJD Aug 31, 2021
6f1fe91
fix wait
GMHDBJD Aug 31, 2021
4851cd4
fix
GMHDBJD Sep 1, 2021
b7cf22d
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 7, 2021
30be49c
update causality and dml worker
GMHDBJD Sep 6, 2021
ff65707
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 7, 2021
2ca637c
refine
GMHDBJD Sep 7, 2021
9b5c1fe
fix ut
GMHDBJD Sep 7, 2021
735a087
Merge branch 'master' into refineSyncDML
GMHDBJD Sep 7, 2021
cb7a298
fix causality
GMHDBJD Sep 8, 2021
53ed0e5
Merge branch 'master' into refineSyncDML
GMHDBJD Sep 8, 2021
1401f4d
refine queue size metrics
GMHDBJD Sep 8, 2021
5da87a1
fix
GMHDBJD Sep 8, 2021
f7392fb
Merge branch 'master' into refineSyncDML
GMHDBJD Sep 8, 2021
06dda5d
debug ci
GMHDBJD Sep 8, 2021
f31328e
fix
GMHDBJD Sep 8, 2021
f02faeb
revert
GMHDBJD Sep 8, 2021
7c44aec
add test back
GMHDBJD Sep 8, 2021
22c9374
update metrics
GMHDBJD Sep 8, 2021
52b41d1
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 9, 2021
e28579a
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 20, 2021
a3fe7e6
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 27, 2021
4eaba28
add more comment
GMHDBJD Sep 27, 2021
ddd4f64
reorder
GMHDBJD Sep 27, 2021
17e1e15
review causality
GMHDBJD Sep 27, 2021
5054276
fix
GMHDBJD Sep 27, 2021
f61e238
update
GMHDBJD Sep 27, 2021
1f80f27
review dml_worker
GMHDBJD Sep 27, 2021
b0f3053
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 28, 2021
26c7ec8
update channel size
GMHDBJD Sep 28, 2021
0fd5543
Merge branch 'master' into refineSyncDML
GMHDBJD Sep 28, 2021
4c3a896
Merge branch 'master' into refineSyncDML
GMHDBJD Sep 28, 2021
13bed05
address comment
GMHDBJD Sep 30, 2021
0daf78e
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 30, 2021
dd10d2a
fix typo
GMHDBJD Sep 30, 2021
68be61e
wrap causality
GMHDBJD Oct 8, 2021
f71704f
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Oct 8, 2021
e5b8f7d
address comment
GMHDBJD Oct 9, 2021
b6a2cd3
Merge branch 'master' into refineSyncDML
GMHDBJD Oct 9, 2021
6a37915
address comment
GMHDBJD Oct 9, 2021
ce8b8f4
Merge branch 'master' into refineSyncDML
GMHDBJD Oct 9, 2021
9ee5a7a
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Oct 15, 2021
e9dbabf
update causality
GMHDBJD Oct 17, 2021
a679f69
remove waittime
GMHDBJD Oct 17, 2021
1bc65b9
remove flush count
GMHDBJD Oct 18, 2021
6164cf1
fix ci
GMHDBJD Oct 18, 2021
c944bde
fix ci
GMHDBJD Oct 18, 2021
451891a
Merge branch 'master' into refineSyncDML
GMHDBJD Oct 18, 2021
b5f504b
address comment
GMHDBJD Oct 19, 2021
c3c7d2a
fix fmt
GMHDBJD Oct 19, 2021
87ba1fb
address comment
GMHDBJD Oct 19, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ issues:
text: "SA1019:"

# Fix found issues (if it's supported by the linter)
fix: true
fix: false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that if we change this, `make fmt` will no longer format the code 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can remove golangci-lint from make fmt cmd

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried locally and make fmt takes effect

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but it worked by gofumports

dm/Makefile

Line 148 in a0770b6

tools/bin/gofumports -w -d -local $(PACKAGE_NAME) $(PACKAGE_DIRECTORIES) 2>&1 | awk "{print} END{if(NR>0) {exit 1}}" ;\

I mean that if we turn this option off, we can remove golangci-lint (L149~L150) from the `make fmt` command.


run:
# timeout for analysis, e.g. 30s, 5m, default is 1m
Expand Down
7 changes: 6 additions & 1 deletion dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ type SyncerConfig struct {
// refine following configs to top level configs?
AutoFixGTID bool `yaml:"auto-fix-gtid" toml:"auto-fix-gtid" json:"auto-fix-gtid"`
EnableGTID bool `yaml:"enable-gtid" toml:"enable-gtid" json:"enable-gtid"`
SafeMode bool `yaml:"safe-mode" toml:"safe-mode" json:"safe-mode"`
// deprecated
DisableCausality bool `yaml:"disable-detect" toml:"disable-detect" json:"disable-detect"`
SafeMode bool `yaml:"safe-mode" toml:"safe-mode" json:"safe-mode"`
// deprecated, use `ansi-quotes` in top level config instead
EnableANSIQuotes bool `yaml:"enable-ansi-quotes" toml:"enable-ansi-quotes" json:"enable-ansi-quotes"`
}
Expand Down Expand Up @@ -622,6 +624,9 @@ func (c *TaskConfig) adjust() error {
if inst.Syncer.EnableANSIQuotes {
log.L().Warn("DM could discover proper ANSI_QUOTES, `enable-ansi-quotes` is no longer take effect")
}
if inst.Syncer.DisableCausality {
log.L().Warn("`disable-causality` is no longer take effect")
}

for _, name := range inst.ExpressionFilters {
if _, ok := c.ExprFilter[name]; !ok {
Expand Down
2 changes: 1 addition & 1 deletion dm/worker/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"encoding/json"
"sort"

"github.com/golang/protobuf/jsonpb"
"github.com/gogo/protobuf/jsonpb"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/common"
Expand Down
77 changes: 64 additions & 13 deletions syncer/causality.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,84 @@
package syncer

import (
"github.com/pingcap/dm/pkg/terror"
"time"

"go.uber.org/zap"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/syncer/metrics"
)

// causality provides a simple mechanism to improve the concurrency of SQLs execution under the premise of ensuring correctness.
// causality groups SQLs that may contain causal relationships, and syncer executes them linearly.
// if some conflicts exist in more than one groups, then syncer waits all SQLs that are grouped be executed and reset causality.
// if some conflicts exist in more than one group, causality generates a conflict job and resets.
// this mechanism meets quiescent consistency to ensure correctness.
type causality struct {
// relations maps every recorded key to the relation selected for it by add;
// reset replaces it with an empty map.
relations map[string]string
// outCh is where run sends the processed jobs (and generated conflict jobs);
// it is closed by close.
outCh chan *job
// inCh is the source of jobs that run iterates over.
inCh chan *job
// logger is used for the debug messages emitted by run.
logger log.Logger

// for metrics
// task and source are passed as label values to the metrics updated in run.
task string
source string
}

func newCausality() *causality {
return &causality{
// causalityWrap builds a causality that reads jobs from inCh, starts its run
// loop in a new goroutine and returns the channel on which the processed jobs
// are delivered. The returned channel is buffered with the syncer's configured
// queue size and is closed after run returns.
func causalityWrap(inCh chan *job, syncer *Syncer) chan *job {
	c := &causality{
		relations: make(map[string]string),
		task:      syncer.cfg.Name,
		source:    syncer.cfg.SourceID,
		logger:    syncer.tctx.Logger.WithFields(zap.String("component", "causality")),
		inCh:      inCh,
		outCh:     make(chan *job, syncer.cfg.QueueSize),
	}
	out := c.outCh

	// The goroutine ends once run has returned and the output channel is closed.
	go func() {
		c.run()
		c.close()
	}()

	return out
}

func (c *causality) add(keys []string) error {
if len(keys) == 0 {
return nil
// run consumes jobs from inCh until that channel is closed and forwards every
// job to outCh.
// A flush job clears the recorded relations. For any other job, a conflict
// between its keys and the recorded relations first emits a conflict job and
// clears the relations; the job's keys are then recorded and the resulting
// relation is stored in the job's key field.
func (c *causality) run() {
	for dmlJob := range c.inCh {
		metrics.QueueSizeGauge.WithLabelValues(c.task, "causality_input", c.source).Set(float64(len(c.inCh)))

		begin := time.Now()
		switch dmlJob.tp {
		case flush:
			c.reset()
		default:
			// conflicts must be checked before the keys are added
			if conflicted := c.detectConflict(dmlJob.keys); conflicted {
				c.logger.Debug("meet causality key, will generate a conflict job to flush all sqls", zap.Strings("keys", dmlJob.keys))
				c.outCh <- newConflictJob()
				c.reset()
			}
			dmlJob.key = c.add(dmlJob.keys)
			c.logger.Debug("key for keys", zap.String("key", dmlJob.key), zap.Strings("keys", dmlJob.keys))
		}
		metrics.ConflictDetectDurationHistogram.WithLabelValues(c.task, c.source).Observe(time.Since(begin).Seconds())

		c.outCh <- dmlJob
	}
}

// close closes outer channel.
func (c *causality) close() {
close(c.outCh)
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
}

if c.detectConflict(keys) {
return terror.ErrSyncUnitCausalityConflict.Generate()
// add adds keys relation and return the relation. The keys must `detectConflict` first to ensure correctness.
func (c *causality) add(keys []string) string {
if len(keys) == 0 {
return ""
}

// find causal key
selectedRelation := keys[0]
var nonExistKeys []string
Expand All @@ -53,13 +106,11 @@ func (c *causality) add(keys []string) error {
for _, key := range nonExistKeys {
c.relations[key] = selectedRelation
}
return nil
}

func (c *causality) get(key string) string {
return c.relations[key]
return selectedRelation
}

// reset drops every recorded relation by installing a fresh, empty map.
func (c *causality) reset() {
	c.relations = map[string]string{}
}
Expand Down
92 changes: 87 additions & 5 deletions syncer/causality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,107 @@
package syncer

import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/parser"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb/util/mock"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/binlog"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/utils"
)

func (s *testSyncerSuite) TestCausality(c *C) {
ca := newCausality()
func (s *testSyncerSuite) TestDetectConflict(c *C) {
ca := &causality{
relations: make(map[string]string),
}
caseData := []string{"test_1", "test_2", "test_3"}
excepted := map[string]string{
"test_1": "test_1",
"test_2": "test_1",
"test_3": "test_1",
}
c.Assert(ca.add(caseData), IsNil)
c.Assert(ca.detectConflict(caseData), IsFalse)
ca.add(caseData)
c.Assert(ca.relations, DeepEquals, excepted)
c.Assert(ca.add([]string{"test_4"}), IsNil)
c.Assert(ca.detectConflict([]string{"test_4"}), IsFalse)
ca.add([]string{"test_4"})
excepted["test_4"] = "test_4"
c.Assert(ca.relations, DeepEquals, excepted)
conflictData := []string{"test_4", "test_3"}
c.Assert(ca.detectConflict(conflictData), IsTrue)
c.Assert(ca.add(conflictData), NotNil)
ca.reset()
c.Assert(ca.relations, HasLen, 0)
}

// TestCasuality feeds a sequence of DML jobs through a causality instance
// created by causalityWrap and checks the types of the jobs that come out.
// The expected output contains one extra conflict job in front of the last
// insert (presumably because that insert's keys span two existing groups —
// this depends on genMultipleKeys, which is defined elsewhere).
func (s *testSyncerSuite) TestCasuality(c *C) {
	p := parser.New()
	se := mock.NewContext()
	schema := "create table tb(a int primary key, b int unique);"
	ti, err := createTableInfo(p, se, int64(0), schema)
	c.Assert(err, IsNil)

	jobCh := make(chan *job, 10)
	// Close the input channel when the test ends so that the goroutine started
	// by causalityWrap leaves its receive loop instead of blocking forever.
	defer close(jobCh)

	syncer := &Syncer{
		cfg: &config.SubTaskConfig{
			SyncerConfig: config.SyncerConfig{
				QueueSize: 1024,
			},
			Name:     "task",
			SourceID: "source",
		},
		tctx: tcontext.Background().WithLogger(log.L()),
	}
	causalityCh := causalityWrap(jobCh, syncer)
	testCases := []struct {
		op   opType
		vals [][]interface{}
	}{
		{
			op:   insert,
			vals: [][]interface{}{{1, 2}},
		},
		{
			op:   insert,
			vals: [][]interface{}{{2, 3}},
		},
		{
			op:   update,
			vals: [][]interface{}{{2, 3}, {3, 4}},
		},
		{
			op:   del,
			vals: [][]interface{}{{1, 2}},
		},
		{
			op:   insert,
			vals: [][]interface{}{{1, 3}},
		},
	}
	// Job types expected on the output channel, in order.
	results := []opType{insert, insert, update, del, conflict, insert}
	table := &filter.Table{Schema: "test", Name: "t1"}
	location := binlog.NewLocation("")
	ec := &eventContext{startLocation: &location, currentLocation: &location, lastLocation: &location}

	for _, tc := range testCases {
		var keys []string
		for _, val := range tc.vals {
			keys = append(keys, genMultipleKeys(ti, val, "tb")...)
		}
		job := newDMLJob(tc.op, table, table, "", nil, keys, ec)
		jobCh <- job
	}

	// Wait until all expected jobs are buffered in the output channel.
	c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
		return len(causalityCh) == len(results)
	}), IsTrue)

	for _, op := range results {
		job := <-causalityCh
		c.Assert(job.tp, Equals, op)
	}
}
Loading