Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD committed Oct 25, 2021
1 parent 9993694 commit 4fc1ee6
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 13 deletions.
23 changes: 18 additions & 5 deletions syncer/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

// genDMLParam stores pruned columns, data as well as the original columns, data, index.
type genDMLParam struct {
tableID string // as a key in map like `schema`.`table`
targetTableID string // as a key in map like `schema`.`table`
sourceTable *filter.Table // origin table
safeMode bool // only used in update
data [][]interface{} // pruned data
Expand Down Expand Up @@ -81,7 +81,7 @@ RowLoop:
}
}

dmls = append(dmls, newDML(insert, param.safeMode, param.tableID, param.sourceTable, nil, value, nil, originalValue, columns, ti))
dmls = append(dmls, newDML(insert, param.safeMode, param.targetTableID, param.sourceTable, nil, value, nil, originalValue, columns, ti))
}

return dmls, nil
Expand Down Expand Up @@ -145,7 +145,7 @@ RowLoop:
}
}

dmls = append(dmls, newDML(update, param.safeMode, param.tableID, param.sourceTable, oldValues, changedValues, oriOldValues, oriChangedValues, columns, ti))
dmls = append(dmls, newDML(update, param.safeMode, param.targetTableID, param.sourceTable, oldValues, changedValues, oriOldValues, oriChangedValues, columns, ti))
}

return dmls, nil
Expand Down Expand Up @@ -176,7 +176,7 @@ RowLoop:
continue RowLoop
}
}
dmls = append(dmls, newDML(del, false, param.tableID, param.sourceTable, nil, value, nil, value, ti.Columns, ti))
dmls = append(dmls, newDML(del, false, param.targetTableID, param.sourceTable, nil, value, nil, value, ti.Columns, ti))
}

return dmls, nil
Expand Down Expand Up @@ -437,6 +437,11 @@ func newDML(op opType, safeMode bool, targetTableID string, sourceTable *filter.
}
}

// String implements fmt.Stringer for DML, producing a compact one-line
// summary suitable for logging: safe-mode flag, target table ID, operation
// type, column names, and the original (un-pruned) old/new row values.
func (dml *DML) String() string {
	opStr := dml.op.String()
	cols := dml.columnNames()
	return fmt.Sprintf("[safemode: %t, targetTableID: %s, op: %s, columns: %v, oldValues: %v, values: %v]",
		dml.safeMode, dml.targetTableID, opStr, cols, dml.originOldValues, dml.originValues)
}

// identifyKeys gens keys by unique not null value.
// This is used for causality.
// PK or (UK + NOT NULL) or (UK + NULL + NOT NULL VALUE).
Expand All @@ -453,6 +458,15 @@ func (dml *DML) identifyKeys() []string {
return keys
}

// columnNames returns the original-case name of every column in the DML,
// in column order.
func (dml *DML) columnNames() []string {
	names := make([]string, len(dml.columns))
	for i, col := range dml.columns {
		names[i] = col.Name.O
	}
	return names
}

// whereColumnsAndValues gets columns and values of unique column with not null value.
// This is used to generate the where condition.
func (dml *DML) whereColumnsAndValues() ([]string, []interface{}) {
Expand Down Expand Up @@ -521,7 +535,6 @@ func genMultipleKeys(ti *model.TableInfo, value []interface{}, table string) []s
key := genKeyList(table, cols, vals)
if len(key) > 0 { // ignore `null` value.
multipleKeys = append(multipleKeys, key)
// TODO: break here? one unique index is enough?
} else {
log.L().Debug("ignore empty key", zap.String("table", table))
}
Expand Down
6 changes: 5 additions & 1 deletion syncer/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ type job struct {

// String implements fmt.Stringer for job. Only the important fields are
// rendered — type, DML/DDL payload, and the three binlog locations — which
// is usually what matters when tracing job execution.
func (j *job) String() string {
	dmlStr := ""
	if j.dml != nil {
		dmlStr = j.dml.String()
	}
	return fmt.Sprintf("tp: %s, dml: %s, ddls: %s, last_location: %s, start_location: %s, current_location: %s",
		j.tp, dmlStr, j.ddls, j.location, j.startLocation, j.currentLocation)
}

func newDMLJob(tp opType, sourceTable, targetTable *filter.Table, dml *DML, ec *eventContext) *job {
Expand Down
21 changes: 15 additions & 6 deletions syncer/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package syncer

import (
. "github.com/pingcap/check"
"github.com/pingcap/parser"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb/util/mock"

"github.com/pingcap/dm/pkg/binlog"
)
Expand Down Expand Up @@ -79,25 +81,32 @@ func (t *testJobSuite) TestJob(c *C) {
needHandleDDLs: []string{"create database test"},
shardingDDLInfo: ddlInfo,
}

schema := "create table test.tb(id int primary key, col1 int unique not null)"
p := parser.New()
se := mock.NewContext()
ti, err := createTableInfo(p, se, 0, schema)
c.Assert(err, IsNil)

testCases := []struct {
job *job
jobStr string
}{
{
newDMLJob(insert, table, table, nil, ec),
"tp: insert, ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ",
newDMLJob(insert, table, table, newDML(insert, true, "targetTable", table, nil, []interface{}{2, 2}, nil, []interface{}{2, 2}, ti.Columns, ti), ec),
"tp: insert, dml: [safemode: true, targetTableID: targetTable, op: insert, columns: [id col1], oldValues: [], values: [2 2]], ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ",
}, {
newDDLJob(qec),
"tp: ddl, ddls: [create database test], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ",
"tp: ddl, dml: , ddls: [create database test], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ",
}, {
newXIDJob(binlog.NewLocation(""), binlog.NewLocation(""), binlog.NewLocation("")),
"tp: xid, ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ",
"tp: xid, dml: , ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ",
}, {
newFlushJob(),
"tp: flush, ddls: [], last_location: position: (, 0), gtid-set: , start_location: position: (, 0), gtid-set: , current_location: position: (, 0), gtid-set: ",
"tp: flush, dml: , ddls: [], last_location: position: (, 0), gtid-set: , start_location: position: (, 0), gtid-set: , current_location: position: (, 0), gtid-set: ",
}, {
newSkipJob(ec),
"tp: skip, ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 0), gtid-set: , current_location: position: (, 0), gtid-set: ",
"tp: skip, dml: , ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 0), gtid-set: , current_location: position: (, 0), gtid-set: ",
},
}

Expand Down
2 changes: 1 addition & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2038,7 +2038,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
)

param := &genDMLParam{
tableID: utils.GenTableID(targetTable),
targetTableID: utils.GenTableID(targetTable),
data: prunedRows,
originalData: rows,
columns: prunedColumns,
Expand Down

0 comments on commit 4fc1ee6

Please sign in to comment.