diff --git a/syncer/causality.go b/syncer/causality.go
index b3594b9bc..ae28b1ff3 100644
--- a/syncer/causality.go
+++ b/syncer/causality.go
@@ -66,14 +66,15 @@ func (c *causality) run() {
 			if j.tp == flush {
 				c.reset()
 			} else {
+				keys := j.dml.identifyKeys()
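+				// (added note) identifyKeys builds one key per PK/UK from the row
+				// values (old and new rows for UPDATE), so related rows collide here.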
 				// detectConflict before add
-				if c.detectConflict(j.keys) {
-					c.logger.Debug("meet causality key, will generate a conflict job to flush all sqls", zap.Strings("keys", j.keys))
+				if c.detectConflict(keys) {
+					c.logger.Debug("meet causality key, will generate a conflict job to flush all sqls", zap.Strings("keys", keys))
 					c.outCh <- newConflictJob()
 					c.reset()
 				}
-				j.key = c.add(j.keys)
-				c.logger.Debug("key for keys", zap.String("key", j.key), zap.Strings("keys", j.keys))
+				j.dml.key = c.add(keys)
+				c.logger.Debug("key for keys", zap.String("key", j.dml.key), zap.Strings("keys", keys))
 			}
 			metrics.ConflictDetectDurationHistogram.WithLabelValues(c.task, c.source).Observe(time.Since(startTime).Seconds())
diff --git a/syncer/causality_test.go b/syncer/causality_test.go
index dc5f223e3..7b555028f 100644
--- a/syncer/causality_test.go
+++ b/syncer/causality_test.go
@@ -71,28 +71,30 @@ func (s *testSyncerSuite) TestCasuality(c *C) {
 	}
 	causalityCh := causalityWrap(jobCh, syncer)
 	testCases := []struct {
-		op   opType
-		vals [][]interface{}
+		op      opType
+		oldVals []interface{}
+		vals    []interface{}
 	}{
 		{
 			op:   insert,
-			vals: [][]interface{}{{1, 2}},
+			vals: []interface{}{1, 2},
 		}, {
 			op:   insert,
-			vals: [][]interface{}{{2, 3}},
+			vals: []interface{}{2, 3},
 		}, {
-			op:   update,
-			vals: [][]interface{}{{2, 3}, {3, 4}},
+			op:      update,
+			oldVals: []interface{}{2, 3},
+			vals:    []interface{}{3, 4},
 		}, {
 			op:   del,
-			vals: [][]interface{}{{1, 2}},
+			vals: []interface{}{1, 2},
 		}, {
 			op:   insert,
-			vals: [][]interface{}{{1, 3}},
+			vals: []interface{}{1, 3},
 		},
 	}
 	results := []opType{insert, insert, update, del, conflict, insert}
@@ -101,11 +103,7 @@ func (s *testSyncerSuite) TestCasuality(c *C) {
 	ec := &eventContext{startLocation: &location, currentLocation: &location, lastLocation: &location}

 	for _, tc := range testCases {
-		var keys []string
-		for _, val := range tc.vals {
-			keys = append(keys, genMultipleKeys(ti, val, "tb")...)
-		}
-		job := newDMLJob(tc.op, table, table, "", nil, keys, ec)
+		job := newDMLJob(tc.op, table, table, newDML(tc.op, false, "", table, tc.oldVals, tc.vals, tc.oldVals, tc.vals, ti.Columns, ti), ec)
 		jobCh <- job
 	}
diff --git a/syncer/dml.go b/syncer/dml.go
index ece76443d..231720404 100644
--- a/syncer/dml.go
+++ b/syncer/dml.go
@@ -32,7 +32,8 @@ import (
 // genDMLParam stores pruned columns, data as well as the original columns, data, index.
 type genDMLParam struct {
-	tableID         string          // as a key in map like `schema`.`table`
+	targetTableID   string          // as a key in map like `schema`.`table`
+	sourceTable     *filter.Table   // origin table
 	safeMode        bool            // only used in update
 	data            [][]interface{} // pruned data
 	originalData    [][]interface{} // all data
@@ -48,28 +49,19 @@ func extractValueFromData(data []interface{}, columns []*model.ColumnInfo) []int
 	return value
 }

-func (s *Syncer) genInsertSQLs(param *genDMLParam, filterExprs []expression.Expression) ([]string, [][]string, [][]interface{}, error) {
+func (s *Syncer) genAndFilterInsertDMLs(param *genDMLParam, filterExprs []expression.Expression) ([]*DML, error) {
 	var (
-		tableID         = param.tableID
 		dataSeq         = param.data
 		originalDataSeq = param.originalData
 		columns         = param.columns
 		ti              = param.sourceTableInfo
-		sqls            = make([]string, 0, len(dataSeq))
-		keys            = make([][]string, 0, len(dataSeq))
-		values          = make([][]interface{}, 0, len(dataSeq))
+		dmls            = make([]*DML, 0, len(dataSeq))
 	)

-	insertOrReplace := "INSERT INTO"
-	if param.safeMode {
-		insertOrReplace = "REPLACE INTO"
-	}
-	sql := genInsertReplace(insertOrReplace, tableID, columns)
-
 RowLoop:
 	for dataIdx, data := range dataSeq {
 		if len(data) != len(columns) {
-			return nil, nil, nil, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(columns), len(data))
+			return nil, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(columns), len(data))
 		}

 		value := extractValueFromData(data, columns)
@@ -81,7 +73,7 @@ RowLoop:
 		for _, expr := range filterExprs {
 			skip, err := SkipDMLByExpression(originalValue, expr, ti.Columns)
 			if err != nil {
-				return nil, nil, nil, err
+				return nil, err
 			}
 			if skip {
 				s.filteredInsert.Add(1)
@@ -89,37 +81,25 @@ RowLoop:
 			}
 		}

-		ks := genMultipleKeys(ti, originalValue, tableID)
-		sqls = append(sqls, sql)
-		values = append(values, value)
-		keys = append(keys, ks)
+		dmls = append(dmls, newDML(insert, param.safeMode, param.targetTableID, param.sourceTable, nil, value, nil, originalValue, columns, ti))
 	}

-	return sqls, keys, values, nil
+	return dmls, nil
 }

-func (s *Syncer) genUpdateSQLs(
+func (s *Syncer) genAndFilterUpdateDMLs(
 	param *genDMLParam,
 	oldValueFilters []expression.Expression,
 	newValueFilters []expression.Expression,
-) ([]string, [][]string, [][]interface{}, error) {
+) ([]*DML, error) {
 	var (
-		tableID             = param.tableID
-		data                = param.data
-		originalData        = param.originalData
-		columns             = param.columns
-		ti                  = param.sourceTableInfo
-		defaultIndexColumns = findFitIndex(ti)
-		replaceSQL          string // `REPLACE INTO` SQL
-		sqls                = make([]string, 0, len(data)/2)
-		keys                = make([][]string, 0, len(data)/2)
-		values              = make([][]interface{}, 0, len(data)/2)
+		data         = param.data
+		originalData = param.originalData
+		columns      = param.columns
+		ti           = param.sourceTableInfo
+		dmls         = make([]*DML, 0, len(data)/2)
 	)

-	if param.safeMode {
-		replaceSQL = genInsertReplace("REPLACE INTO", tableID, columns)
-	}
-
 RowLoop:
 	for i := 0; i < len(data); i += 2 {
 		oldData := data[i]
@@ -128,11 +108,11 @@ RowLoop:
 		oriChangedData := originalData[i+1]

 		if len(oldData) != len(changedData) {
-			return nil, nil, nil, terror.ErrSyncerUnitDMLOldNewValueMismatch.Generate(len(oldData), len(changedData))
+			return nil, terror.ErrSyncerUnitDMLOldNewValueMismatch.Generate(len(oldData), len(changedData))
 		}

 		if len(oldData) != len(columns) {
-			return nil, nil, nil, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(columns), len(oldData))
+			return nil, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(columns), len(oldData))
 		}

 		oldValues := extractValueFromData(oldData, columns)
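+		// (added note) rows arrive as [old, new] pairs, hence the `i += 2` loop;
+		// the unpruned ori* values are kept for expression filtering and key generation.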
@@ -152,11 +132,11 @@ RowLoop:
 			oldExpr, newExpr := oldValueFilters[j], newValueFilters[j]
 			skip1, err := SkipDMLByExpression(oriOldValues, oldExpr, ti.Columns)
 			if err != nil {
-				return nil, nil, nil, err
+				return nil, err
 			}
 			skip2, err := SkipDMLByExpression(oriChangedValues, newExpr, ti.Columns)
 			if err != nil {
-				return nil, nil, nil, err
+				return nil, err
 			}
 			if skip1 && skip2 {
 				s.filteredUpdate.Add(1)
@@ -165,73 +145,23 @@ RowLoop:
 			}
 		}

-		if defaultIndexColumns == nil {
-			defaultIndexColumns = getAvailableIndexColumn(ti, oriOldValues)
-		}
-
-		ks := genMultipleKeys(ti, oriOldValues, tableID)
-		ks = append(ks, genMultipleKeys(ti, oriChangedValues, tableID)...)
-
-		if param.safeMode {
-			// generate delete sql from old data
-			sql, value := genDeleteSQL(tableID, oriOldValues, ti.Columns, defaultIndexColumns)
-			sqls = append(sqls, sql)
-			values = append(values, value)
-			keys = append(keys, ks)
-			// generate replace sql from new data
-			sqls = append(sqls, replaceSQL)
-			values = append(values, changedValues)
-			keys = append(keys, ks)
-			continue
-		}
-
-		// NOTE: move these variables outer of `for` if needed (to reuse).
-		updateColumns := make([]*model.ColumnInfo, 0, indexColumnsCount(defaultIndexColumns))
-		updateValues := make([]interface{}, 0, indexColumnsCount(defaultIndexColumns))
-		for j := range oldValues {
-			updateColumns = append(updateColumns, columns[j])
-			updateValues = append(updateValues, changedValues[j])
-		}
-
-		// ignore no changed sql
-		if len(updateColumns) == 0 {
-			continue
-		}
-
-		value := make([]interface{}, 0, len(oldData))
-		value = append(value, updateValues...)
-
-		whereColumns, whereValues := ti.Columns, oriOldValues
-		if defaultIndexColumns != nil {
-			whereColumns, whereValues = getColumnData(ti.Columns, defaultIndexColumns, oriOldValues)
-		}
-
-		value = append(value, whereValues...)
-
-		sql := genUpdateSQL(tableID, updateColumns, whereColumns, whereValues)
-		sqls = append(sqls, sql)
-		values = append(values, value)
-		keys = append(keys, ks)
+		dmls = append(dmls, newDML(update, param.safeMode, param.targetTableID, param.sourceTable, oldValues, changedValues, oriOldValues, oriChangedValues, columns, ti))
 	}

-	return sqls, keys, values, nil
+	return dmls, nil
 }

-func (s *Syncer) genDeleteSQLs(param *genDMLParam, filterExprs []expression.Expression) ([]string, [][]string, [][]interface{}, error) {
+func (s *Syncer) genAndFilterDeleteDMLs(param *genDMLParam, filterExprs []expression.Expression) ([]*DML, error) {
 	var (
-		tableID             = param.tableID
-		dataSeq             = param.originalData
-		ti                  = param.sourceTableInfo
-		defaultIndexColumns = findFitIndex(ti)
-		sqls                = make([]string, 0, len(dataSeq))
-		keys                = make([][]string, 0, len(dataSeq))
-		values              = make([][]interface{}, 0, len(dataSeq))
+		dataSeq = param.originalData
+		ti      = param.sourceTableInfo
+		dmls    = make([]*DML, 0, len(dataSeq))
 	)

 RowLoop:
 	for _, data := range dataSeq {
 		if len(data) != len(ti.Columns) {
-			return nil, nil, nil, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(ti.Columns), len(data))
+			return nil, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(ti.Columns), len(data))
 		}

 		value := extractValueFromData(data, ti.Columns)
@@ -239,101 +169,17 @@ RowLoop:
 		for _, expr := range filterExprs {
 			skip, err := SkipDMLByExpression(value, expr, ti.Columns)
 			if err != nil {
-				return nil, nil, nil, err
+				return nil, err
 			}
 			if skip {
 				s.filteredDelete.Add(1)
 				continue RowLoop
 			}
 		}
-
-		if defaultIndexColumns == nil {
-			defaultIndexColumns = getAvailableIndexColumn(ti, value)
-		}
-		ks := genMultipleKeys(ti, value, tableID)
-
-		sql, value := genDeleteSQL(tableID, value, ti.Columns, defaultIndexColumns)
-		sqls = append(sqls, sql)
-		values = append(values, value)
-		keys = append(keys, ks)
+		dmls = append(dmls, newDML(del, false, param.targetTableID, param.sourceTable, nil, value, nil, value, ti.Columns, ti))
 	}

-	return sqls, keys, values, nil
-}
-
-// genInsertReplace generates a DML for `INSERT INTO` or `REPLCATE INTO`.
-// the returned SQL with placeholders for `VALUES`.
-func genInsertReplace(op, table string, columns []*model.ColumnInfo) string {
-	// NOTE: use sync.Pool to hold the builder if needed later.
-	var buf strings.Builder
-	buf.Grow(256)
-	buf.WriteString(op)
-	buf.WriteString(" " + table + " (")
-	for i, column := range columns {
-		if i != len(columns)-1 {
-			buf.WriteString("`" + strings.ReplaceAll(column.Name.O, "`", "``") + "`,")
-		} else {
-			buf.WriteString("`" + strings.ReplaceAll(column.Name.O, "`", "``") + "`)")
-		}
-	}
-	buf.WriteString(" VALUES (")
-
-	// placeholders
-	for i := range columns {
-		if i != len(columns)-1 {
-			buf.WriteString("?,")
-		} else {
-			buf.WriteString("?)")
-		}
-	}
-	return buf.String()
-}
-
-// genUpdateSQL generates a `UPDATE` SQL with `SET` and `WHERE`.
-func genUpdateSQL(table string, updateColumns, whereColumns []*model.ColumnInfo, whereValues []interface{}) string {
-	var buf strings.Builder
-	buf.Grow(2048)
-	buf.WriteString("UPDATE ")
-	buf.WriteString(table)
-	buf.WriteString(" SET ")
-
-	for i, column := range updateColumns {
-		if i == len(updateColumns)-1 {
-			fmt.Fprintf(&buf, "`%s` = ?", strings.ReplaceAll(column.Name.O, "`", "``"))
-		} else {
-			fmt.Fprintf(&buf, "`%s` = ?, ", strings.ReplaceAll(column.Name.O, "`", "``"))
-		}
-	}
-
-	buf.WriteString(" WHERE ")
-	genWhere(&buf, whereColumns, whereValues)
-	buf.WriteString(" LIMIT 1")
-	return buf.String()
-}
-
-// genDeleteSQL generates a `DELETE FROM` SQL with `WHERE`.
-func genDeleteSQL(table string, value []interface{}, columns []*model.ColumnInfo, indexColumns *model.IndexInfo) (string, []interface{}) {
-	whereColumns, whereValues := columns, value
-	if indexColumns != nil {
-		whereColumns, whereValues = getColumnData(columns, indexColumns, value)
-	}
-
-	var buf strings.Builder
-	buf.Grow(1024)
-	buf.WriteString("DELETE FROM ")
-	buf.WriteString(table)
-	buf.WriteString(" WHERE ")
-	genWhere(&buf, whereColumns, whereValues)
-	buf.WriteString(" LIMIT 1")
-
-	return buf.String(), whereValues
-}
-
-func indexColumnsCount(index *model.IndexInfo) int {
-	if index == nil {
-		return 0
-	}
-	return len(index.Columns)
+	return dmls, nil
 }

 func castUnsigned(data interface{}, ft *types.FieldType) interface{} {
@@ -411,62 +257,6 @@ func columnValue(value interface{}, ft *types.FieldType) string {
 	return data
 }

-func genKeyList(table string, columns []*model.ColumnInfo, dataSeq []interface{}) string {
-	var buf strings.Builder
-	for i, data := range dataSeq {
-		if data == nil {
-			log.L().Debug("ignore null value", zap.String("column", columns[i].Name.O), zap.String("table", table))
-			continue // ignore `null` value.
-		}
-		// one column key looks like:`column_val.column_name.`
-		buf.WriteString(columnValue(data, &columns[i].FieldType))
-		buf.WriteString(".")
-		buf.WriteString(columns[i].Name.String())
-		buf.WriteString(".")
-	}
-	if buf.Len() == 0 {
-		log.L().Debug("all value are nil, no key generated", zap.String("table", table))
-		return "" // all values are `null`.
-	}
-	buf.WriteString(table)
-	return buf.String()
-}
-
-func genMultipleKeys(ti *model.TableInfo, value []interface{}, table string) []string {
-	multipleKeys := make([]string, 0, len(ti.Indices)+1)
-	if ti.PKIsHandle {
-		if pk := ti.GetPkColInfo(); pk != nil {
-			cols := []*model.ColumnInfo{pk}
-			vals := []interface{}{value[pk.Offset]}
-			multipleKeys = append(multipleKeys, genKeyList(table, cols, vals))
-		}
-	}
-
-	for _, indexCols := range ti.Indices {
-		// PK also has a true Unique
-		if !indexCols.Unique {
-			continue
-		}
-		cols, vals := getColumnData(ti.Columns, indexCols, value)
-		key := genKeyList(table, cols, vals)
-		if len(key) > 0 { // ignore `null` value.
-			multipleKeys = append(multipleKeys, key)
-			// TODO: break here? one unique index is enough?
-		} else {
-			log.L().Debug("ignore empty key", zap.String("table", table))
-		}
-	}
-
-	if len(multipleKeys) == 0 {
-		// use table name as key if no key generated (no PK/UK),
-		// no concurrence for rows in the same table.
- log.L().Debug("use table name as the key", zap.String("table", table)) - multipleKeys = append(multipleKeys, table) - } - - return multipleKeys -} - func findFitIndex(ti *model.TableInfo) *model.IndexInfo { for _, idx := range ti.Indices { if idx.Primary { @@ -538,21 +328,6 @@ func getColumnData(columns []*model.ColumnInfo, indexColumns *model.IndexInfo, d return cols, values } -func genWhere(buf *strings.Builder, columns []*model.ColumnInfo, data []interface{}) { - for i, col := range columns { - if i != 0 { - buf.WriteString(" AND ") - } - buf.WriteByte('`') - buf.WriteString(strings.ReplaceAll(col.Name.O, "`", "``")) - if data[i] == nil { - buf.WriteString("` IS ?") - } else { - buf.WriteString("` = ?") - } - } -} - func (s *Syncer) mappingDML(table *filter.Table, ti *model.TableInfo, data [][]interface{}) ([][]interface{}, error) { if s.columnMapping == nil { return data, nil @@ -630,3 +405,262 @@ func checkLogColumns(skipped [][]int) error { } return nil } + +// DML stores param for DML. +type DML struct { + targetTableID string + sourceTable *filter.Table + op opType + oldValues []interface{} // only for update SQL + values []interface{} + columns []*model.ColumnInfo + sourceTableInfo *model.TableInfo + originOldValues []interface{} // only for update SQL + originValues []interface{} // use to gen key and `WHERE` + safeMode bool + key string // use to detect causality +} + +// newDML creates DML. +func newDML(op opType, safeMode bool, targetTableID string, sourceTable *filter.Table, oldValues, values, originOldValues, originValues []interface{}, columns []*model.ColumnInfo, sourceTableInfo *model.TableInfo) *DML { + return &DML{ + op: op, + safeMode: safeMode, + targetTableID: targetTableID, + sourceTable: sourceTable, + oldValues: oldValues, + values: values, + columns: columns, + sourceTableInfo: sourceTableInfo, + originOldValues: originOldValues, + originValues: originValues, + } +} + +// String returns the DML's string. +func (dml *DML) String() string { + return fmt.Sprintf("[safemode: %t, targetTableID: %s, op: %s, columns: %v, oldValues: %v, values: %v]", dml.safeMode, dml.targetTableID, dml.op.String(), dml.columnNames(), dml.originOldValues, dml.originValues) +} + +// identifyKeys gens keys by unique not null value. +// This is used for causality. +// PK or (UK + NOT NULL) or (UK + NULL + NOT NULL VALUE). +func (dml *DML) identifyKeys() []string { + var keys []string + // for UPDATE statement + if dml.originOldValues != nil { + keys = append(keys, genMultipleKeys(dml.sourceTableInfo, dml.originOldValues, dml.targetTableID)...) + } + + if dml.originValues != nil { + keys = append(keys, genMultipleKeys(dml.sourceTableInfo, dml.originValues, dml.targetTableID)...) + } + return keys +} + +// columnNames return column names of DML. +func (dml *DML) columnNames() []string { + columnNames := make([]string, 0, len(dml.columns)) + for _, column := range dml.columns { + columnNames = append(columnNames, column.Name.O) + } + return columnNames +} + +// whereColumnsAndValues gets columns and values of unique column with not null value. +// This is used to generete where condition. 
+func (dml *DML) whereColumnsAndValues() ([]string, []interface{}) {
+	columns, values := dml.sourceTableInfo.Columns, dml.originValues
+
+	if dml.op == update {
+		values = dml.originOldValues
+	}
+
+	defaultIndexColumns := findFitIndex(dml.sourceTableInfo)
+
+	if defaultIndexColumns == nil {
+		defaultIndexColumns = getAvailableIndexColumn(dml.sourceTableInfo, values)
+	}
+	if defaultIndexColumns != nil {
+		columns, values = getColumnData(dml.sourceTableInfo.Columns, defaultIndexColumns, values)
+	}
+
+	columnNames := make([]string, 0, len(columns))
+	for _, column := range columns {
+		columnNames = append(columnNames, column.Name.O)
+	}
+	return columnNames, values
+}
+
+// genKeyList formats keys.
+func genKeyList(table string, columns []*model.ColumnInfo, dataSeq []interface{}) string {
+	var buf strings.Builder
+	for i, data := range dataSeq {
+		if data == nil {
+			log.L().Debug("ignore null value", zap.String("column", columns[i].Name.O), zap.String("table", table))
+			continue // ignore `null` value.
+		}
+		// one column key looks like:`column_val.column_name.`
+		buf.WriteString(columnValue(data, &columns[i].FieldType))
+		buf.WriteString(".")
+		buf.WriteString(columns[i].Name.O)
+		buf.WriteString(".")
+	}
+	if buf.Len() == 0 {
+		log.L().Debug("all value are nil, no key generated", zap.String("table", table))
+		return "" // all values are `null`.
+	}
+	buf.WriteString(table)
+	return buf.String()
+}
+
+// genMultipleKeys generates keys from UNIQUE NOT NULL values.
+// If there is no UNIQUE NOT NULL value, the table name is used instead.
+func genMultipleKeys(ti *model.TableInfo, value []interface{}, table string) []string {
+	multipleKeys := make([]string, 0, len(ti.Indices)+1)
+	if ti.PKIsHandle {
+		if pk := ti.GetPkColInfo(); pk != nil {
+			cols := []*model.ColumnInfo{pk}
+			vals := []interface{}{value[pk.Offset]}
+			multipleKeys = append(multipleKeys, genKeyList(table, cols, vals))
+		}
+	}
+
+	for _, indexCols := range ti.Indices {
+		// PK also has a true Unique
+		if !indexCols.Unique {
+			continue
+		}
+		cols, vals := getColumnData(ti.Columns, indexCols, value)
+		key := genKeyList(table, cols, vals)
+		if len(key) > 0 { // ignore `null` value.
+			multipleKeys = append(multipleKeys, key)
+		} else {
+			log.L().Debug("ignore empty key", zap.String("table", table))
+		}
+	}
+
+	if len(multipleKeys) == 0 {
+		// use table name as key if no key generated (no PK/UK),
+		// no concurrence for rows in the same table.
+		log.L().Debug("use table name as the key", zap.String("table", table))
+		multipleKeys = append(multipleKeys, table)
+	}
+
+	return multipleKeys
+}
+
+// genWhere generates the WHERE condition.
+func (dml *DML) genWhere(buf *strings.Builder) []interface{} {
+	whereColumns, whereValues := dml.whereColumnsAndValues()
+
+	for i, col := range whereColumns {
+		if i != 0 {
+			buf.WriteString(" AND ")
+		}
+		buf.WriteByte('`')
+		buf.WriteString(strings.ReplaceAll(col, "`", "``"))
+		if whereValues[i] == nil {
+			buf.WriteString("` IS ?")
+		} else {
+			buf.WriteString("` = ?")
+		}
+	}
+	return whereValues
+}
+
+// genSQL generates SQL for a DML.
+func (dml *DML) genSQL() (sql []string, arg [][]interface{}) {
+	switch dml.op {
+	case insert:
+		return dml.genInsertSQL()
+	case del:
+		return dml.genDeleteSQL()
+	case update:
+		return dml.genUpdateSQL()
+	}
+	return
+}
+
+// genUpdateSQL generates an `UPDATE` SQL with `WHERE`.
+func (dml *DML) genUpdateSQL() ([]string, [][]interface{}) {
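+	// (added note) in safe mode an UPDATE may be replayed, so it is rewritten as a
+	// DELETE of the old row plus an idempotent INSERT ... ON DUPLICATE KEY UPDATE
+	// of the new row.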
+	if dml.safeMode {
+		sqls, args := dml.genDeleteSQL()
+		insertSQLs, insertArgs := dml.genInsertSQL()
+		sqls = append(sqls, insertSQLs...)
+		args = append(args, insertArgs...)
+		return sqls, args
+	}
+	var buf strings.Builder
+	buf.Grow(2048)
+	buf.WriteString("UPDATE ")
+	buf.WriteString(dml.targetTableID)
+	buf.WriteString(" SET ")
+
+	for i, column := range dml.columns {
+		if i == len(dml.columns)-1 {
+			fmt.Fprintf(&buf, "`%s` = ?", strings.ReplaceAll(column.Name.O, "`", "``"))
+		} else {
+			fmt.Fprintf(&buf, "`%s` = ?, ", strings.ReplaceAll(column.Name.O, "`", "``"))
+		}
+	}
+
+	buf.WriteString(" WHERE ")
+	whereArgs := dml.genWhere(&buf)
+	buf.WriteString(" LIMIT 1")
+
+	args := dml.values
+	args = append(args, whereArgs...)
+	return []string{buf.String()}, [][]interface{}{args}
+}
+
+// genDeleteSQL generates a `DELETE FROM` SQL with `WHERE`.
+func (dml *DML) genDeleteSQL() ([]string, [][]interface{}) {
+	var buf strings.Builder
+	buf.Grow(1024)
+	buf.WriteString("DELETE FROM ")
+	buf.WriteString(dml.targetTableID)
+	buf.WriteString(" WHERE ")
+	whereArgs := dml.genWhere(&buf)
+	buf.WriteString(" LIMIT 1")
+
+	return []string{buf.String()}, [][]interface{}{whereArgs}
+}
+
+// genInsertSQL generates an `INSERT` SQL.
+// In safe mode, it generates an `INSERT ... ON DUPLICATE KEY UPDATE` statement.
+func (dml *DML) genInsertSQL() ([]string, [][]interface{}) {
+	var buf strings.Builder
+	buf.Grow(256)
+	buf.WriteString("INSERT INTO ")
+	buf.WriteString(dml.targetTableID)
+	buf.WriteString(" (")
+	for i, column := range dml.columns {
+		if i != len(dml.columns)-1 {
+			buf.WriteString("`" + strings.ReplaceAll(column.Name.O, "`", "``") + "`,")
+		} else {
+			buf.WriteString("`" + strings.ReplaceAll(column.Name.O, "`", "``") + "`)")
+		}
+	}
+	buf.WriteString(" VALUES (")
+
+	// placeholders
+	for i := range dml.columns {
+		if i != len(dml.columns)-1 {
+			buf.WriteString("?,")
+		} else {
+			buf.WriteString("?)")
+		}
+	}
+	if dml.safeMode {
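+		// (added note) `col`=VALUES(`col`) makes the statement idempotent: on a
+		// duplicate key, every column is overwritten with the newly inserted values.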
"github.com/pingcap/check" "github.com/pingcap/parser" @@ -22,6 +23,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/types" + "github.com/pingcap/tidb-tools/pkg/filter" tiddl "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/mock" @@ -222,3 +224,92 @@ func (s *testSyncerSuite) TestGenMultipleKeys(c *C) { assert(keys, DeepEquals, tc.keys) } } + +func (s *testSyncerSuite) TestGenWhere(c *C) { + p := parser.New() + se := mock.NewContext() + schema1 := "create table test.tb(id int primary key, col1 int unique not null, col2 int unique, name varchar(24))" + ti1, err := createTableInfo(p, se, 0, schema1) + c.Assert(err, IsNil) + schema2 := "create table test.tb(id int, col1 int, col2 int, name varchar(24))" + ti2, err := createTableInfo(p, se, 0, schema2) + c.Assert(err, IsNil) + + testCases := []struct { + dml *DML + sql string + values []interface{} + }{ + { + newDML(del, false, "", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti1.Columns, ti1), + "`id` = ?", + []interface{}{1}, + }, + { + newDML(update, false, "", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, ti1.Columns, ti1), + "`id` = ?", + []interface{}{1}, + }, + { + newDML(del, false, "", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti2.Columns, ti2), + "`id` = ? AND `col1` = ? AND `col2` = ? AND `name` = ?", + []interface{}{1, 2, 3, "haha"}, + }, + { + newDML(update, false, "", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, ti2.Columns, ti2), + "`id` = ? AND `col1` = ? AND `col2` = ? AND `name` = ?", + []interface{}{1, 2, 3, "haha"}, + }, + } + for _, tc := range testCases { + var buf strings.Builder + whereValues := tc.dml.genWhere(&buf) + c.Assert(buf.String(), Equals, tc.sql) + c.Assert(whereValues, DeepEquals, tc.values) + } +} + +func (s *testSyncerSuite) TestGenSQL(c *C) { + p := parser.New() + se := mock.NewContext() + schema := "create table test.tb(id int primary key, col1 int unique not null, col2 int unique, name varchar(24))" + ti, err := createTableInfo(p, se, 0, schema) + c.Assert(err, IsNil) + + testCases := []struct { + dml *DML + queries []string + args [][]interface{} + }{ + { + newDML(insert, false, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti), + []string{"INSERT INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?)"}, + [][]interface{}{{1, 2, 3, "haha"}}, + }, + { + newDML(insert, true, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti), + []string{"INSERT INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)"}, + [][]interface{}{{1, 2, 3, "haha"}}, + }, + { + newDML(del, false, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti), + []string{"DELETE FROM `targetSchema`.`targetTable` WHERE `id` = ? 
LIMIT 1"}, + [][]interface{}{{1}}, + }, + { + newDML(update, false, "`targetSchema`.`targetTable`", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti), + []string{"UPDATE `targetSchema`.`targetTable` SET `id` = ?, `col1` = ?, `col2` = ?, `name` = ? WHERE `id` = ? LIMIT 1"}, + [][]interface{}{{4, 5, 6, "hihi", 1}}, + }, + { + newDML(update, true, "`targetSchema`.`targetTable`", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti), + []string{"DELETE FROM `targetSchema`.`targetTable` WHERE `id` = ? LIMIT 1", "INSERT INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)"}, + [][]interface{}{{1}, {4, 5, 6, "hihi"}}, + }, + } + for _, tc := range testCases { + queries, args := tc.dml.genSQL() + c.Assert(queries, DeepEquals, tc.queries) + c.Assert(args, DeepEquals, tc.args) + } +} diff --git a/syncer/dml_worker.go b/syncer/dml_worker.go index 08a888277..c9c7c09c4 100644 --- a/syncer/dml_worker.go +++ b/syncer/dml_worker.go @@ -129,10 +129,10 @@ func (w *DMLWorker) run() { w.flushCh <- j } } else { - queueBucket := int(utils.GenHashKey(j.key)) % w.workerCount + queueBucket := int(utils.GenHashKey(j.dml.key)) % w.workerCount w.addCountFunc(false, queueBucketMapping[queueBucket], j.tp, 1, j.targetTable) startTime := time.Now() - w.logger.Debug("queue for key", zap.Int("queue", queueBucket), zap.String("key", j.key)) + w.logger.Debug("queue for key", zap.Int("queue", queueBucket), zap.String("key", j.dml.key)) jobChs[queueBucket] <- j metrics.AddJobDurationHistogram.WithLabelValues(j.tp.String(), w.task, queueBucketMapping[queueBucket], w.source).Observe(time.Since(startTime).Seconds()) } @@ -217,8 +217,9 @@ func (w *DMLWorker) executeBatchJobs(queueID int, jobs []*job) { queries := make([]string, 0, len(jobs)) args := make([][]interface{}, 0, len(jobs)) for _, j := range jobs { - queries = append(queries, j.sql) - args = append(args, j.args) + query, arg := j.dml.genSQL() + queries = append(queries, query...) + args = append(args, arg...) } failpoint.Inject("WaitUserCancel", func(v failpoint.Value) { t := v.(int) diff --git a/syncer/job.go b/syncer/job.go index e85ae2bd5..ffa89350b 100644 --- a/syncer/job.go +++ b/syncer/job.go @@ -70,10 +70,7 @@ type job struct { // sql example: drop table `s1`.`t1`, `s2`.`t2`. sourceTbls map[string][]*filter.Table targetTable *filter.Table - sql string - args []interface{} - key string - keys []string + dml *DML retry bool location binlog.Location // location of last received (ROTATE / QUERY / XID) event, for global/table checkpoint startLocation binlog.Location // start location of the sql in binlog, for handle_error @@ -87,18 +84,19 @@ type job struct { func (j *job) String() string { // only output some important information, maybe useful in execution. 
- return fmt.Sprintf("tp: %s, sql: %s, args: %v, key: %s, ddls: %s, last_location: %s, start_location: %s, current_location: %s", j.tp, j.sql, j.args, j.key, j.ddls, j.location, j.startLocation, j.currentLocation) + var dmlStr string + if j.dml != nil { + dmlStr = j.dml.String() + } + return fmt.Sprintf("tp: %s, dml: %s, ddls: %s, last_location: %s, start_location: %s, current_location: %s", j.tp, dmlStr, j.ddls, j.location, j.startLocation, j.currentLocation) } -func newDMLJob(tp opType, sourceTable, targetTable *filter.Table, sql string, args []interface{}, - keys []string, ec *eventContext) *job { +func newDMLJob(tp opType, sourceTable, targetTable *filter.Table, dml *DML, ec *eventContext) *job { return &job{ tp: tp, sourceTbls: map[string][]*filter.Table{sourceTable.Schema: {sourceTable}}, targetTable: targetTable, - sql: sql, - args: args, - keys: keys, + dml: dml, retry: true, location: *ec.lastLocation, diff --git a/syncer/job_test.go b/syncer/job_test.go index 90adabf58..439425e74 100644 --- a/syncer/job_test.go +++ b/syncer/job_test.go @@ -15,7 +15,9 @@ package syncer import ( . "github.com/pingcap/check" + "github.com/pingcap/parser" "github.com/pingcap/tidb-tools/pkg/filter" + "github.com/pingcap/tidb/util/mock" "github.com/pingcap/dm/pkg/binlog" ) @@ -79,25 +81,32 @@ func (t *testJobSuite) TestJob(c *C) { needHandleDDLs: []string{"create database test"}, shardingDDLInfo: ddlInfo, } + + schema := "create table test.tb(id int primary key, col1 int unique not null)" + p := parser.New() + se := mock.NewContext() + ti, err := createTableInfo(p, se, 0, schema) + c.Assert(err, IsNil) + testCases := []struct { job *job jobStr string }{ { - newDMLJob(insert, table, table, "insert into test.t1 values(?)", []interface{}{1}, []string{"1"}, ec), - "tp: insert, sql: insert into test.t1 values(?), args: [1], key: , ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ", + newDMLJob(insert, table, table, newDML(insert, true, "targetTable", table, nil, []interface{}{2, 2}, nil, []interface{}{2, 2}, ti.Columns, ti), ec), + "tp: insert, dml: [safemode: true, targetTableID: targetTable, op: insert, columns: [id col1], oldValues: [], values: [2 2]], ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ", }, { newDDLJob(qec), - "tp: ddl, sql: , args: [], key: , ddls: [create database test], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ", + "tp: ddl, dml: , ddls: [create database test], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ", }, { newXIDJob(binlog.NewLocation(""), binlog.NewLocation(""), binlog.NewLocation("")), - "tp: xid, sql: , args: [], key: , ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ", + "tp: xid, dml: , ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ", }, { newFlushJob(), - "tp: flush, sql: , args: [], key: , ddls: [], last_location: position: (, 0), gtid-set: , start_location: position: (, 0), gtid-set: , current_location: position: (, 0), gtid-set: ", + "tp: flush, dml: , ddls: [], last_location: position: (, 0), gtid-set: , 
+			"tp: flush, dml: , ddls: [], last_location: position: (, 0), gtid-set: , start_location: position: (, 0), gtid-set: , current_location: position: (, 0), gtid-set: ",
 		}, {
 			newSkipJob(ec),
-			"tp: skip, sql: , args: [], key: , ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 0), gtid-set: , current_location: position: (, 0), gtid-set: ",
+			"tp: skip, dml: , ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 0), gtid-set: , current_location: position: (, 0), gtid-set: ",
 		},
 	}
diff --git a/syncer/syncer.go b/syncer/syncer.go
index 8478820a5..745ae36a6 100644
--- a/syncer/syncer.go
+++ b/syncer/syncer.go
@@ -2033,18 +2033,17 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
 	}

 	var (
-		sqls []string
-		keys [][]string
-		args [][]interface{}
+		dmls    []*DML
 		jobType opType
 	)

 	param := &genDMLParam{
-		tableID:         utils.GenTableID(targetTable),
+		targetTableID:   utils.GenTableID(targetTable),
 		data:            prunedRows,
 		originalData:    rows,
 		columns:         prunedColumns,
 		sourceTableInfo: tableInfo,
+		sourceTable:     sourceTable,
 	}

 	switch ec.header.EventType {
@@ -2055,7 +2054,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
 		}

 		param.safeMode = ec.safeMode
-		sqls, keys, args, err = s.genInsertSQLs(param, exprFilter)
+		dmls, err = s.genAndFilterInsertDMLs(param, exprFilter)
 		if err != nil {
 			return terror.Annotatef(err, "gen insert sqls failed, sourceTable: %v, targetTable: %v", sourceTable, targetTable)
 		}
@@ -2069,7 +2068,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
 		}

 		param.safeMode = ec.safeMode
-		sqls, keys, args, err = s.genUpdateSQLs(param, oldExprFilter, newExprFilter)
+		dmls, err = s.genAndFilterUpdateDMLs(param, oldExprFilter, newExprFilter)
 		if err != nil {
 			return terror.Annotatef(err, "gen update sqls failed, sourceTable: %v, targetTable: %v", sourceTable, targetTable)
 		}
@@ -2082,7 +2081,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
 			return err2
 		}

-		sqls, keys, args, err = s.genDeleteSQLs(param, exprFilter)
+		dmls, err = s.genAndFilterDeleteDMLs(param, exprFilter)
 		if err != nil {
 			return terror.Annotatef(err, "gen delete sqls failed, sourceTable: %v, targetTable: %v", sourceTable, targetTable)
 		}
@@ -2095,17 +2094,8 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
 	}

 	startTime := time.Now()
-	for i := range sqls {
-		var arg []interface{}
-		var key []string
-		if args != nil {
-			arg = args[i]
-		}
-		if keys != nil {
-			key = keys[i]
-		}
-
-		job := newDMLJob(jobType, sourceTable, targetTable, sqls[i], arg, key, &ec)
+	for i := range dmls {
+		job := newDMLJob(jobType, sourceTable, targetTable, dmls[i], &ec)
 		err = s.addJobFunc(job)
 		if err != nil {
 			return err
diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go
index 02f5aaa60..0ded94cab 100644
--- a/syncer/syncer_test.go
+++ b/syncer/syncer_test.go
@@ -818,85 +818,80 @@ func (s *testSyncerSuite) TestRun(c *C) {
 		// now every ddl job will start with a flush job
 		{
 			flush,
-			"",
+			nil,
 			nil,
 		}, {
 			ddl,
-			"CREATE DATABASE IF NOT EXISTS `test_1`",
+			[]string{"CREATE DATABASE IF NOT EXISTS `test_1`"},
 			nil,
 		}, {
 			flush,
-			"",
+			nil,
 			nil,
 		}, {
 			ddl,
-			"CREATE TABLE IF NOT EXISTS `test_1`.`t_1` (`id` INT PRIMARY KEY,`name` VARCHAR(24))",
+			[]string{"CREATE TABLE IF NOT EXISTS `test_1`.`t_1` (`id` INT PRIMARY KEY,`name` VARCHAR(24))"},
 			nil,
 		}, {
 			flush,
-			"",
+			nil,
 			nil,
 		}, {
 			ddl,
-			"CREATE TABLE IF NOT EXISTS `test_1`.`t_2` (`id` INT PRIMARY KEY,`name` VARCHAR(24))",
(`id` INT PRIMARY KEY,`name` VARCHAR(24))"}, nil, }, { insert, - "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", - []interface{}{int64(580981944116838401), "a"}, + []string{"INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"}, + [][]interface{}{{int64(580981944116838401), "a"}}, }, { flush, - "", + nil, nil, }, { ddl, - "ALTER TABLE `test_1`.`t_1` ADD INDEX `index1`(`name`)", + []string{"ALTER TABLE `test_1`.`t_1` ADD INDEX `index1`(`name`)"}, nil, }, { insert, - "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", - []interface{}{int64(580981944116838402), "b"}, + []string{"INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"}, + [][]interface{}{{int64(580981944116838402), "b"}}, }, { del, - "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", - []interface{}{int64(580981944116838401)}, + []string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1"}, + [][]interface{}{{int64(580981944116838401)}}, }, { - // in the first minute, safe mode is true, will split update to delete + replace + // safe mode is true, will split update to delete + replace update, - "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", - []interface{}{int64(580981944116838402)}, - }, { - // in the first minute, safe mode is true, will split update to delete + replace - update, - "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", - []interface{}{int64(580981944116838401), "b"}, + []string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", "INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"}, + [][]interface{}{{int64(580981944116838402)}, {int64(580981944116838401), "b"}}, }, { flush, - "", + nil, nil, }, { ddl, - "CREATE TABLE IF NOT EXISTS `test_1`.`t_3` (`id` INT PRIMARY KEY,`name` VARCHAR(24))", + []string{"CREATE TABLE IF NOT EXISTS `test_1`.`t_3` (`id` INT PRIMARY KEY,`name` VARCHAR(24))"}, nil, }, { flush, - "", + nil, nil, }, { ddl, - "ALTER TABLE `test_1`.`t_3` DROP PRIMARY KEY", + []string{"ALTER TABLE `test_1`.`t_3` DROP PRIMARY KEY"}, nil, }, { flush, - "", + nil, nil, }, { ddl, - "ALTER TABLE `test_1`.`t_3` ADD PRIMARY KEY(`id`, `name`)", + []string{"ALTER TABLE `test_1`.`t_3` ADD PRIMARY KEY(`id`, `name`)"}, nil, }, { flush, - "", + nil, nil, }, } @@ -952,15 +947,15 @@ func (s *testSyncerSuite) TestRun(c *C) { expectJobs2 := []*expectJob{ { insert, - "INSERT INTO `test_1`.`t_2` (`id`,`name`) VALUES (?,?)", - []interface{}{int32(3), "c"}, + []string{"INSERT INTO `test_1`.`t_2` (`id`,`name`) VALUES (?,?)"}, + [][]interface{}{{int32(3), "c"}}, }, { del, - "DELETE FROM `test_1`.`t_2` WHERE `id` = ? LIMIT 1", - []interface{}{int32(3)}, + []string{"DELETE FROM `test_1`.`t_2` WHERE `id` = ? 
LIMIT 1"}, + [][]interface{}{{int32(3)}}, }, { flush, - "", + nil, nil, }, } @@ -1080,52 +1075,48 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { // now every ddl job will start with a flush job { flush, - "", + nil, nil, }, { ddl, - "CREATE DATABASE IF NOT EXISTS `test_1`", + []string{"CREATE DATABASE IF NOT EXISTS `test_1`"}, nil, }, { flush, - "", + nil, nil, }, { ddl, - "CREATE TABLE IF NOT EXISTS `test_1`.`t_1` (`id` INT PRIMARY KEY,`name` VARCHAR(24))", + []string{"CREATE TABLE IF NOT EXISTS `test_1`.`t_1` (`id` INT PRIMARY KEY,`name` VARCHAR(24))"}, nil, }, { insert, - "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", - []interface{}{int32(1), "a"}, + []string{"INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"}, + [][]interface{}{{int32(1), "a"}}, }, { del, - "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", - []interface{}{int32(1)}, + []string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1"}, + [][]interface{}{{int32(1)}}, }, { update, - "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", - []interface{}{int32(2)}, - }, { - update, - "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", - []interface{}{int32(1), "b"}, + []string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", "INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"}, + [][]interface{}{{int32(2)}, {int32(1), "b"}}, }, { // start from this event, location passes safeModeExitLocation and safe mode should exit insert, - "INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", - []interface{}{int32(1), "a"}, + []string{"INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)"}, + [][]interface{}{{int32(1), "a"}}, }, { del, - "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", - []interface{}{int32(1)}, + []string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1"}, + [][]interface{}{{int32(1)}}, }, { update, - "UPDATE `test_1`.`t_1` SET `id` = ?, `name` = ? WHERE `id` = ? LIMIT 1", - []interface{}{int32(1), "b", int32(2)}, + []string{"UPDATE `test_1`.`t_1` SET `id` = ?, `name` = ? WHERE `id` = ? LIMIT 1"}, + [][]interface{}{{int32(1), "b", int32(2)}}, }, { flush, - "", + nil, nil, }, } @@ -1364,21 +1355,20 @@ func executeSQLAndWait(expectJobNum int) { type expectJob struct { tp opType - sqlInJob string - args []interface{} + sqlInJob []string + args [][]interface{} } func checkJobs(c *C, jobs []*job, expectJobs []*expectJob) { c.Assert(len(jobs), Equals, len(expectJobs), Commentf("jobs = %q", jobs)) for i, job := range jobs { - c.Log(i, job.tp, job.ddls, job.sql, job.args) - c.Assert(job.tp, Equals, expectJobs[i].tp) if job.tp == ddl { - c.Assert(job.ddls[0], Equals, expectJobs[i].sqlInJob) - } else { - c.Assert(job.sql, Equals, expectJobs[i].sqlInJob) - c.Assert(job.args, DeepEquals, expectJobs[i].args) + c.Assert(job.ddls, DeepEquals, expectJobs[i].sqlInJob) + } else if job.tp == insert || job.tp == update || job.tp == del { + sqls, args := job.dml.genSQL() + c.Assert(sqls, DeepEquals, expectJobs[i].sqlInJob) + c.Assert(args, DeepEquals, expectJobs[i].args) } } }