Skip to content

Commit

Permalink
syncer(dm): use DML library (pingcap#4313)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored and zhaoxinyu committed Feb 16, 2022
1 parent 7a60841 commit e7ec34a
Show file tree
Hide file tree
Showing 21 changed files with 996 additions and 1,719 deletions.
4 changes: 2 additions & 2 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,13 @@ func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string
dti, ok := tr.dsTracker.tableInfos[tableID]
if !ok {
tctx.Logger.Info("Downstream schema tracker init. ", zap.String("tableID", tableID))
ti, err := tr.getTableInfoByCreateStmt(tctx, tableID)
downstreamTI, err := tr.getTableInfoByCreateStmt(tctx, tableID)
if err != nil {
tctx.Logger.Error("Init dowstream schema info error. ", zap.String("tableID", tableID), zap.Error(err))
return nil, err
}

dti = GetDownStreamTI(ti, originTi)
dti = GetDownStreamTI(downstreamTI, originTi)
tr.dsTracker.tableInfos[tableID] = dti
}
return dti, nil
Expand Down
7 changes: 4 additions & 3 deletions dm/syncer/causality.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.uber.org/zap"

"github.com/pingcap/tidb/sessionctx"

"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/syncer/metrics"
)
Expand Down Expand Up @@ -79,16 +80,16 @@ func (c *causality) run() {
c.relation.gc(j.flushSeq)
continue
default:
keys := j.dml.identifyKeys(c.sessCtx)
keys := j.dml.CausalityKeys()

// detectConflict before add
if c.detectConflict(keys) {
c.logger.Debug("meet causality key, will generate a conflict job to flush all sqls", zap.Strings("keys", keys))
c.outCh <- newConflictJob(c.workerCount)
c.relation.clear()
}
j.dml.key = c.add(keys)
c.logger.Debug("key for keys", zap.String("key", j.dml.key), zap.Strings("keys", keys))
j.dmlQueueKey = c.add(keys)
c.logger.Debug("key for keys", zap.String("key", j.dmlQueueKey), zap.Strings("keys", keys))
}
metrics.ConflictDetectDurationHistogram.WithLabelValues(c.task, c.source).Observe(time.Since(startTime).Seconds())

Expand Down
143 changes: 23 additions & 120 deletions dm/syncer/causality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,19 @@ package syncer

import (
"math"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/util/mock"
"github.com/stretchr/testify/require"

cdcmodel "github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/pkg/binlog"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/schema"
"github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/pingcap/tiflow/pkg/sqlmodel"
)

func (s *testSyncerSuite) TestDetectConflict(c *C) {
Expand Down Expand Up @@ -65,26 +63,11 @@ func (s *testSyncerSuite) TestDetectConflict(c *C) {
c.Assert(ca.relation.len(), Equals, 0)
}

func (s *testSyncerSuite) TestCasuality(c *C) {
p := parser.New()
se := mock.NewContext()
func TestCausality(t *testing.T) {
t.Parallel()

schemaStr := "create table tb(a int primary key, b int unique);"
ti, err := createTableInfo(p, se, int64(0), schemaStr)
c.Assert(err, IsNil)
tiIndex := &model.IndexInfo{
Table: ti.Name,
Unique: true,
Primary: true,
State: model.StatePublic,
Tp: model.IndexTypeBtree,
Columns: []*model.IndexColumn{{
Name: ti.Columns[0].Name,
Offset: ti.Columns[0].Offset,
Length: types.UnspecifiedLength,
}},
}
downTi := schema.GetDownStreamTI(ti, ti)
c.Assert(downTi, NotNil)
ti := mockTableInfo(t, schemaStr)

jobCh := make(chan *job, 10)
syncer := &Syncer{
Expand All @@ -100,124 +83,44 @@ func (s *testSyncerSuite) TestCasuality(c *C) {
}
causalityCh := causalityWrap(jobCh, syncer)
testCases := []struct {
op opType
oldVals []interface{}
vals []interface{}
preVals []interface{}
postVals []interface{}
}{
{
op: insert,
vals: []interface{}{1, 2},
postVals: []interface{}{1, 2},
},
{
op: insert,
vals: []interface{}{2, 3},
postVals: []interface{}{2, 3},
},
{
op: update,
oldVals: []interface{}{2, 3},
vals: []interface{}{3, 4},
preVals: []interface{}{2, 3},
postVals: []interface{}{3, 4},
},
{
op: del,
vals: []interface{}{1, 2},
preVals: []interface{}{1, 2},
},
{
op: insert,
vals: []interface{}{1, 3},
postVals: []interface{}{1, 3},
},
}
results := []opType{insert, insert, update, del, conflict, insert}
table := &filter.Table{Schema: "test", Name: "t1"}
results := []opType{dml, dml, dml, dml, conflict, dml}
table := &cdcmodel.TableName{Schema: "test", Table: "t1"}
location := binlog.NewLocation("")
ec := &eventContext{startLocation: &location, currentLocation: &location, lastLocation: &location}

for _, tc := range testCases {
job := newDMLJob(tc.op, table, table, newDML(tc.op, false, "", table, tc.oldVals, tc.vals, tc.oldVals, tc.vals, ti.Columns, ti, tiIndex, downTi), ec)
change := sqlmodel.NewRowChange(table, nil, tc.preVals, tc.postVals, ti, nil, nil)
job := newDMLJob(change, ec)
jobCh <- job
}

c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
require.Eventually(t, func() bool {
return len(causalityCh) == len(results)
}), IsTrue)
}, 3*time.Second, 100*time.Millisecond)

for _, op := range results {
job := <-causalityCh
c.Assert(job.tp, Equals, op)
}
}

func (s *testSyncerSuite) TestCasualityWithPrefixIndex(c *C) {
p := parser.New()
se := mock.NewContext()
schemaStr := "create table t (c1 text, c2 int unique, unique key c1(c1(3)));"
ti, err := createTableInfo(p, se, int64(0), schemaStr)
c.Assert(err, IsNil)
downTi := schema.GetDownStreamTI(ti, ti)
c.Assert(downTi, NotNil)
c.Assert(len(downTi.AvailableUKIndexList) == 2, IsTrue)
tiIndex := downTi.AvailableUKIndexList[0]

jobCh := make(chan *job, 10)
syncer := &Syncer{
cfg: &config.SubTaskConfig{
SyncerConfig: config.SyncerConfig{
QueueSize: 1024,
},
Name: "task",
SourceID: "source",
},
tctx: tcontext.Background().WithLogger(log.L()),
sessCtx: utils.NewSessionCtx(map[string]string{"time_zone": "UTC"}),
}
causalityCh := causalityWrap(jobCh, syncer)
testCases := []struct {
op opType
oldVals []interface{}
vals []interface{}
}{
{
op: insert,
vals: []interface{}{"1234", 1},
},
{
op: insert,
vals: []interface{}{"2345", 2},
},
{
op: update,
oldVals: []interface{}{"2345", 2},
vals: []interface{}{"2345", 3},
},
{
op: del,
vals: []interface{}{"1234", 1},
},
{
op: insert,
vals: []interface{}{"2345", 1},
},
}
results := []opType{insert, insert, update, del, conflict, insert}
resultKeys := []string{"123.c1.", "234.c1.", "234.c1.", "123.c1.", "conflict", "234.c1."}
table := &filter.Table{Schema: "test", Name: "t1"}
location := binlog.NewLocation("")
ec := &eventContext{startLocation: &location, currentLocation: &location, lastLocation: &location}

for _, tc := range testCases {
job := newDMLJob(tc.op, table, table, newDML(tc.op, false, "", table, tc.oldVals, tc.vals, tc.oldVals, tc.vals, ti.Columns, ti, tiIndex, downTi), ec)
jobCh <- job
}

c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return len(causalityCh) == len(results)
}), IsTrue)

for i, op := range results {
job := <-causalityCh
if job.tp != conflict {
c.Assert(job.dml.key, Equals, resultKeys[i])
}
c.Assert(job.tp, Equals, op)
require.Equal(t, op, job.tp)
}
}

Expand Down
12 changes: 6 additions & 6 deletions dm/syncer/checkpoint_flush_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ type checkpointFlushTask struct {
}

type checkpointFlushWorker struct {
input chan *checkpointFlushTask
cp CheckPoint
execError *atomic.Error
afterFlushFn func(task *checkpointFlushTask) error
addCountFunc func(bool, string, opType, int64, *filter.Table)
input chan *checkpointFlushTask
cp CheckPoint
execError *atomic.Error
afterFlushFn func(task *checkpointFlushTask) error
updateJobMetricsFn func(bool, string, *job)
}

// Add add a new flush checkpoint job.
Expand All @@ -60,7 +60,7 @@ func (w *checkpointFlushWorker) Run(ctx *tcontext.Context) {
if isAsyncFlush {
task.asyncflushJob.flushWg.Wait()

w.addCountFunc(true, adminQueueName, task.asyncflushJob.tp, 1, task.asyncflushJob.targetTable)
w.updateJobMetricsFn(true, adminQueueName, task.asyncflushJob)
ctx.L().Info("async flush checkpoint snapshot job has been processed by dml worker, about to flush checkpoint snapshot", zap.Int64("job sequence", task.asyncflushJob.flushSeq), zap.Int("snapshot_id", task.snapshotInfo.id))
} else {
ctx.L().Info("about to sync flush checkpoint snapshot", zap.Int("snapshot_id", task.snapshotInfo.id))
Expand Down
Loading

0 comments on commit e7ec34a

Please sign in to comment.