Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

sync: refactor dml.go #2234

Merged
merged 93 commits into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from 87 commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
52e9f13
refine sync dml
GMHDBJD Aug 29, 2021
96008ac
fix bug
GMHDBJD Aug 29, 2021
b54a927
fix data race
GMHDBJD Aug 30, 2021
53de6ce
fix data race
GMHDBJD Aug 30, 2021
22dad39
fix test
GMHDBJD Aug 30, 2021
9606c90
update
GMHDBJD Aug 30, 2021
57145f9
update
GMHDBJD Aug 30, 2021
90f17bc
fix test
GMHDBJD Aug 30, 2021
34d4989
fix test
GMHDBJD Aug 30, 2021
687d4e0
fix ut
GMHDBJD Aug 30, 2021
0ae0f33
fix it
GMHDBJD Aug 30, 2021
3163999
save work
GMHDBJD Aug 31, 2021
6f1fe91
fix wait
GMHDBJD Aug 31, 2021
4851cd4
fix
GMHDBJD Sep 1, 2021
b7cf22d
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 7, 2021
30be49c
update causality and dml worker
GMHDBJD Sep 6, 2021
ff65707
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 7, 2021
2ca637c
refine
GMHDBJD Sep 7, 2021
9b5c1fe
fix ut
GMHDBJD Sep 7, 2021
735a087
Merge branch 'master' into refineSyncDML
GMHDBJD Sep 7, 2021
cb7a298
fix causality
GMHDBJD Sep 8, 2021
53ed0e5
Merge branch 'master' into refineSyncDML
GMHDBJD Sep 8, 2021
1401f4d
refine queue size metrics
GMHDBJD Sep 8, 2021
5da87a1
fix
GMHDBJD Sep 8, 2021
f7392fb
Merge branch 'master' into refineSyncDML
GMHDBJD Sep 8, 2021
06dda5d
debug ci
GMHDBJD Sep 8, 2021
f31328e
fix
GMHDBJD Sep 8, 2021
f02faeb
revert
GMHDBJD Sep 8, 2021
7c44aec
add test back
GMHDBJD Sep 8, 2021
22c9374
update metrics
GMHDBJD Sep 8, 2021
52b41d1
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 9, 2021
1709363
refine dml param
GMHDBJD Sep 15, 2021
b5cf1b8
save work
GMHDBJD Sep 15, 2021
4f1fede
save work
GMHDBJD Sep 18, 2021
d09d41e
save work
GMHDBJD Sep 20, 2021
e28579a
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 20, 2021
2f466f2
Merge branch 'refineSyncDML' into compactAndMerge
GMHDBJD Sep 20, 2021
2ffe397
save work
GMHDBJD Sep 20, 2021
f5675e0
save work
GMHDBJD Sep 21, 2021
9f6ba77
save work
GMHDBJD Sep 21, 2021
d444563
save work
GMHDBJD Sep 21, 2021
88534ba
save work
GMHDBJD Sep 21, 2021
100215a
save work
GMHDBJD Sep 26, 2021
a3fe7e6
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 27, 2021
4eaba28
add more comment
GMHDBJD Sep 27, 2021
ddd4f64
reorder
GMHDBJD Sep 27, 2021
17e1e15
review causality
GMHDBJD Sep 27, 2021
5054276
fix
GMHDBJD Sep 27, 2021
f61e238
update
GMHDBJD Sep 27, 2021
1f80f27
review dml_worker
GMHDBJD Sep 27, 2021
b0f3053
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 28, 2021
26c7ec8
update channel size
GMHDBJD Sep 28, 2021
0fd5543
Merge branch 'master' into refineSyncDML
GMHDBJD Sep 28, 2021
4c3a896
Merge branch 'master' into refineSyncDML
GMHDBJD Sep 28, 2021
13bed05
address comment
GMHDBJD Sep 30, 2021
0daf78e
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 30, 2021
dd10d2a
fix typo
GMHDBJD Sep 30, 2021
68be61e
wrap causality
GMHDBJD Oct 8, 2021
f71704f
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Oct 8, 2021
e5b8f7d
address comment
GMHDBJD Oct 9, 2021
b6a2cd3
Merge branch 'master' into refineSyncDML
GMHDBJD Oct 9, 2021
6a37915
address comment
GMHDBJD Oct 9, 2021
ce8b8f4
Merge branch 'master' into refineSyncDML
GMHDBJD Oct 9, 2021
85718b1
Merge branch 'compactAndMerge' into refactorDML
GMHDBJD Oct 10, 2021
b7b1b9e
save work
GMHDBJD Oct 14, 2021
9ee5a7a
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Oct 15, 2021
fe9e286
Merge branch 'refineSyncDML' into refactorDML
GMHDBJD Oct 15, 2021
e9dbabf
update causality
GMHDBJD Oct 17, 2021
a679f69
remove waittime
GMHDBJD Oct 17, 2021
dc358f5
remove flush count
GMHDBJD Oct 18, 2021
594ecfb
Merge branch 'refineSyncDML' into refactorDML
GMHDBJD Oct 18, 2021
3f4b63e
refactor dml
GMHDBJD Oct 18, 2021
1bc65b9
remove flush count
GMHDBJD Oct 18, 2021
6164cf1
fix ci
GMHDBJD Oct 18, 2021
9d48744
Merge branch 'refineSyncDML' into refactorDML
GMHDBJD Oct 18, 2021
c944bde
fix ci
GMHDBJD Oct 18, 2021
451891a
Merge branch 'master' into refineSyncDML
GMHDBJD Oct 18, 2021
b5f504b
address comment
GMHDBJD Oct 19, 2021
c3c7d2a
fix fmt
GMHDBJD Oct 19, 2021
87ba1fb
address comment
GMHDBJD Oct 19, 2021
0518536
Merge branch 'refineSyncDML' into refactorDML
GMHDBJD Oct 19, 2021
6e6e872
add unit test
GMHDBJD Oct 19, 2021
26ee3ca
Merge remote-tracking branch 'upstream/master' into refactorDML
GMHDBJD Oct 19, 2021
b331e99
update
GMHDBJD Oct 19, 2021
ec6789d
fix typo
GMHDBJD Oct 19, 2021
ee93b7a
update comment
GMHDBJD Oct 19, 2021
5251c94
update comment
GMHDBJD Oct 19, 2021
401e560
address comment
GMHDBJD Oct 20, 2021
ebb36fb
address comment
GMHDBJD Oct 21, 2021
d99bb92
add no primary/unique key test
GMHDBJD Oct 21, 2021
9993694
Merge branch 'master' into refactorDML
GMHDBJD Oct 21, 2021
4fc1ee6
address comment
GMHDBJD Oct 25, 2021
2ebdb46
Merge branch 'master' into refactorDML
GMHDBJD Oct 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions syncer/causality.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,15 @@ func (c *causality) run() {
if j.tp == flush {
c.reset()
} else {
keys := j.dml.identifyKeys()
// detectConflict before add
if c.detectConflict(j.keys) {
c.logger.Debug("meet causality key, will generate a conflict job to flush all sqls", zap.Strings("keys", j.keys))
if c.detectConflict(keys) {
c.logger.Debug("meet causality key, will generate a conflict job to flush all sqls", zap.Strings("keys", keys))
c.outCh <- newConflictJob()
c.reset()
}
j.key = c.add(j.keys)
c.logger.Debug("key for keys", zap.String("key", j.key), zap.Strings("keys", j.keys))
j.dml.key = c.add(keys)
c.logger.Debug("key for keys", zap.String("key", j.dml.key), zap.Strings("keys", keys))
}
metrics.ConflictDetectDurationHistogram.WithLabelValues(c.task, c.source).Observe(time.Since(startTime).Seconds())

Expand Down
24 changes: 11 additions & 13 deletions syncer/causality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,28 +71,30 @@ func (s *testSyncerSuite) TestCasuality(c *C) {
}
causalityCh := causalityWrap(jobCh, syncer)
testCases := []struct {
op opType
vals [][]interface{}
op opType
oldVals []interface{}
vals []interface{}
}{
{
op: insert,
vals: [][]interface{}{{1, 2}},
vals: []interface{}{1, 2},
},
{
op: insert,
vals: [][]interface{}{{2, 3}},
vals: []interface{}{2, 3},
},
{
op: update,
vals: [][]interface{}{{2, 3}, {3, 4}},
op: update,
oldVals: []interface{}{2, 3},
vals: []interface{}{3, 4},
},
{
op: del,
vals: [][]interface{}{{1, 2}},
vals: []interface{}{1, 2},
},
{
op: insert,
vals: [][]interface{}{{1, 3}},
vals: []interface{}{1, 3},
},
}
results := []opType{insert, insert, update, del, conflict, insert}
Expand All @@ -101,11 +103,7 @@ func (s *testSyncerSuite) TestCasuality(c *C) {
ec := &eventContext{startLocation: &location, currentLocation: &location, lastLocation: &location}

for _, tc := range testCases {
var keys []string
for _, val := range tc.vals {
keys = append(keys, genMultipleKeys(ti, val, "tb")...)
}
job := newDMLJob(tc.op, table, table, "", nil, keys, ec)
job := newDMLJob(tc.op, table, table, newDML(tc.op, false, "", table, tc.oldVals, tc.vals, tc.oldVals, tc.vals, ti.Columns, ti), ec)
jobCh <- job
}

Expand Down
Loading