diff --git a/.golangci.yml b/.golangci.yml
index 6be8f699da..7385bd6320 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -120,7 +120,7 @@ issues:
       text: "SA1019:"
 
   # Fix found issues (if it's supported by the linter)
-  fix: true
+  fix: false
 
 run:
   # timeout for analysis, e.g. 30s, 5m, default is 1m
diff --git a/Makefile b/Makefile
index 8304807646..6770b11a00 100644
--- a/Makefile
+++ b/Makefile
@@ -146,8 +146,6 @@ fmt: tools_setup
 		tools/bin/shfmt -l -w -d "tests/" ; \
 		echo "gofumports"; \
 		tools/bin/gofumports -w -d -local $(PACKAGE_NAME) $(PACKAGE_DIRECTORIES) 2>&1 | awk "{print} END{if(NR>0) {exit 1}}" ;\
-		echo "golangci-lint"; \
-		tools/bin/golangci-lint run --config=$(CURDIR)/.golangci.yml --issues-exit-code=1 $(PACKAGE_DIRECTORIES) ;\
 	fi
 
 lint: tools_setup
diff --git a/dm/config/task.go b/dm/config/task.go
index 812ab5f4da..936d669dff 100644
--- a/dm/config/task.go
+++ b/dm/config/task.go
@@ -245,7 +245,9 @@ type SyncerConfig struct {
 	// refine following configs to top level configs?
 	AutoFixGTID bool `yaml:"auto-fix-gtid" toml:"auto-fix-gtid" json:"auto-fix-gtid"`
 	EnableGTID  bool `yaml:"enable-gtid" toml:"enable-gtid" json:"enable-gtid"`
-	SafeMode    bool `yaml:"safe-mode" toml:"safe-mode" json:"safe-mode"`
+	// deprecated
+	DisableCausality bool `yaml:"disable-detect" toml:"disable-detect" json:"disable-detect"`
+	SafeMode         bool `yaml:"safe-mode" toml:"safe-mode" json:"safe-mode"`
 	// deprecated, use `ansi-quotes` in top level config instead
 	EnableANSIQuotes bool `yaml:"enable-ansi-quotes" toml:"enable-ansi-quotes" json:"enable-ansi-quotes"`
 }
@@ -622,6 +624,9 @@ func (c *TaskConfig) adjust() error {
 		if inst.Syncer.EnableANSIQuotes {
 			log.L().Warn("DM could discover proper ANSI_QUOTES, `enable-ansi-quotes` is no longer take effect")
 		}
+		if inst.Syncer.DisableCausality {
+			log.L().Warn("`disable-causality` is no longer take effect")
+		}
 
 		for _, name := range inst.ExpressionFilters {
 			if _, ok := c.ExprFilter[name]; !ok {
diff --git a/dm/worker/status.go b/dm/worker/status.go
index beb6e2b138..102abab9e3 100644
--- a/dm/worker/status.go
+++ b/dm/worker/status.go
@@ -17,7 +17,7 @@ import (
 	"encoding/json"
 	"sort"
 
-	"github.com/golang/protobuf/jsonpb"
+	"github.com/gogo/protobuf/jsonpb"
 	"go.uber.org/zap"
 
 	"github.com/pingcap/dm/dm/common"
diff --git a/syncer/causality.go b/syncer/causality.go
index e0c5e750b5..b3594b9bca 100644
--- a/syncer/causality.go
+++ b/syncer/causality.go
@@ -14,31 +14,84 @@
 package syncer
 
 import (
-	"github.com/pingcap/dm/pkg/terror"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/pingcap/dm/pkg/log"
+	"github.com/pingcap/dm/syncer/metrics"
 )
 
 // causality provides a simple mechanism to improve the concurrency of SQLs execution under the premise of ensuring correctness.
 // causality groups sqls that maybe contain causal relationships, and syncer executes them linearly.
-// if some conflicts exist in more than one groups, then syncer waits all SQLs that are grouped be executed and reset causality.
+// if some conflicts exist in more than one group, causality generates a conflict job and resets itself.
 // this mechanism meets quiescent consistency to ensure correctness.
 type causality struct {
 	relations map[string]string
+	outCh     chan *job
+	inCh      chan *job
+	logger    log.Logger
+
+	// for metrics
+	task   string
+	source string
 }
 
-func newCausality() *causality {
-	return &causality{
+// causalityWrap creates and runs a causality instance.
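+//
+// When an incoming job's keys conflict with a previously seen group, a conflict
+// job is emitted first so the downstream worker drains all pending SQLs before
+// the conflicting one. A minimal usage sketch, mirroring the wiring in
+// Syncer.syncDML later in this change (illustrative only):
+//
+//	causalityCh := causalityWrap(s.dmlJobCh, s) // jobs come out keyed, or as conflict jobs
+//	flushCh := dmlWorkerWrap(causalityCh, s)    // executed batches; flush jobs are forwarded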
+func causalityWrap(inCh chan *job, syncer *Syncer) chan *job {
+	causality := &causality{
 		relations: make(map[string]string),
+		task:      syncer.cfg.Name,
+		source:    syncer.cfg.SourceID,
+		logger:    syncer.tctx.Logger.WithFields(zap.String("component", "causality")),
+		inCh:      inCh,
+		outCh:     make(chan *job, syncer.cfg.QueueSize),
 	}
+
+	go func() {
+		causality.run()
+		causality.close()
+	}()
+
+	return causality.outCh
 }
 
-func (c *causality) add(keys []string) error {
-	if len(keys) == 0 {
-		return nil
+// run receives dml jobs, attaches a causality key to each of them and sends them on.
+// When a conflict is detected, it sends a conflict job first.
+func (c *causality) run() {
+	for j := range c.inCh {
+		metrics.QueueSizeGauge.WithLabelValues(c.task, "causality_input", c.source).Set(float64(len(c.inCh)))
+
+		startTime := time.Now()
+		if j.tp == flush {
+			c.reset()
+		} else {
+			// detectConflict before add
+			if c.detectConflict(j.keys) {
+				c.logger.Debug("meet causality key, will generate a conflict job to flush all sqls", zap.Strings("keys", j.keys))
+				c.outCh <- newConflictJob()
+				c.reset()
+			}
+			j.key = c.add(j.keys)
+			c.logger.Debug("key for keys", zap.String("key", j.key), zap.Strings("keys", j.keys))
+		}
+		metrics.ConflictDetectDurationHistogram.WithLabelValues(c.task, c.source).Observe(time.Since(startTime).Seconds())
+
+		c.outCh <- j
 	}
+}
+
+// close closes the outer channel.
+func (c *causality) close() {
+	close(c.outCh)
+}
 
-	if c.detectConflict(keys) {
-		return terror.ErrSyncUnitCausalityConflict.Generate()
+// add records the relation of the keys and returns the selected relation.
+// Callers must check the keys with `detectConflict` first to ensure correctness.
+func (c *causality) add(keys []string) string {
+	if len(keys) == 0 {
+		return ""
 	}
+
 	// find causal key
 	selectedRelation := keys[0]
 	var nonExistKeys []string
@@ -53,13 +106,11 @@ func (c *causality) add(keys []string) error {
 	for _, key := range nonExistKeys {
 		c.relations[key] = selectedRelation
 	}
-	return nil
-}
 
-func (c *causality) get(key string) string {
-	return c.relations[key]
+	return selectedRelation
 }
 
+// reset resets relations.
 func (c *causality) reset() {
 	c.relations = make(map[string]string)
 }
diff --git a/syncer/causality_test.go b/syncer/causality_test.go
index 2b4c41f7e0..dc5f223e39 100644
--- a/syncer/causality_test.go
+++ b/syncer/causality_test.go
@@ -14,25 +14,107 @@
 package syncer
 
 import (
+	"time"
+
 	. "github.com/pingcap/check"
+	"github.com/pingcap/parser"
+	"github.com/pingcap/tidb-tools/pkg/filter"
+	"github.com/pingcap/tidb/util/mock"
+
+	"github.com/pingcap/dm/dm/config"
+	"github.com/pingcap/dm/pkg/binlog"
+	tcontext "github.com/pingcap/dm/pkg/context"
+	"github.com/pingcap/dm/pkg/log"
+	"github.com/pingcap/dm/pkg/utils"
 )
 
-func (s *testSyncerSuite) TestCausality(c *C) {
-	ca := newCausality()
+func (s *testSyncerSuite) TestDetectConflict(c *C) {
+	ca := &causality{
+		relations: make(map[string]string),
+	}
 	caseData := []string{"test_1", "test_2", "test_3"}
 	excepted := map[string]string{
 		"test_1": "test_1",
 		"test_2": "test_1",
 		"test_3": "test_1",
 	}
-	c.Assert(ca.add(caseData), IsNil)
+	c.Assert(ca.detectConflict(caseData), IsFalse)
+	ca.add(caseData)
 	c.Assert(ca.relations, DeepEquals, excepted)
-	c.Assert(ca.add([]string{"test_4"}), IsNil)
+	c.Assert(ca.detectConflict([]string{"test_4"}), IsFalse)
+	ca.add([]string{"test_4"})
 	excepted["test_4"] = "test_4"
 	c.Assert(ca.relations, DeepEquals, excepted)
 	conflictData := []string{"test_4", "test_3"}
 	c.Assert(ca.detectConflict(conflictData), IsTrue)
-	c.Assert(ca.add(conflictData), NotNil)
 	ca.reset()
 	c.Assert(ca.relations, HasLen, 0)
 }
+
+func (s *testSyncerSuite) TestCasuality(c *C) {
+	p := parser.New()
+	se := mock.NewContext()
+	schema := "create table tb(a int primary key, b int unique);"
+	ti, err := createTableInfo(p, se, int64(0), schema)
+	c.Assert(err, IsNil)
+
+	jobCh := make(chan *job, 10)
+	syncer := &Syncer{
+		cfg: &config.SubTaskConfig{
+			SyncerConfig: config.SyncerConfig{
+				QueueSize: 1024,
+			},
+			Name:     "task",
+			SourceID: "source",
+		},
+		tctx: tcontext.Background().WithLogger(log.L()),
+	}
+	causalityCh := causalityWrap(jobCh, syncer)
+	testCases := []struct {
+		op   opType
+		vals [][]interface{}
+	}{
+		{
+			op:   insert,
+			vals: [][]interface{}{{1, 2}},
+		},
+		{
+			op:   insert,
+			vals: [][]interface{}{{2, 3}},
+		},
+		{
+			op:   update,
+			vals: [][]interface{}{{2, 3}, {3, 4}},
+		},
+		{
+			op:   del,
+			vals: [][]interface{}{{1, 2}},
+		},
+		{
+			op:   insert,
+			vals: [][]interface{}{{1, 3}},
+		},
+	}
+	results := []opType{insert, insert, update, del, conflict, insert}
+	table := &filter.Table{Schema: "test", Name: "t1"}
+	location := binlog.NewLocation("")
+	ec := &eventContext{startLocation: &location, currentLocation: &location, lastLocation: &location}
+
+	for _, tc := range testCases {
+		var keys []string
+		for _, val := range tc.vals {
+			keys = append(keys, genMultipleKeys(ti, val, "tb")...)
+		}
+		job := newDMLJob(tc.op, table, table, "", nil, keys, ec)
+		jobCh <- job
+	}
+
+	c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
+		return len(causalityCh) == len(results)
+	}), IsTrue)
+
+	for _, op := range results {
+		job := <-causalityCh
+		c.Assert(job.tp, Equals, op)
+	}
+}
diff --git a/syncer/dml_worker.go b/syncer/dml_worker.go
new file mode 100644
index 0000000000..08a8882775
--- /dev/null
+++ b/syncer/dml_worker.go
@@ -0,0 +1,237 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package syncer
+
+import (
+	"sync"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb-tools/pkg/filter"
+	"go.uber.org/zap"
+
+	tcontext "github.com/pingcap/dm/pkg/context"
+	"github.com/pingcap/dm/pkg/log"
+	"github.com/pingcap/dm/pkg/terror"
+	"github.com/pingcap/dm/pkg/utils"
+	"github.com/pingcap/dm/syncer/dbconn"
+	"github.com/pingcap/dm/syncer/metrics"
+)
+
+// DMLWorker is used to sync dml.
+type DMLWorker struct {
+	batch       int
+	workerCount int
+	chanSize    int
+	toDBConns   []*dbconn.DBConn
+	tctx        *tcontext.Context
+	wg          sync.WaitGroup // counts conflict/flush jobs in all DML job channels.
+	logger      log.Logger
+
+	// for metrics
+	task   string
+	source string
+	worker string
+
+	// callback func
+	// TODO: refine callback func
+	successFunc  func(int, []*job)
+	fatalFunc    func(*job, error)
+	lagFunc      func(*job, int)
+	addCountFunc func(bool, string, opType, int64, *filter.Table)
+
+	// channel
+	inCh    chan *job
+	flushCh chan *job
+}
+
+// dmlWorkerWrap creates and runs a dmlWorker instance and returns the flush job channel.
+func dmlWorkerWrap(inCh chan *job, syncer *Syncer) chan *job {
+	dmlWorker := &DMLWorker{
+		batch:        syncer.cfg.Batch,
+		workerCount:  syncer.cfg.WorkerCount,
+		chanSize:     syncer.cfg.QueueSize,
+		task:         syncer.cfg.Name,
+		source:       syncer.cfg.SourceID,
+		worker:       syncer.cfg.WorkerName,
+		logger:       syncer.tctx.Logger.WithFields(zap.String("component", "dml_worker")),
+		successFunc:  syncer.successFunc,
+		fatalFunc:    syncer.fatalFunc,
+		lagFunc:      syncer.updateReplicationJobTS,
+		addCountFunc: syncer.addCount,
+		tctx:         syncer.tctx,
+		toDBConns:    syncer.toDBConns,
+		inCh:         inCh,
+		flushCh:      make(chan *job),
+	}
+
+	go func() {
+		dmlWorker.run()
+		dmlWorker.close()
+	}()
+	return dmlWorker.flushCh
+}
+
+// close closes the outer channel.
+func (w *DMLWorker) close() {
+	close(w.flushCh)
+}
+
+// run distributes jobs by queueBucket.
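+//
+// Jobs sharing a causality key always hash to the same bucket, so they are
+// executed in order by a single worker goroutine. A rough sketch of the routing
+// rule used below (illustrative only):
+//
+//	queueBucket := int(utils.GenHashKey(j.key)) % w.workerCount
+//	jobChs[queueBucket] <- j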
+func (w *DMLWorker) run() {
+	jobChs := make([]chan *job, w.workerCount)
+
+	for i := 0; i < w.workerCount; i++ {
+		jobChs[i] = make(chan *job, w.chanSize)
+		go w.executeJobs(i, jobChs[i])
+	}
+
+	defer func() {
+		for i := 0; i < w.workerCount; i++ {
+			close(jobChs[i])
+		}
+	}()
+
+	queueBucketMapping := make([]string, w.workerCount)
+	for i := 0; i < w.workerCount; i++ {
+		queueBucketMapping[i] = queueBucketName(i)
+	}
+
+	for j := range w.inCh {
+		metrics.QueueSizeGauge.WithLabelValues(w.task, "dml_worker_input", w.source).Set(float64(len(w.inCh)))
+		if j.tp == flush || j.tp == conflict {
+			if j.tp == conflict {
+				w.addCountFunc(false, adminQueueName, j.tp, 1, j.targetTable)
+			}
+			w.wg.Add(w.workerCount)
+			// flush for every DML queue
+			for i, jobCh := range jobChs {
+				startTime := time.Now()
+				jobCh <- j
+				metrics.AddJobDurationHistogram.WithLabelValues(j.tp.String(), w.task, queueBucketMapping[i], w.source).Observe(time.Since(startTime).Seconds())
+			}
+			w.wg.Wait()
+			if j.tp == conflict {
+				w.addCountFunc(true, adminQueueName, j.tp, 1, j.targetTable)
+			} else {
+				w.flushCh <- j
+			}
+		} else {
+			queueBucket := int(utils.GenHashKey(j.key)) % w.workerCount
+			w.addCountFunc(false, queueBucketMapping[queueBucket], j.tp, 1, j.targetTable)
+			startTime := time.Now()
+			w.logger.Debug("queue for key", zap.Int("queue", queueBucket), zap.String("key", j.key))
+			jobChs[queueBucket] <- j
+			metrics.AddJobDurationHistogram.WithLabelValues(j.tp.String(), w.task, queueBucketMapping[queueBucket], w.source).Observe(time.Since(startTime).Seconds())
+		}
+	}
+}
+
+// executeJobs executes jobs in the same queueBucket.
+// All the jobs received should be executed consecutively.
+func (w *DMLWorker) executeJobs(queueID int, jobCh chan *job) {
+	jobs := make([]*job, 0, w.batch)
+	workerJobIdx := dmlWorkerJobIdx(queueID)
+	queueBucket := queueBucketName(queueID)
+	for j := range jobCh {
+		metrics.QueueSizeGauge.WithLabelValues(w.task, queueBucket, w.source).Set(float64(len(jobCh)))
+
+		if j.tp != flush && j.tp != conflict {
+			if len(jobs) == 0 {
+				// set job TS when received first job of this batch.
+				w.lagFunc(j, workerJobIdx)
+			}
+			jobs = append(jobs, j)
+			if len(jobs) < w.batch && len(jobCh) > 0 {
+				continue
+			}
+		}
+
+		failpoint.Inject("syncDMLBatchNotFull", func() {
+			if len(jobCh) == 0 && len(jobs) < w.batch {
+				w.logger.Info("execute not full job queue")
+			}
+		})
+
+		w.executeBatchJobs(queueID, jobs)
+		if j.tp == conflict || j.tp == flush {
+			w.wg.Done()
+		}
+
+		jobs = jobs[0:0]
+		if len(jobCh) == 0 {
+			failpoint.Inject("noJobInQueueLog", func() {
+				w.logger.Debug("no job in queue, update lag to zero", zap.Int(
+					"workerJobIdx", workerJobIdx), zap.Int64("current ts", time.Now().Unix()))
+			})
+			w.lagFunc(nil, workerJobIdx)
+		}
+	}
+}
+
+// executeBatchJobs executes the accumulated jobs as one batch.
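+//
+// On success the whole batch is reported through successFunc, on error the
+// failing job is handed to fatalFunc; a simplified view of the control flow
+// below (illustrative only, failpoints omitted):
+//
+//	affect, err := db.ExecuteSQL(ctx, queries, args...)
+//	if err == nil { w.successFunc(queueID, jobs) } else { w.fatalFunc(jobs[affect], err) }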
+func (w *DMLWorker) executeBatchJobs(queueID int, jobs []*job) {
+	var (
+		affect int
+		db     = w.toDBConns[queueID]
+		err    error
+	)
+
+	defer func() {
+		if err == nil {
+			w.successFunc(queueID, jobs)
+		} else {
+			w.fatalFunc(jobs[affect], err)
+		}
+	}()
+
+	if len(jobs) == 0 {
+		return
+	}
+	failpoint.Inject("BlockExecuteSQLs", func(v failpoint.Value) {
+		t := v.(int) // sleep time
+		w.logger.Info("BlockExecuteSQLs", zap.Any("job", jobs[0]), zap.Int("sleep time", t))
+		time.Sleep(time.Second * time.Duration(t))
+	})
+
+	failpoint.Inject("failSecondJob", func() {
+		if failExecuteSQL && failOnce.CAS(false, true) {
+			w.logger.Info("trigger failSecondJob")
+			err = terror.ErrDBExecuteFailed.Delegate(errors.New("failSecondJob"), "mock")
+			failpoint.Return()
+		}
+	})
+
+	queries := make([]string, 0, len(jobs))
+	args := make([][]interface{}, 0, len(jobs))
+	for _, j := range jobs {
+		queries = append(queries, j.sql)
+		args = append(args, j.args)
+	}
+	failpoint.Inject("WaitUserCancel", func(v failpoint.Value) {
+		t := v.(int)
+		time.Sleep(time.Duration(t) * time.Second)
+	})
+	// use background context to execute sqls as much as possible
+	ctx, cancel := w.tctx.WithTimeout(maxDMLExecutionDuration)
+	defer cancel()
+	affect, err = db.ExecuteSQL(ctx, queries, args...)
+	failpoint.Inject("SafeModeExit", func(val failpoint.Value) {
+		if intVal, ok := val.(int); ok && intVal == 4 && len(jobs) > 0 {
+			w.logger.Warn("fail to exec DML", zap.String("failpoint", "SafeModeExit"))
+			affect, err = 0, terror.ErrDBExecuteFailed.Delegate(errors.New("SafeModeExit"), "mock")
+		}
+	})
+}
diff --git a/syncer/job.go b/syncer/job.go
index 0c542e048c..e85ae2bd55 100644
--- a/syncer/job.go
+++ b/syncer/job.go
@@ -35,6 +35,7 @@ const (
 	flush
 	skip // used by Syncer.recordSkipSQLsLocation to record global location, but not execute SQL
 	rotate
+	conflict
 )
 
 func (t opType) String() string {
@@ -55,6 +56,8 @@ func (t opType) String() string {
 		return "skip"
 	case rotate:
 		return "rotate"
+	case conflict:
+		return "conflict"
 	}
 
 	return ""
@@ -70,6 +73,7 @@ type job struct {
 	sql             string
 	args            []interface{}
 	key             string
+	keys            []string
 	retry           bool
 	location        binlog.Location // location of last received (ROTATE / QUERY / XID) event, for global/table checkpoint
 	startLocation   binlog.Location // start location of the sql in binlog, for handle_error
@@ -86,21 +90,21 @@ func (j *job) String() string {
 	return fmt.Sprintf("tp: %s, sql: %s, args: %v, key: %s, ddls: %s, last_location: %s, start_location: %s, current_location: %s", j.tp, j.sql, j.args, j.key, j.ddls, j.location, j.startLocation, j.currentLocation)
 }
 
-func newDMLJob(tp opType, sql string, sourceTable, targetTable *filter.Table, args []interface{},
-	key string, location, startLocation, cmdLocation binlog.Location, eventHeader *replication.EventHeader) *job {
+func newDMLJob(tp opType, sourceTable, targetTable *filter.Table, sql string, args []interface{},
+	keys []string, ec *eventContext) *job {
 	return &job{
 		tp:          tp,
 		sourceTbls:  map[string][]*filter.Table{sourceTable.Schema: {sourceTable}},
 		targetTable: targetTable,
 		sql:         sql,
 		args:        args,
-		key:         key,
+		keys:        keys,
 		retry:       true,
 
-		location:        location,
-		startLocation:   startLocation,
-		currentLocation: cmdLocation,
-		eventHeader:     eventHeader,
+		location:        *ec.lastLocation,
+		startLocation:   *ec.startLocation,
+		currentLocation: *ec.currentLocation,
+		eventHeader:     ec.header,
 		jobAddTime:      time.Now(),
 	}
 }
@@ -168,6 +172,14 @@ func newFlushJob() *job {
 	}
 }
 
+func newConflictJob() *job {
+	return &job{
+		tp:          conflict,
+		targetTable: &filter.Table{},
+		jobAddTime:  time.Now(),
+	}
+}
+
 // put queues into bucket to monitor them.
 func queueBucketName(queueID int) string {
 	return fmt.Sprintf("q_%d", queueID%defaultBucketCount)
@@ -176,7 +188,3 @@ func queueBucketName(queueID int) string {
 func dmlWorkerJobIdx(queueID int) int {
 	return queueID + workerJobTSArrayInitSize
 }
-
-func dmlWorkerJobIdxToQueueID(idx int) int {
-	return idx - workerJobTSArrayInitSize
-}
diff --git a/syncer/job_test.go b/syncer/job_test.go
index fce6fee5b7..90adabf582 100644
--- a/syncer/job_test.go
+++ b/syncer/job_test.go
@@ -84,8 +84,8 @@ func (t *testJobSuite) TestJob(c *C) {
 		jobStr string
 	}{
 		{
-			newDMLJob(insert, "insert into test.t1 values(?)", table, table, []interface{}{1}, "1", location, location, location, ec.header),
-			"tp: insert, sql: insert into test.t1 values(?), args: [1], key: 1, ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ",
+			newDMLJob(insert, table, table, "insert into test.t1 values(?)", []interface{}{1}, []string{"1"}, ec),
+			"tp: insert, sql: insert into test.t1 values(?), args: [1], key: , ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ",
 		}, {
 			newDDLJob(qec),
 			"tp: ddl, sql: , args: [], key: , ddls: [create database test], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ",
diff --git a/syncer/syncer.go b/syncer/syncer.go
index 4436a89a68..8478820a52 100644
--- a/syncer/syncer.go
+++ b/syncer/syncer.go
@@ -75,7 +75,6 @@ var (
 	maxRetryCount = 100
 
 	retryTimeout = 3 * time.Second
-	waitTime     = 10 * time.Millisecond
 
 	// MaxDDLConnectionTimeoutMinute also used by SubTask.ExecuteDDL.
 	MaxDDLConnectionTimeoutMinute = 5
 
@@ -132,8 +131,8 @@ type Syncer struct {
 	streamerController *StreamerController
 	enableRelay        bool
 
-	wg    sync.WaitGroup
-	jobWg sync.WaitGroup
+	wg    sync.WaitGroup // counts goroutines
+	jobWg sync.WaitGroup // counts ddl/flush jobs in-flight in s.dmlJobCh and s.ddlJobCh
 
 	schemaTracker *schema.Tracker
 
@@ -144,16 +143,14 @@ type Syncer struct {
 	ddlDB     *conn.BaseDB
 	ddlDBConn *dbconn.DBConn
 
-	jobs         []chan *job
+	dmlJobCh     chan *job
+	ddlJobCh     chan *job
 	jobsClosed   atomic.Bool
 	jobsChanLock sync.Mutex
-	queueBucketMapping []string
 
 	waitXIDJob          atomic.Int64
 	isTransactionEnd    bool
 	waitTransactionLock sync.Mutex
 
-	c *causality
-
 	tableRouter   *router.Table
 	binlogFilter  *bf.BinlogEvent
 	columnMapping *cm.Mapping
@@ -241,7 +238,6 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer {
 	syncer.binlogSizeCount.Store(0)
 	syncer.lastCount.Store(0)
 	syncer.count.Store(0)
-	syncer.c = newCausality()
 	syncer.done = nil
 	syncer.setTimezone()
 	syncer.addJobFunc = syncer.addJob
@@ -267,12 +263,10 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer {
 	return syncer
 }
 
-func (s *Syncer) newJobChans(count int) {
+func (s *Syncer) newJobChans() {
 	s.closeJobChans()
-	s.jobs = make([]chan *job, 0, count)
-	for i := 0; i < count; i++ {
-		s.jobs = append(s.jobs, make(chan *job, s.cfg.QueueSize))
-	}
+	s.dmlJobCh = make(chan *job, s.cfg.QueueSize)
+	s.ddlJobCh = make(chan *job, s.cfg.QueueSize)
 	s.jobsClosed.Store(false)
 }
 
@@ -282,9 +276,8 @@ func (s *Syncer) closeJobChans() {
 	if s.jobsClosed.Load() {
 		return
 	}
-	for _, ch := range s.jobs {
-		close(ch)
-	}
+	close(s.dmlJobCh)
+	close(s.ddlJobCh)
 	s.jobsClosed.Store(true)
 }
 
@@ -529,7 +522,7 @@ func (s *Syncer) reset() {
 		s.streamerController.Close(s.tctx)
 	}
 	// create new job chans
-	s.newJobChans(s.cfg.WorkerCount + 1)
+	s.newJobChans()
 
 	s.execError.Store(nil)
 	s.setErrLocation(nil, nil, false)
@@ -794,7 +787,7 @@ func (s *Syncer) addCount(isFinished bool, queueBucket string, tp opType, n int6
 		m = metrics.FinishedJobsTotal
 	}
 	switch tp {
-	case insert, update, del, ddl, flush:
+	case insert, update, del, ddl, flush, conflict:
 		m.WithLabelValues(tp.String(), s.cfg.Name, queueBucket, s.cfg.SourceID, s.cfg.WorkerName, targetTable.Schema, targetTable.Name).Add(float64(n))
 	case skip, xid:
 		// ignore skip/xid jobs
@@ -848,16 +841,8 @@ func (s *Syncer) updateReplicationLagMetric() {
 	}
 }
 
-func (s *Syncer) checkWait(job *job) bool {
-	if job.tp == ddl {
-		return true
-	}
-
-	if s.checkpoint.CheckGlobalPoint() {
-		return true
-	}
-
-	return false
+func (s *Syncer) checkFlush() bool {
+	return s.checkpoint.CheckGlobalPoint()
 }
 
 func (s *Syncer) saveTablePoint(table *filter.Table, location binlog.Location) {
@@ -927,11 +912,10 @@ func (s *Syncer) addJob(job *job) error {
 		}
 	})
 
-	if waitXIDStatus(s.waitXIDJob.Load()) == waitComplete {
+	if waitXIDStatus(s.waitXIDJob.Load()) == waitComplete && job.tp != flush {
 		s.tctx.L().Info("All jobs is completed before syncer close, the coming job will be reject", zap.Any("job", job))
 		return nil
 	}
-	var queueBucket int
 	switch job.tp {
 	case xid:
 		s.waitXIDJob.CAS(int64(waiting), int64(waitComplete))
@@ -942,53 +926,41 @@ func (s *Syncer) addJob(job *job) error {
 		s.updateReplicationJobTS(job, skipJobIdx)
 	case flush:
 		s.addCount(false, adminQueueName, job.tp, 1, job.targetTable)
-		// ugly code addJob and sync, refine it later
-		s.jobWg.Add(s.cfg.WorkerCount)
-		for i := 0; i < s.cfg.WorkerCount; i++ {
-			startTime := time.Now()
-			s.jobs[i] <- job
-			// flush for every DML queue
-			metrics.AddJobDurationHistogram.WithLabelValues("flush", s.cfg.Name, s.queueBucketMapping[i], s.cfg.SourceID).Observe(time.Since(startTime).Seconds())
-		}
+		s.jobWg.Add(1)
+		s.dmlJobCh <- job
 		s.jobWg.Wait()
 		s.addCount(true, adminQueueName, job.tp, 1, job.targetTable)
 		return s.flushCheckPoints()
 	case ddl:
-		s.jobWg.Wait()
 		s.addCount(false, adminQueueName, job.tp, 1, job.targetTable)
 		s.updateReplicationJobTS(job, ddlJobIdx)
 		s.jobWg.Add(1)
-		queueBucket = s.cfg.WorkerCount
 		startTime := time.Now()
-		s.jobs[queueBucket] <- job
+		s.ddlJobCh <- job
 		metrics.AddJobDurationHistogram.WithLabelValues("ddl", s.cfg.Name, adminQueueName, s.cfg.SourceID).Observe(time.Since(startTime).Seconds())
+		s.jobWg.Wait()
 	case insert, update, del:
-		s.jobWg.Add(1)
-		queueBucket = int(utils.GenHashKey(job.key)) % s.cfg.WorkerCount
-		s.addCount(false, s.queueBucketMapping[queueBucket], job.tp, 1, job.targetTable)
-		startTime := time.Now()
-		s.tctx.L().Debug("queue for key", zap.Int("queue", queueBucket), zap.String("key", job.key))
-		s.jobs[queueBucket] <- job
+		s.dmlJobCh <- job
 		s.isTransactionEnd = false
 		failpoint.Inject("checkCheckpointInMiddleOfTransaction", func() {
 			s.tctx.L().Info("receive dml job", zap.Any("dml job", job))
 			time.Sleep(100 * time.Millisecond)
 		})
-		metrics.AddJobDurationHistogram.WithLabelValues(job.tp.String(), s.cfg.Name, s.queueBucketMapping[queueBucket], s.cfg.SourceID).Observe(time.Since(startTime).Seconds())
 	}
 
 	// nolint:ifshort
-	wait := s.checkWait(job)
+	needFlush := s.checkFlush()
 	failpoint.Inject("flushFirstJob", func() {
 		if waitJobsDone {
 			s.tctx.L().Info("trigger flushFirstJob")
 			waitJobsDone = false
-			wait = true
+			needFlush = true
 		}
 	})
-	if wait {
+	if needFlush {
+		s.jobWg.Add(1)
+		s.dmlJobCh <- newFlushJob()
 		s.jobWg.Wait()
-		s.c.reset()
 	}
 
 	if s.execError.Load() != nil {
@@ -1033,7 +1005,7 @@ func (s *Syncer) addJob(job *job) error {
 		}
 	}
 
-	if wait {
+	if needFlush || job.tp == ddl {
 		// interrupted after save checkpoint and before flush checkpoint.
 		failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
 			err := handleFlushCheckpointStage(4, val.(int), "before flush checkpoint")
@@ -1256,191 +1228,48 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn.
 	}
 }
 
-// DML synced in batch by one worker.
-func (s *Syncer) syncDML(
-	tctx *tcontext.Context, queueBucket string, db *dbconn.DBConn, jobChan chan *job, workerJobIdx int) {
-	defer s.wg.Done()
-
-	idx := 0
-	count := s.cfg.Batch
-	jobs := make([]*job, 0, count)
-	// db_schema->db_table->opType
-	tpCnt := make(map[string]map[string]map[opType]int64)
-	queueID := fmt.Sprint(dmlWorkerJobIdxToQueueID(workerJobIdx))
-
-	// clearF is used to reset job queue.
-	clearF := func() {
-		for i := 0; i < idx; i++ {
-			s.jobWg.Done()
+func (s *Syncer) successFunc(queueID int, jobs []*job) {
+	queueBucket := queueBucketName(queueID)
+	if len(jobs) > 0 {
+		// NOTE: we can use the first job of job queue to calculate lag because when this job committed,
+		// every event before this job's event in this queue has already commit.
+		// and we can use this job to maintain the oldest binlog event ts among all workers.
+		j := jobs[0]
+		switch j.tp {
+		case ddl:
+			metrics.BinlogEventCost.WithLabelValues(metrics.BinlogEventCostStageDDLExec, s.cfg.Name, s.cfg.WorkerName, s.cfg.SourceID).Observe(time.Since(j.jobAddTime).Seconds())
+		case insert, update, del:
+			metrics.BinlogEventCost.WithLabelValues(metrics.BinlogEventCostStageDMLExec, s.cfg.Name, s.cfg.WorkerName, s.cfg.SourceID).Observe(time.Since(j.jobAddTime).Seconds())
+			// metric only increases by 1 because dm batches sql jobs in a single transaction.
+			metrics.FinishedTransactionTotal.WithLabelValues(s.cfg.Name, s.cfg.WorkerName, s.cfg.SourceID).Inc()
 		}
-		idx = 0
-		jobs = jobs[0:0]
-		// clear tpCnt map
-		tpCnt = make(map[string]map[string]map[opType]int64)
-	}
-
-	// successF is used to calculate lag metric and q/tps.
-	successF := func() {
-		if len(jobs) > 0 {
-			// NOTE: we can use the first job of job queue to calculate lag because when this job committed,
-			// every event before this job's event in this queue has already commit.
-			// and we can use this job to maintain the oldest binlog event ts among all workers.
-			j := jobs[0]
-			switch j.tp {
-			case ddl:
-				metrics.BinlogEventCost.WithLabelValues(metrics.BinlogEventCostStageDDLExec, s.cfg.Name, s.cfg.WorkerName, s.cfg.SourceID).Observe(time.Since(j.jobAddTime).Seconds())
-			case insert, update, del:
-				metrics.BinlogEventCost.WithLabelValues(metrics.BinlogEventCostStageDMLExec, s.cfg.Name, s.cfg.WorkerName, s.cfg.SourceID).Observe(time.Since(j.jobAddTime).Seconds())
-				// metric only increases by 1 because dm batches sql jobs in a single transaction.
-				metrics.FinishedTransactionTotal.WithLabelValues(s.cfg.Name, s.cfg.WorkerName, s.cfg.SourceID).Inc()
-			}
-		}
-		// calculate qps
-		for dbSchema, tableM := range tpCnt {
-			for dbTable, tpM := range tableM {
-				for tpName, cnt := range tpM {
-					s.addCount(true, queueBucket, tpName, cnt, &filter.Table{Schema: dbSchema, Name: dbTable})
-				}
-			}
-		}
-		// reset job TS when this batch is finished.
-		s.updateReplicationJobTS(nil, workerJobIdx)
-		metrics.ReplicationTransactionBatch.WithLabelValues(s.cfg.WorkerName, s.cfg.Name, s.cfg.SourceID, queueBucket).Observe(float64(len(jobs)))
 	}
 
-	fatalF := func(affected int, err error) {
-		s.execError.Store(err)
-		if !utils.IsContextCanceledError(err) {
-			err = s.handleEventError(err, jobs[affected].startLocation, jobs[affected].currentLocation, false, "")
-			s.runFatalChan <- unit.NewProcessError(err)
-		}
-		clearF()
+	for _, sqlJob := range jobs {
+		s.addCount(true, queueBucket, sqlJob.tp, 1, sqlJob.targetTable)
 	}
+	s.updateReplicationJobTS(nil, dmlWorkerJobIdx(queueID))
+	metrics.ReplicationTransactionBatch.WithLabelValues(s.cfg.WorkerName, s.cfg.Name, s.cfg.SourceID, queueBucket).Observe(float64(len(jobs)))
+}
 
-	executeSQLs := func() (int, error) {
-		if len(jobs) == 0 {
-			return 0, nil
-		}
-		failpoint.Inject("BlockExecuteSQLs", func(v failpoint.Value) {
-			t := v.(int) // sleep time
-			s.tctx.L().Info("BlockExecuteSQLs", zap.Any("job", jobs[0]), zap.Int("sleep time", t))
-			time.Sleep(time.Second * time.Duration(t))
-		})
-
-		failpoint.Inject("failSecondJob", func() {
-			if failExecuteSQL && failOnce.CAS(false, true) {
-				s.tctx.L().Info("trigger failSecondJob")
-				failpoint.Return(0, terror.ErrDBExecuteFailed.Delegate(errors.New("failSecondJob"), "mock"))
-			}
-		})
-
-		queries := make([]string, 0, len(jobs))
-		args := make([][]interface{}, 0, len(jobs))
-		for _, j := range jobs {
-			queries = append(queries, j.sql)
-			args = append(args, j.args)
-		}
-		failpoint.Inject("WaitUserCancel", func(v failpoint.Value) {
-			t := v.(int)
-			time.Sleep(time.Duration(t) * time.Second)
-		})
-		// use background context to execute sqls as much as possible
-		ctctx, cancel := tctx.WithTimeout(maxDMLExecutionDuration)
-		defer cancel()
-		affect, err := db.ExecuteSQL(ctctx, queries, args...)
-		failpoint.Inject("SafeModeExit", func(val failpoint.Value) {
-			if intVal, ok := val.(int); ok && intVal == 4 && len(jobs) > 0 {
-				s.tctx.L().Warn("fail to exec DML", zap.String("failpoint", "SafeModeExit"))
-				affect, err = 0, terror.ErrDBExecuteFailed.Delegate(errors.New("SafeModeExit"), "mock")
-			}
-		})
-		return affect, err
+func (s *Syncer) fatalFunc(job *job, err error) {
+	s.execError.Store(err)
+	if !utils.IsContextCanceledError(err) {
+		err = s.handleEventError(err, job.startLocation, job.currentLocation, false, "")
+		s.runFatalChan <- unit.NewProcessError(err)
 	}
+}
 
-	var err error
-	var affect int
-	tickerInterval := waitTime
-	failpoint.Inject("changeTickerInterval", func(val failpoint.Value) {
-		t := val.(int)
-		tickerInterval = time.Duration(t) * time.Second
-		tctx.L().Info("changeTickerInterval", zap.Int("current ticker interval second", t))
-	})
-	timer := time.NewTimer(tickerInterval)
-	defer timer.Stop()
-
-	for {
-		// resets the time interval for each loop to prevent a certain amount of time being spent on the previous ticker
-		// execution to `executeSQLs` resulting in the next ticker not waiting for the full waitTime.
-		if !timer.Stop() {
-			select {
-			case <-timer.C:
-			default:
-			}
-		}
-		timer.Reset(tickerInterval)
-		failpoint.Inject("noJobInQueueLog", func() {
-			tctx.L().Debug("timer Reset",
-				zap.Int("workerJobIdx", workerJobIdx),
-				zap.Duration("tickerInterval", tickerInterval),
-				zap.Int64("current ts", time.Now().Unix()))
-		})
+// syncDML syncs DML jobs through the causality and DML worker pipeline.
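+//
+// The per-queue goroutines and ticker-based flushing are replaced by a small
+// channel pipeline; roughly (illustrative only, see causalityWrap and
+// dmlWorkerWrap for the details):
+//
+//	s.dmlJobCh -> causality (attach keys, emit conflict jobs) -> DML workers (batched execution) -> flushCh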
+func (s *Syncer) syncDML() {
+	defer s.wg.Done()
 
-		select {
-		case sqlJob, ok := <-jobChan:
-			metrics.QueueSizeGauge.WithLabelValues(s.cfg.Name, queueID, s.cfg.SourceID).Set(float64(len(jobChan)))
-			if !ok {
-				if len(jobs) > 0 {
-					tctx.L().Warn("have unexecuted jobs when close job chan!", zap.Any("rest job", jobs))
-				}
-				return
-			}
-			idx++
-			if sqlJob.tp != flush && len(sqlJob.sql) > 0 {
-				if len(jobs) == 0 {
-					// set job TS when received first job of this batch.
-					s.updateReplicationJobTS(sqlJob, workerJobIdx)
-				}
-				jobs = append(jobs, sqlJob)
-				schemaName, tableName := sqlJob.targetTable.Schema, sqlJob.targetTable.Name
-				if _, ok := tpCnt[schemaName]; !ok {
-					tpCnt[schemaName] = make(map[string]map[opType]int64)
-				}
-				if _, ok := tpCnt[schemaName][tableName]; !ok {
-					tpCnt[schemaName][tableName] = make(map[opType]int64)
-				}
-				tpCnt[schemaName][tableName][sqlJob.tp]++
-			}
+	// TODO: add compactor
+	causalityCh := causalityWrap(s.dmlJobCh, s)
+	flushCh := dmlWorkerWrap(causalityCh, s)
 
-			if idx >= count || sqlJob.tp == flush {
-				affect, err = executeSQLs()
-				if err != nil {
-					fatalF(affect, err)
-					continue
-				}
-				successF()
-				clearF()
-			}
-		case <-timer.C:
-			if len(jobs) > 0 {
-				failpoint.Inject("syncDMLTicker", func() {
-					tctx.L().Info("job queue not full, executeSQLs by ticker")
-				})
-				affect, err = executeSQLs()
-				if err != nil {
-					fatalF(affect, err)
-					continue
-				}
-				successF()
-				clearF()
-			} else {
-				failpoint.Inject("noJobInQueueLog", func() {
-					tctx.L().Debug("no job in queue, update lag to zero", zap.Int(
-						"workerJobIdx", workerJobIdx), zap.Int64("current ts", time.Now().Unix()))
-				})
-				// reset job TS when there is no job in the queue
-				s.updateReplicationJobTS(nil, workerJobIdx)
-			}
-		}
+	for range flushCh {
+		s.jobWg.Done()
 	}
 }
 
@@ -1466,7 +1295,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 			s.waitTransactionLock.Lock()
 			if s.isTransactionEnd {
 				s.waitXIDJob.Store(int64(waitComplete))
-				s.jobWg.Wait()
 				tctx.L().Info("the last job is transaction end, done directly")
 				runCancel()
 				s.waitTransactionLock.Unlock()
@@ -1480,7 +1308,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 				tctx.L().Info("received syncer's done")
 			case <-time.After(maxPauseOrStopWaitTime):
 				tctx.L().Info("wait transaction end timeout")
-				s.jobWg.Wait()
 				runCancel()
 			}
 		}
@@ -1591,21 +1418,11 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 		}
 	}
 
-	s.queueBucketMapping = make([]string, 0, s.cfg.WorkerCount+1)
-	for i := 0; i < s.cfg.WorkerCount; i++ {
-		s.wg.Add(1)
-		name := queueBucketName(i)
-		s.queueBucketMapping = append(s.queueBucketMapping, name)
-		go func(i int, name string) {
-			s.syncDML(tctx, name, s.toDBConns[i], s.jobs[i], dmlWorkerJobIdx(i))
-		}(i, name)
-	}
+	s.wg.Add(1)
+	go s.syncDML()
 
-	s.queueBucketMapping = append(s.queueBucketMapping, adminQueueName)
 	s.wg.Add(1)
-	go func() {
-		s.syncDDL(tctx, adminQueueName, s.ddlDBConn, s.jobs[s.cfg.WorkerCount])
-	}()
+	go s.syncDDL(tctx, adminQueueName, s.ddlDBConn, s.ddlJobCh)
 
 	s.wg.Add(1)
 	go func() {
@@ -1649,7 +1466,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 			err = terror.ErrSyncerUnitPanic.Generate(err1)
 		}
 
-		s.jobWg.Wait()
 		var (
 			err2            error
 			exitSafeModeLoc binlog.Location
@@ -1660,15 +1476,17 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 			exitSafeModeLoc = savedGlobalLastLocation.Clone()
 		}
 		s.checkpoint.SaveSafeModeExitPoint(&exitSafeModeLoc)
-		if err2 = s.execError.Load(); err2 != nil && (terror.ErrDBExecuteFailed.Equal(err2) || terror.ErrDBUnExpect.Equal(err2)) {
-			err2 = s.checkpoint.FlushSafeModeExitPoint(s.tctx)
-		} else {
-			err2 = s.flushCheckPoints()
+
+		// flush all jobs before exit
+		if err2 = s.flushJobs(); err2 != nil {
+			tctx.L().Warn("failed to flush jobs when exit task", zap.Error(err2))
 		}
-		if err2 != nil {
-			tctx.L().Warn("failed to flush checkpoints when exit task", zap.Error(err2))
-		} else {
-			tctx.L().Info("flush checkpoints when exit task")
+
+		// if any execute error, flush safemode exit point
+		if err2 = s.execError.Load(); err2 != nil && (terror.ErrDBExecuteFailed.Equal(err2) || terror.ErrDBUnExpect.Equal(err2)) {
+			if err2 = s.checkpoint.FlushSafeModeExitPoint(s.tctx); err2 != nil {
+				tctx.L().Warn("failed to flush safe mode checkpoints when exit task", zap.Error(err2))
+			}
 		}
 	}()
 
@@ -2286,7 +2104,9 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
 		if keys != nil {
 			key = keys[i]
 		}
-		err = s.commitJob(jobType, sourceTable, targetTable, sqls[i], arg, key, &ec)
+
+		job := newDMLJob(jobType, sourceTable, targetTable, sqls[i], arg, key, &ec)
+		err = s.addJobFunc(job)
 		if err != nil {
 			return err
 		}
@@ -2943,37 +2763,6 @@ func (s *Syncer) trackDDL(usedSchema string, trackInfo *ddlInfo, ec *eventContex
 	return nil
 }
 
-func (s *Syncer) commitJob(tp opType, sourceTable, targetTable *filter.Table, sql string, args []interface{}, keys []string, ec *eventContext) error {
-	startTime := time.Now()
-	key, err := s.resolveCasuality(keys)
-	if err != nil {
-		return terror.ErrSyncerUnitResolveCasualityFail.Generate(err)
-	}
-	s.tctx.L().Debug("key for keys", zap.String("key", key), zap.Strings("keys", keys))
-	metrics.ConflictDetectDurationHistogram.WithLabelValues(s.cfg.Name, s.cfg.SourceID).Observe(time.Since(startTime).Seconds())
-
-	job := newDMLJob(tp, sql, sourceTable, targetTable, args, key, *ec.lastLocation, *ec.startLocation, *ec.currentLocation, ec.header)
-	return s.addJobFunc(job)
-}
-
-func (s *Syncer) resolveCasuality(keys []string) (string, error) {
-	if s.c.detectConflict(keys) {
-		s.tctx.L().Debug("meet causality key, will generate a flush job and wait all sqls executed", zap.Strings("keys", keys))
-		if err := s.flushJobs(); err != nil {
-			return "", err
-		}
-		s.c.reset()
-	}
-	if err := s.c.add(keys); err != nil {
-		return "", err
-	}
-	var key string
-	if len(keys) > 0 {
-		key = keys[0]
-	}
-	return s.c.get(key), nil
-}
-
 func (s *Syncer) genRouter() error {
 	s.tableRouter, _ = router.NewTableRouter(s.cfg.CaseSensitive, []*router.TableRule{})
 	for _, rule := range s.cfg.RouteRules {
@@ -3138,6 +2927,7 @@ func (s *Syncer) recordSkipSQLsLocation(ec *eventContext) error {
 	return s.addJobFunc(job)
 }
 
+// flushJobs adds a flush job and waits for all jobs to finish.
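+//
+// The flush job travels through the same causality/DML-worker pipeline as normal
+// DML jobs, so waiting on jobWg means every job queued before it has been
+// executed; a rough sketch of the flush path in addJob (illustrative only):
+//
+//	s.jobWg.Add(1)
+//	s.dmlJobCh <- newFlushJob()
+//	s.jobWg.Wait()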
 func (s *Syncer) flushJobs() error {
 	s.tctx.L().Info("flush all jobs", zap.Stringer("global checkpoint", s.checkpoint))
 	job := newFlushJob()
diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go
index 7f56f481b3..02f5aaa60b 100644
--- a/syncer/syncer_test.go
+++ b/syncer/syncer_test.go
@@ -28,7 +28,6 @@ import (
 	sqlmock "github.com/DATA-DOG/go-sqlmock"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/infoschema"
-	"github.com/pingcap/tidb/util/mock"
 
 	"github.com/pingcap/dm/dm/config"
 	"github.com/pingcap/dm/dm/pb"
@@ -261,8 +260,9 @@ func (s *testSyncerSuite) TestSelectDB(c *C) {
 	}
 
 	p := parser.New()
-	var err error
-	syncer := NewSyncer(s.cfg, nil)
+	cfg, err := s.cfg.Clone()
+	c.Assert(err, IsNil)
+	syncer := NewSyncer(cfg, nil)
 	syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList)
 	c.Assert(err, IsNil)
 	err = syncer.genRouter()
@@ -367,8 +367,9 @@ func (s *testSyncerSuite) TestSelectTable(c *C) {
 	}
 
 	p := parser.New()
-	var err error
-	syncer := NewSyncer(s.cfg, nil)
+	cfg, err := s.cfg.Clone()
+	c.Assert(err, IsNil)
+	syncer := NewSyncer(cfg, nil)
 	syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList)
 	c.Assert(err, IsNil)
 	c.Assert(syncer.genRouter(), IsNil)
@@ -402,8 +403,9 @@ func (s *testSyncerSuite) TestIgnoreDB(c *C) {
 	res := []bool{true, true, false, false, true, true, true, true, true, true, false, false}
 
 	p := parser.New()
-	var err error
-	syncer := NewSyncer(s.cfg, nil)
+	cfg, err := s.cfg.Clone()
+	c.Assert(err, IsNil)
+	syncer := NewSyncer(cfg, nil)
 	syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList)
 	c.Assert(err, IsNil)
 	c.Assert(syncer.genRouter(), IsNil)
@@ -494,8 +496,9 @@ func (s *testSyncerSuite) TestIgnoreTable(c *C) {
 	}
 
 	p := parser.New()
-	var err error
-	syncer := NewSyncer(s.cfg, nil)
+	cfg, err := s.cfg.Clone()
+	c.Assert(err, IsNil)
+	syncer := NewSyncer(cfg, nil)
 	syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList)
 	c.Assert(err, IsNil)
 	c.Assert(syncer.genRouter(), IsNil)
@@ -587,7 +590,9 @@ func (s *testSyncerSuite) TestSkipDML(c *C) {
 
 	p := parser.New()
 	var err error
-	syncer := NewSyncer(s.cfg, nil)
+	cfg, err := s.cfg.Clone()
+	c.Assert(err, IsNil)
+	syncer := NewSyncer(cfg, nil)
 	c.Assert(syncer.genRouter(), IsNil)
 
 	syncer.binlogFilter, err = bf.NewBinlogEvent(false, s.cfg.FilterRules)
@@ -704,74 +709,13 @@ func (s *testSyncerSuite) TestColumnMapping(c *C) {
 }
 
 func (s *testSyncerSuite) TestcheckpointID(c *C) {
-	syncer := NewSyncer(s.cfg, nil)
+	cfg, err := s.cfg.Clone()
+	c.Assert(err, IsNil)
+	syncer := NewSyncer(cfg, nil)
 	checkpointID := syncer.checkpointID()
 	c.Assert(checkpointID, Equals, "101")
 }
 
-func (s *testSyncerSuite) TestCasuality(c *C) {
-	p := parser.New()
-	se := mock.NewContext()
-	schema := "create table tb(a int primary key, b int unique);"
-	ti, err := createTableInfo(p, se, int64(0), schema)
-	c.Assert(err, IsNil)
-	insertValues := [][]interface{}{
-		{1, 2},
-		{3, 4},
-		{4, 1},
-		{1, 4}, // this insert conflict with the first one
-	}
-	keys := make([][]string, len(insertValues))
-	for i := range insertValues {
-		keys[i] = genMultipleKeys(ti, insertValues[i], "tb")
-	}
-
-	s.cfg.WorkerCount = 1
-	syncer := NewSyncer(s.cfg, nil)
-	syncer.jobs = []chan *job{make(chan *job, 1)}
-	syncer.queueBucketMapping = []string{"queue_0", adminQueueName}
-
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		job := <-syncer.jobs[0]
-		c.Assert(job.tp, Equals, flush)
-		syncer.jobWg.Done()
-	}()
-
-	// no conflict
-	key1, err := syncer.resolveCasuality(keys[0])
-	c.Assert(err, IsNil)
-	c.Assert(key1, Equals, keys[0][0])
-
-	key2, err := syncer.resolveCasuality(keys[1])
-	c.Assert(err, IsNil)
-	c.Assert(key2, Equals, keys[1][0])
-
-	key3, err := syncer.resolveCasuality(keys[2])
-	c.Assert(err, IsNil)
-	c.Assert(key3, Equals, keys[2][0])
-
-	// will detect casuality and add a flush job
-	db, mock, err := sqlmock.New()
-	c.Assert(err, IsNil)
-	dbConn, err := db.Conn(context.Background())
-	c.Assert(err, IsNil)
-
-	syncer.setupMockCheckpoint(c, dbConn, mock)
-	mock.ExpectBegin()
-	mock.ExpectExec(".*INSERT INTO .* VALUES.* ON DUPLICATE KEY UPDATE.*").WillReturnResult(sqlmock.NewResult(0, 1))
-	mock.ExpectCommit()
-	key4, err := syncer.resolveCasuality(keys[3])
-	c.Assert(err, IsNil)
-	c.Assert(key4, Equals, keys[3][0])
-	if err := mock.ExpectationsWereMet(); err != nil {
-		c.Errorf("checkpoint db unfulfilled expectations: %s", err)
-	}
-	wg.Wait()
-}
-
 // TODO: add `TestSharding` later.
 
 func (s *testSyncerSuite) TestRun(c *C) {
@@ -816,7 +760,9 @@ func (s *testSyncerSuite) TestRun(c *C) {
 	s.cfg.WorkerCount = 2
 	s.cfg.MaxRetry = 1
 
-	syncer := NewSyncer(s.cfg, nil)
+	cfg, err := s.cfg.Clone()
+	c.Assert(err, IsNil)
+	syncer := NewSyncer(cfg, nil)
 	syncer.cfg.CheckpointFlushInterval = 30
 	syncer.fromDB = &dbconn.UpStreamConn{BaseDB: conn.NewBaseDB(db, func() {})}
 	syncer.toDBConns = []*dbconn.DBConn{
@@ -914,10 +860,6 @@ func (s *testSyncerSuite) TestRun(c *C) {
 			del,
 			"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1",
 			[]interface{}{int64(580981944116838401)},
-		}, {
-			flush,
-			"",
-			nil,
 		}, {
 			// in the first minute, safe mode is true, will split update to delete + replace
 			update,
@@ -952,6 +894,10 @@
 			ddl,
 			"ALTER TABLE `test_1`.`t_3` ADD PRIMARY KEY(`id`, `name`)",
 			nil,
+		}, {
+			flush,
+			"",
+			nil,
 		},
 	}
 
@@ -1012,6 +958,10 @@
 			del,
 			"DELETE FROM `test_1`.`t_2` WHERE `id` = ? LIMIT 1",
 			[]interface{}{int32(3)},
+		}, {
+			flush,
+			"",
+			nil,
 		},
 	}
 
@@ -1058,7 +1008,9 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) {
 		},
 	}
 
-	syncer := NewSyncer(s.cfg, nil)
+	cfg, err := s.cfg.Clone()
+	c.Assert(err, IsNil)
+	syncer := NewSyncer(cfg, nil)
 	syncer.fromDB = &dbconn.UpStreamConn{BaseDB: conn.NewBaseDB(db, func() {})}
 	syncer.toDBConns = []*dbconn.DBConn{
 		{Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})},
@@ -1171,6 +1123,10 @@
 			update,
 			"UPDATE `test_1`.`t_1` SET `id` = ?, `name` = ? WHERE `id` = ? LIMIT 1",
 			[]interface{}{int32(1), "b", int32(2)},
+		}, {
+			flush,
+			"",
+			nil,
 		},
 	}
 
@@ -1240,7 +1196,9 @@ func (s *testSyncerSuite) TestTrackDDL(c *C) {
 	checkPointDBConn, err := checkPointDB.Conn(context.Background())
 	c.Assert(err, IsNil)
 
-	syncer := NewSyncer(s.cfg, nil)
+	cfg, err := s.cfg.Clone()
+	c.Assert(err, IsNil)
+	syncer := NewSyncer(cfg, nil)
 	syncer.toDBConns = []*dbconn.DBConn{
 		{Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})},
 		{Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})},
diff --git a/tests/dmctl_basic/conf/get_task.yaml b/tests/dmctl_basic/conf/get_task.yaml
index 20664bf4de..bb86061a95 100644
--- a/tests/dmctl_basic/conf/get_task.yaml
+++ b/tests/dmctl_basic/conf/get_task.yaml
@@ -152,6 +152,7 @@ syncers:
     max-retry: 0
     auto-fix-gtid: false
    enable-gtid: false
+    disable-detect: false
     safe-mode: false
     enable-ansi-quotes: false
     clean-dump-file: true
diff --git a/tests/import_v10x/conf/task.yaml b/tests/import_v10x/conf/task.yaml
index 76f707e037..0676560aed 100644
--- a/tests/import_v10x/conf/task.yaml
+++ b/tests/import_v10x/conf/task.yaml
@@ -96,6 +96,7 @@ syncers:
     max-retry: 0
     auto-fix-gtid: false
     enable-gtid: false
+    disable-detect: false
     safe-mode: false
     enable-ansi-quotes: false
   sync-02:
@@ -107,6 +108,7 @@ syncers:
     max-retry: 0
     auto-fix-gtid: false
     enable-gtid: true
+    disable-detect: false
     safe-mode: false
     enable-ansi-quotes: false
 clean-dump-file: false
diff --git a/tests/metrics/run.sh b/tests/metrics/run.sh
index 3140957567..67b4bfa2c5 100755
--- a/tests/metrics/run.sh
+++ b/tests/metrics/run.sh
@@ -17,10 +17,8 @@ function run() {
 
 	check_dashboard_datasource
 
-	# add changeTickerInterval to keep metric from updating to zero too quickly when there is no work in the queue.
 	inject_points=(
 		"github.com/pingcap/dm/syncer/BlockDDLJob=return(1)"
-		"github.com/pingcap/dm/syncer/changeTickerInterval=return(10)"
 		"github.com/pingcap/dm/syncer/ShowLagInLog=return(1)" # test lag metric >= 1 beacuse we inject BlockDDLJob(ddl) to sleep(1)
 	)
 	export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
@@ -78,7 +76,6 @@ function run() {
 	rm -rf $WORK_DIR/worker2/log/dm-worker.log # clean up the old log
 	inject_points=(
 		"github.com/pingcap/dm/syncer/BlockExecuteSQLs=return(2)"
-		"github.com/pingcap/dm/syncer/changeTickerInterval=return(10)"
 		"github.com/pingcap/dm/syncer/ShowLagInLog=return(2)" # test lag metric >= 2 beacuse we inject BlockExecuteSQLs to sleep(2) although skip lag is 0 (locally), but we use that lag of all dml/skip lag, so lag still >= 2
 	)
 	export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
@@ -138,31 +135,18 @@ function run() {
 	wait_pattern_exit dm-worker1.toml
 
 	inject_points=(
-		"github.com/pingcap/dm/syncer/changeTickerInterval=return(5)"
 		"github.com/pingcap/dm/syncer/noJobInQueueLog=return()"
 		"github.com/pingcap/dm/syncer/IgnoreSomeTypeEvent=return(\"HeartbeatEvent\")"
 	)
-	# Since the following test needs to ensure that the dml queue is empty for a long time,
-	# it needs to ignore upstream heartbeat events to ensure that flushjobs are not triggered
 	export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
-	# First set the ticker interval to 5s -> expect the execSQL interval to be greater than 5s
-	# At 5s, the first no job log will appear in the log
-	# At 6s, the ticker has already waited 1s and the ticker goes to 1/5th of the way
-	# At 6s, a dml job is added to job chan and the ticker is reset
-	# At 11s the ticker write the log of the second no job
-	# Check that the interval between the two ticker logs is > 5s
-	rm -rf $WORK_DIR/worker1/log/dm-worker.log # clean up the old log
 	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
 	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
-	check_log_contain_with_retry 'no job in queue, update lag to zero' $WORK_DIR/worker1/log/dm-worker.log
 
 	echo "make a dml job"
 	run_sql_source1 "insert into metrics.t1 (id, name, ts) values (1004, 'zmj4', '2022-05-11 12:01:05')"
 	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
-	echo "sleep 6s"
-	sleep 6
-	check_ticker_interval $WORK_DIR/worker1/log/dm-worker.log 5
+	check_log_contain_with_retry 'no job in queue, update lag to zero' $WORK_DIR/worker1/log/dm-worker.log
 
 	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 		"stop-task test" \
 		"\"result\": true" 3
diff --git a/tests/new_relay/configs/tasks/test.yaml b/tests/new_relay/configs/tasks/test.yaml
index b1e3dd7c2d..17b0698cfc 100644
--- a/tests/new_relay/configs/tasks/test.yaml
+++ b/tests/new_relay/configs/tasks/test.yaml
@@ -53,6 +53,7 @@ syncers:
     max-retry: 0
     auto-fix-gtid: false
     enable-gtid: false
+    disable-detect: false
     safe-mode: false
     enable-ansi-quotes: false
     clean-dump-file: false
diff --git a/tests/only_dml/run.sh b/tests/only_dml/run.sh
index 88049f0298..85a6174fbd 100755
--- a/tests/only_dml/run.sh
+++ b/tests/only_dml/run.sh
@@ -42,7 +42,7 @@ function insert_data() {
 }
 
 function run() {
-	export GO_FAILPOINTS="github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval=return(1);github.com/pingcap/dm/syncer/syncDMLTicker=return(true)"
+	export GO_FAILPOINTS="github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval=return(1);github.com/pingcap/dm/syncer/syncDMLBatchNotFull=return(true)"
 
 	run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
 	check_contains 'Query OK, 1 row affected'
@@ -129,8 +129,8 @@ function run() {
 	done
 	kill $pid
 
-	check_log_contain_with_retry 'job queue not full, executeSQLs by ticker' $WORK_DIR/worker1/log/dm-worker.log
-	check_log_contain_with_retry 'job queue not full, executeSQLs by ticker' $WORK_DIR/worker2/log/dm-worker.log
+	check_log_contain_with_retry 'execute not full job queue' $WORK_DIR/worker1/log/dm-worker.log
+	check_log_contain_with_retry 'execute not full job queue' $WORK_DIR/worker2/log/dm-worker.log
 	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
 	export GO_FAILPOINTS=""
 }
diff --git a/tests/shardddl1/run.sh b/tests/shardddl1/run.sh
index b698317886..f0c370b387 100644
--- a/tests/shardddl1/run.sh
+++ b/tests/shardddl1/run.sh
@@ -427,7 +427,7 @@ function DM_UpdateBARule() {
 		"clean_table" "optimistic"
 }
 
-function DM_ADD_DROP_COLUMNS_CASE {
+function DM_ADD_DROP_COLUMNS_CASE() {
 	# add cols
 	run_sql_source1 "alter table ${shardddl1}.${tb1} add column col1 int, add column col2 int, add column col3 int;"
 	run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,now(),1,1,1);"
@@ -500,7 +500,7 @@ function DM_ADD_DROP_COLUMNS() {
 		"clean_table" "optimistic"
 }
 
-function DM_COLUMN_INDEX_CASE {
+function DM_COLUMN_INDEX_CASE() {
 	# add col and index
 	run_sql_source1 "alter table ${shardddl1}.${tb1} add column col3 int, add index idx_col1(col1);"
 	run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,1,1,1);"
@@ -573,10 +573,28 @@ function DM_COLUMN_INDEX() {
 		"clean_table" "optimistic"
 }
 
+function DM_CAUSALITY_CASE() {
+	run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,2)"
+	run_sql_source1 "insert into ${shardddl1}.${tb1} values(2,3)"
+	run_sql_source1 "update ${shardddl1}.${tb1} set a=3, b=4 where b=3"
+	run_sql_source1 "delete from ${shardddl1}.${tb1} where a=1"
+	run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,3)"
+
+	check_log_contain_with_retry "meet causality key, will generate a conflict job to flush all sqls" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log
+	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
+}
+
+function DM_CAUSALITY() {
+	run_case CAUSALITY "single-source-no-sharding" \
+		"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int unique);\"" \
+		"clean_table" ""
+}
+
 function run() {
 	init_cluster
 	init_database
 
+	DM_CAUSALITY
 	DM_UpdateBARule
 	DM_RENAME_TABLE
 	DM_RENAME_COLUMN_OPTIMISTIC
diff --git a/tests/shardddl4_1/run.sh b/tests/shardddl4_1/run.sh
index 6af8f2c9c0..da653dc098 100644
--- a/tests/shardddl4_1/run.sh
+++ b/tests/shardddl4_1/run.sh
@@ -510,17 +510,14 @@ function DM_145_CASE {
 	run_sql_source2 "insert into ${shardddl1}.${tb2} values(300),(301),(302),(303),(304),(305);"
 
 	run_sql_source1 "alter table ${shardddl1}.${tb1} engine=innodb;"
+	run_sql_source2 "alter table ${shardddl1}.${tb1} engine=innodb;"
+	run_sql_source2 "alter table ${shardddl1}.${tb2} engine=innodb;"
 
-	if [[ "$shardmode" == "pessimistic" ]]; then
-		run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
-			"query-status test" \
-			"blockingDDLs" 2 \
-			"ALTER TABLE \`shardddl\`.\`tb\` ENGINE = innodb" 2
-	else
-		run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
-			"query-status test" \
-			"\"result\": true" 3
-	fi
+	run_sql_source1 "insert into ${shardddl1}.${tb1} values(400),(401),(402),(403),(404),(405);"
+	run_sql_source2 "insert into ${shardddl1}.${tb1} values(500),(501),(502),(503),(504),(505);"
+	run_sql_source2 "insert into ${shardddl1}.${tb2} values(600),(601),(602),(603),(604),(605);"
+
+	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
 }
 
 # Defragment.