Skip to content

Commit

Permalink
Add sync mode config (#867)
Browse files Browse the repository at this point in the history
allow when the column number of downstream table mismatch with current schema.

For the case bidirectional replication, we will execute the DDL at one side, for add or drop column
the column number will mismatch.

cluster A <-> cluster B

drop column of table t at cluster A
some DML of table t at cluster B will miss the column dropped compared to cluster A
  • Loading branch information
july2993 authored Jan 10, 2020
1 parent ede19d5 commit 43fb818
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 0 deletions.
4 changes: 4 additions & 0 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ password = ""
# if encrypted_password is not empty, password will be ignored.
encrypted_password = ""
port = 3306
# 1: SyncFullColumn, 2: SyncPartialColumn
# when setting SyncPartialColumn drainer will allow the downstream schema
# having more or less column numbers and relax sql mode by removing STRICT_TRANS_TABLES.
# sync-mode = 1

[syncer.to.checkpoint]
# only support mysql or tidb now, you can uncomment this to control where the checkpoint is saved.
Expand Down
45 changes: 45 additions & 0 deletions drainer/sync/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package sync

import (
"database/sql"
"strings"
"sync"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -55,6 +56,27 @@ func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, w
}))
}

if cfg.SyncMode != 0 {
mode := loader.SyncMode(cfg.SyncMode)
opts = append(opts, loader.SyncModeOption(mode))

if mode == loader.SyncPartialColumn {
var oldMode, newMode string
oldMode, newMode, err = relaxSQLMode(db)
if err != nil {
return nil, errors.Trace(err)
}

if newMode != oldMode {
db.Close()
db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, &newMode)
if err != nil {
return nil, errors.Trace(err)
}
}
}
}

loader, err := loader.NewLoader(db, opts...)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -72,6 +94,29 @@ func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, w
return s, nil
}

// set newMode as the oldMode query from db by removing "STRICT_TRANS_TABLES".
func relaxSQLMode(db *sql.DB) (oldMode string, newMode string, err error) {
row := db.QueryRow("SELECT @@SESSION.sql_mode;")
err = row.Scan(&oldMode)
if err != nil {
return "", "", errors.Trace(err)
}

toRemove := "STRICT_TRANS_TABLES"
newMode = oldMode

if !strings.Contains(oldMode, toRemove) {
return
}

// concatenated by "," like: mode1,mode2
newMode = strings.Replace(newMode, toRemove+",", "", -1)
newMode = strings.Replace(newMode, ","+toRemove, "", -1)
newMode = strings.Replace(newMode, toRemove, "", -1)

return
}

// SetSafeMode make the MysqlSyncer to use safe mode or not
func (m *MysqlSyncer) SetSafeMode(mode bool) {
m.loader.SetSafeMode(mode)
Expand Down
26 changes: 26 additions & 0 deletions drainer/sync/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,29 @@ func (s *mysqlSuite) TestMySQLSyncerWithRelayer(c *check.C) {
// The previous files should be removed.
c.Assert(len(names), check.Equals, 2)
}

func (s *mysqlSuite) TestRelaxSQLMode(c *check.C) {
tests := []struct {
oldMode string
newMode string
}{
{"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE", "ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE"},
{"ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE,STRICT_TRANS_TABLES", "ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE"},
{"STRICT_TRANS_TABLES", ""},
{"ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE", "ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE"},
}

for _, test := range tests {
db, dbMock, err := sqlmock.New()
c.Assert(err, check.IsNil)

rows := sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).
AddRow(test.oldMode)
dbMock.ExpectQuery("SELECT @@SESSION.sql_mode;").WillReturnRows(rows)

getOld, getNew, err := relaxSQLMode(db)
c.Assert(err, check.IsNil)
c.Assert(getOld, check.Equals, test.oldMode)
c.Assert(getNew, check.Equals, test.newMode)
}
}
1 change: 1 addition & 0 deletions drainer/sync/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type DBConfig struct {
Password string `toml:"password" json:"password"`
// if EncryptedPassword is not empty, Password will be ignore.
EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"`
SyncMode int `toml:"sync-mode" json:"sync-mode"`
Port int `toml:"port" json:"port"`
Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
BinlogFileDir string `toml:"dir" json:"dir"`
Expand Down
36 changes: 36 additions & 0 deletions pkg/loader/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type loaderImpl struct {

batchSize int
workerCount int
syncMode SyncMode

input chan *Txn
successTxn chan *Txn
Expand Down Expand Up @@ -99,23 +100,41 @@ type MetricsGroup struct {
QueryHistogramVec *prometheus.HistogramVec
}

// SyncMode represents the sync mode of DML.
type SyncMode int

// SyncMode values.
const (
SyncFullColumn SyncMode = 1 + iota
SyncPartialColumn
)

type options struct {
workerCount int
batchSize int
metrics *MetricsGroup
saveAppliedTS bool
syncMode SyncMode
}

var defaultLoaderOptions = options{
workerCount: 16,
batchSize: 20,
metrics: nil,
saveAppliedTS: false,
syncMode: SyncFullColumn,
}

// A Option sets options such batch size, worker count etc.
type Option func(*options)

// SyncModeOption set sync mode of loader.
func SyncModeOption(n SyncMode) Option {
return func(o *options) {
o.syncMode = n
}
}

// WorkerCount set worker count of loader
func WorkerCount(n int) Option {
return func(o *options) {
Expand Down Expand Up @@ -402,6 +421,20 @@ func (s *loaderImpl) singleExec(executor *executor, dmls []*DML) error {
return errors.Trace(err)
}

func removeOrphanCols(info *tableInfo, dml *DML) {
mp := make(map[string]struct{}, len(info.columns))
for _, name := range info.columns {
mp[name] = struct{}{}
}

for name := range dml.Values {
if _, ok := mp[name]; !ok {
delete(dml.Values, name)
delete(dml.OldValues, name)
}
}
}

func (s *loaderImpl) execDMLs(dmls []*DML) error {
if len(dmls) == 0 {
return nil
Expand All @@ -412,6 +445,9 @@ func (s *loaderImpl) execDMLs(dmls []*DML) error {
return errors.Trace(err)
}
filterGeneratedCols(dml)
if s.syncMode == SyncPartialColumn {
removeOrphanCols(dml.info, dml)
}
}

batchTables, singleDMLs := s.groupDMLs(dmls)
Expand Down
31 changes: 31 additions & 0 deletions pkg/loader/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,37 @@ func (cs *LoadSuite) SetUpTest(c *check.C) {
func (cs *LoadSuite) TearDownTest(c *check.C) {
}

func (cs *LoadSuite) TestRemoveOrphanCols(c *check.C) {
dml := &DML{
Values: map[string]interface{}{
"exist1": 11,
"exist2": 22,
"orhpan1": 11,
"orhpan2": 22,
},
OldValues: map[string]interface{}{
"exist1": 1,
"exist2": 2,
"orhpan1": 1,
"orhpan2": 2,
},
}

info := &tableInfo{
columns: []string{"exist1", "exist2"},
}

removeOrphanCols(info, dml)
c.Assert(dml.Values, check.DeepEquals, map[string]interface{}{
"exist1": 11,
"exist2": 22,
})
c.Assert(dml.OldValues, check.DeepEquals, map[string]interface{}{
"exist1": 1,
"exist2": 2,
})
}

func (cs *LoadSuite) TestOptions(c *check.C) {
var o options
WorkerCount(42)(&o)
Expand Down

0 comments on commit 43fb818

Please sign in to comment.