Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
syncer: refactor sync dml (#2061) (#2237)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Oct 19, 2021
1 parent 3c3eb96 commit 4d21188
Show file tree
Hide file tree
Showing 18 changed files with 566 additions and 434 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ issues:
text: "SA1019:"

# Fix found issues (if it's supported by the linter)
fix: true
fix: false

run:
# timeout for analysis, e.g. 30s, 5m, default is 1m
Expand Down
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ fmt: tools_setup
tools/bin/shfmt -l -w -d "tests/" ; \
echo "gofumports"; \
tools/bin/gofumports -w -d -local $(PACKAGE_NAME) $(PACKAGE_DIRECTORIES) 2>&1 | awk "{print} END{if(NR>0) {exit 1}}" ;\
echo "golangci-lint"; \
tools/bin/golangci-lint run --config=$(CURDIR)/.golangci.yml --issues-exit-code=1 $(PACKAGE_DIRECTORIES) ;\
fi

lint: tools_setup
Expand Down
7 changes: 6 additions & 1 deletion dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ type SyncerConfig struct {
// refine following configs to top level configs?
AutoFixGTID bool `yaml:"auto-fix-gtid" toml:"auto-fix-gtid" json:"auto-fix-gtid"`
EnableGTID bool `yaml:"enable-gtid" toml:"enable-gtid" json:"enable-gtid"`
SafeMode bool `yaml:"safe-mode" toml:"safe-mode" json:"safe-mode"`
// deprecated
DisableCausality bool `yaml:"disable-detect" toml:"disable-detect" json:"disable-detect"`
SafeMode bool `yaml:"safe-mode" toml:"safe-mode" json:"safe-mode"`
// deprecated, use `ansi-quotes` in top level config instead
EnableANSIQuotes bool `yaml:"enable-ansi-quotes" toml:"enable-ansi-quotes" json:"enable-ansi-quotes"`
}
Expand Down Expand Up @@ -622,6 +624,9 @@ func (c *TaskConfig) adjust() error {
if inst.Syncer.EnableANSIQuotes {
log.L().Warn("DM could discover proper ANSI_QUOTES, `enable-ansi-quotes` is no longer take effect")
}
if inst.Syncer.DisableCausality {
log.L().Warn("`disable-causality` is no longer take effect")
}

for _, name := range inst.ExpressionFilters {
if _, ok := c.ExprFilter[name]; !ok {
Expand Down
2 changes: 1 addition & 1 deletion dm/worker/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"encoding/json"
"sort"

"github.com/golang/protobuf/jsonpb"
"github.com/gogo/protobuf/jsonpb"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/common"
Expand Down
77 changes: 64 additions & 13 deletions syncer/causality.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,84 @@
package syncer

import (
"github.com/pingcap/dm/pkg/terror"
"time"

"go.uber.org/zap"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/syncer/metrics"
)

// causality provides a simple mechanism to improve the concurrency of SQLs execution under the premise of ensuring correctness.
// causality groups sqls that maybe contain causal relationships, and syncer executes them linearly.
// if some conflicts exist in more than one groups, then syncer waits all SQLs that are grouped be executed and reset causality.
// if some conflicts exist in more than one groups, causality generate a conflict job and reset.
// this mechanism meets quiescent consistency to ensure correctness.
type causality struct {
relations map[string]string
outCh chan *job
inCh chan *job
logger log.Logger

// for metrics
task string
source string
}

func newCausality() *causality {
return &causality{
// causalityWrap creates and runs a causality instance.
func causalityWrap(inCh chan *job, syncer *Syncer) chan *job {
causality := &causality{
relations: make(map[string]string),
task: syncer.cfg.Name,
source: syncer.cfg.SourceID,
logger: syncer.tctx.Logger.WithFields(zap.String("component", "causality")),
inCh: inCh,
outCh: make(chan *job, syncer.cfg.QueueSize),
}

go func() {
causality.run()
causality.close()
}()

return causality.outCh
}

func (c *causality) add(keys []string) error {
if len(keys) == 0 {
return nil
// run receives dml jobs and send causality jobs by adding causality key.
// When meet conflict, sends a conflict job.
func (c *causality) run() {
for j := range c.inCh {
metrics.QueueSizeGauge.WithLabelValues(c.task, "causality_input", c.source).Set(float64(len(c.inCh)))

startTime := time.Now()
if j.tp == flush {
c.reset()
} else {
// detectConflict before add
if c.detectConflict(j.keys) {
c.logger.Debug("meet causality key, will generate a conflict job to flush all sqls", zap.Strings("keys", j.keys))
c.outCh <- newConflictJob()
c.reset()
}
j.key = c.add(j.keys)
c.logger.Debug("key for keys", zap.String("key", j.key), zap.Strings("keys", j.keys))
}
metrics.ConflictDetectDurationHistogram.WithLabelValues(c.task, c.source).Observe(time.Since(startTime).Seconds())

c.outCh <- j
}
}

// close closes outer channel.
func (c *causality) close() {
close(c.outCh)
}

if c.detectConflict(keys) {
return terror.ErrSyncUnitCausalityConflict.Generate()
// add adds keys relation and return the relation. The keys must `detectConflict` first to ensure correctness.
func (c *causality) add(keys []string) string {
if len(keys) == 0 {
return ""
}

// find causal key
selectedRelation := keys[0]
var nonExistKeys []string
Expand All @@ -53,13 +106,11 @@ func (c *causality) add(keys []string) error {
for _, key := range nonExistKeys {
c.relations[key] = selectedRelation
}
return nil
}

func (c *causality) get(key string) string {
return c.relations[key]
return selectedRelation
}

// reset resets relations.
func (c *causality) reset() {
c.relations = make(map[string]string)
}
Expand Down
92 changes: 87 additions & 5 deletions syncer/causality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,107 @@
package syncer

import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/parser"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb/util/mock"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/binlog"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/utils"
)

func (s *testSyncerSuite) TestCausality(c *C) {
ca := newCausality()
func (s *testSyncerSuite) TestDetectConflict(c *C) {
ca := &causality{
relations: make(map[string]string),
}
caseData := []string{"test_1", "test_2", "test_3"}
excepted := map[string]string{
"test_1": "test_1",
"test_2": "test_1",
"test_3": "test_1",
}
c.Assert(ca.add(caseData), IsNil)
c.Assert(ca.detectConflict(caseData), IsFalse)
ca.add(caseData)
c.Assert(ca.relations, DeepEquals, excepted)
c.Assert(ca.add([]string{"test_4"}), IsNil)
c.Assert(ca.detectConflict([]string{"test_4"}), IsFalse)
ca.add([]string{"test_4"})
excepted["test_4"] = "test_4"
c.Assert(ca.relations, DeepEquals, excepted)
conflictData := []string{"test_4", "test_3"}
c.Assert(ca.detectConflict(conflictData), IsTrue)
c.Assert(ca.add(conflictData), NotNil)
ca.reset()
c.Assert(ca.relations, HasLen, 0)
}

func (s *testSyncerSuite) TestCasuality(c *C) {
p := parser.New()
se := mock.NewContext()
schema := "create table tb(a int primary key, b int unique);"
ti, err := createTableInfo(p, se, int64(0), schema)
c.Assert(err, IsNil)

jobCh := make(chan *job, 10)
syncer := &Syncer{
cfg: &config.SubTaskConfig{
SyncerConfig: config.SyncerConfig{
QueueSize: 1024,
},
Name: "task",
SourceID: "source",
},
tctx: tcontext.Background().WithLogger(log.L()),
}
causalityCh := causalityWrap(jobCh, syncer)
testCases := []struct {
op opType
vals [][]interface{}
}{
{
op: insert,
vals: [][]interface{}{{1, 2}},
},
{
op: insert,
vals: [][]interface{}{{2, 3}},
},
{
op: update,
vals: [][]interface{}{{2, 3}, {3, 4}},
},
{
op: del,
vals: [][]interface{}{{1, 2}},
},
{
op: insert,
vals: [][]interface{}{{1, 3}},
},
}
results := []opType{insert, insert, update, del, conflict, insert}
table := &filter.Table{Schema: "test", Name: "t1"}
location := binlog.NewLocation("")
ec := &eventContext{startLocation: &location, currentLocation: &location, lastLocation: &location}

for _, tc := range testCases {
var keys []string
for _, val := range tc.vals {
keys = append(keys, genMultipleKeys(ti, val, "tb")...)
}
job := newDMLJob(tc.op, table, table, "", nil, keys, ec)
jobCh <- job
}

c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return len(causalityCh) == len(results)
}), IsTrue)

for _, op := range results {
job := <-causalityCh
c.Assert(job.tp, Equals, op)
}
}
Loading

0 comments on commit 4d21188

Please sign in to comment.