Skip to content

Commit

Permalink
binlog loop back sync (#884)
Browse files Browse the repository at this point in the history
* Add sync mode config (#867)

Allow the number of columns in the downstream table to differ from the current schema.

For the case bidirectional replication, we will execute the DDL at one side, for add or drop column
the column number will mismatch.

cluster A <-> cluster B

drop column of table t at cluster A
some DML of table t at cluster B will miss the column dropped compared to cluster A

* binlog loop back sync (#879)

In the AA (dual-active) scenario, data is written (DML) on both sides. The data of the TiDB clusters on both sides needs to be synchronized with each other while avoiding loopback synchronization, and DDL is only written on one side.

Add three variables in drainer.toml that control whether DDL is synced, whether the sync mark is set, and which channel ID identifies the sync, in order to avoid loopback synchronization.
add configuration item
loopback-control (true/false) set mark table identification or not and filter txn by mark table
sync-ddl (true/false) sync ddl to downstream DB or not
channel-id (integer) sync identification id,avoid loopback synchronization

Co-authored-by: Nihao123451 <37206498+Nihao123451@users.noreply.github.com>

Co-authored-by: freemindLi <59459626+freemindLi@users.noreply.github.com>
Co-authored-by: Nihao123451 <37206498+Nihao123451@users.noreply.github.com>
  • Loading branch information
3 people authored Jan 15, 2020
1 parent e9cce0d commit a50a36f
Show file tree
Hide file tree
Showing 15 changed files with 451 additions and 29 deletions.
17 changes: 17 additions & 0 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,19 @@ compressor = ""
# number of binlog events in a transaction batch
txn-batch = 20

# sync ddl to downstream db or not
sync-ddl = true

# This option is used in the dual-active (bidirectional replication) scenario.
# If it is false, all upstream data is synchronized to the downstream, except for the filtered tables.
# If it is true, channel-id must be set as well: an upstream transaction that updates the mark table
# with a channel ID equal to this drainer's channel-id is not synchronized to the downstream.
# Therefore, in the dual-active scenario, both sides must set channel-id to the same value.
loopback-control = false

# channel-id only takes effect when loopback-control is turned on.
# In the dual-active scenario, the channel ID used when synchronizing from the downstream to the upstream
# and the channel ID used when synchronizing from the upstream to the downstream must be set to the same
# value to avoid loopback synchronization.
channel-id = 1

# work count to execute binlogs
# if the latency between drainer and downstream(mysql or tidb) are too high, you might want to increase this
# to get higher throughput by higher concurrent write to the downstream
Expand Down Expand Up @@ -84,6 +97,10 @@ password = ""
# if encrypted_password is not empty, password will be ignored.
encrypted_password = ""
port = 3306
# 1: SyncFullColumn, 2: SyncPartialColumn
# when set to SyncPartialColumn, drainer allows the downstream schema
# to have more or fewer columns and relaxes the sql mode by removing STRICT_TRANS_TABLES.
# sync-mode = 1

[syncer.to.checkpoint]
# only support mysql or tidb now, you can uncomment this to control where the checkpoint is saved.
Expand Down
6 changes: 6 additions & 0 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type SyncerConfig struct {
IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"`
IgnoreTables []filter.TableName `toml:"ignore-table" json:"ignore-table"`
TxnBatch int `toml:"txn-batch" json:"txn-batch"`
LoopbackControl bool `toml:"loopback-control" json:"loopback-control"`
SyncDDL bool `toml:"sync-ddl" json:"sync-ddl"`
ChannelID int64 `toml:"channel-id" json:"channel-id"`
WorkerCount int `toml:"worker-count" json:"worker-count"`
To *dsync.DBConfig `toml:"to" json:"to"`
DoTables []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"`
Expand Down Expand Up @@ -127,6 +130,9 @@ func NewConfig() *Config {
fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", -1, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint, will get a latest timestamp from pd if setting to be -1")
fs.StringVar(&cfg.Compressor, "compressor", "", "use the specified compressor to compress payload between pump and drainer, only 'gzip' is supported now (default \"\", ie. compression disabled.)")
fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch")
fs.BoolVar(&cfg.SyncerCfg.LoopbackControl, "loopback-control", false, "set mark or not ")
fs.BoolVar(&cfg.SyncerCfg.SyncDDL, "sync-ddl", true, "sync ddl or not")
fs.Int64Var(&cfg.SyncerCfg.ChannelID, "channel-id", 0, "sync channel id ")
fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas")
fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 16, "parallel worker count")
fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql or tidb or file or kafka; see syncer section in conf/drainer.toml")
Expand Down
42 changes: 42 additions & 0 deletions drainer/loopbacksync/loopbacksync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package loopbacksync

// Identifiers of the loopback mark table used by the drainer.
const (
// MarkTableName is the schema-qualified ("schema.table") name of the mark
// table; the syncer compares it case-insensitively against each DML's
// "Database.Table" to detect mark-table writes.
MarkTableName = "retl._drainer_repl_mark"
// ChannelID is the key under which a mark-table DML carries its channel id.
ChannelID = "channel_id"
// Val is presumably the name of a value column of the mark table — its use
// is not visible in this file; TODO confirm against the loader.
Val = "val"
// ChannelInfo is presumably the name of a channel-info column of the mark
// table — its use is not visible in this file; TODO confirm against the loader.
ChannelInfo = "channel_info"
)

// LoopBackSync groups the loopback-sync settings that are passed from the
// syncer configuration down to the downstream syncer.
type LoopBackSync struct {
	// ChannelID is the channel id that mark-table rows are compared against.
	ChannelID int64
	// LoopbackControl turns loopback filtering on or off.
	LoopbackControl bool
	// SyncDDL tells whether DDL is synced to the downstream.
	SyncDDL bool
}

// NewLoopBackSyncInfo returns a newly allocated LoopBackSync whose fields are
// set to the given channel id, loopback-control flag and sync-DDL flag.
func NewLoopBackSyncInfo(ChannelID int64, LoopbackControl, SyncDDL bool) *LoopBackSync {
	info := new(LoopBackSync)
	info.ChannelID = ChannelID
	info.LoopbackControl = LoopbackControl
	info.SyncDDL = SyncDDL
	return info
}
27 changes: 27 additions & 0 deletions drainer/loopbacksync/loopbacksync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package loopbacksync

import "testing"

// TestNewLoopBackSyncInfo verifies that NewLoopBackSyncInfo returns a non-nil
// LoopBackSync whose fields carry exactly the values passed to it.
func TestNewLoopBackSyncInfo(t *testing.T) {
	const (
		channelID       int64 = 1
		loopbackControl       = true
		syncDDL               = false
	)

	l := NewLoopBackSyncInfo(channelID, loopbackControl, syncDDL)
	// Stop immediately on nil so the field checks below cannot dereference it.
	if l == nil {
		t.Fatal("alloc loopBackSyncInfo object failed")
	}
	if l.ChannelID != channelID {
		t.Errorf("ChannelID = %d, want %d", l.ChannelID, channelID)
	}
	if l.LoopbackControl != loopbackControl {
		t.Errorf("LoopbackControl = %v, want %v", l.LoopbackControl, loopbackControl)
	}
	if l.SyncDDL != syncDDL {
		t.Errorf("SyncDDL = %v, want %v", l.SyncDDL, syncDDL)
	}
}
52 changes: 49 additions & 3 deletions drainer/sync/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ package sync

import (
"database/sql"
"strings"
"sync"

"github.com/pingcap/tidb-binlog/drainer/loopbacksync"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/drainer/translator"
Expand All @@ -38,21 +41,42 @@ type MysqlSyncer struct {
var createDB = loader.CreateDBWithSQLMode

// NewMysqlSyncer returns a instance of MysqlSyncer
func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, worker int, batchSize int, queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string) (*MysqlSyncer, error) {
func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, worker int, batchSize int, queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string, info *loopbacksync.LoopBackSync) (*MysqlSyncer, error) {
db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, sqlMode)
if err != nil {
return nil, errors.Trace(err)
}

var opts []loader.Option
opts = append(opts, loader.WorkerCount(worker), loader.BatchSize(batchSize), loader.SaveAppliedTS(destDBType == "tidb"))
opts = append(opts, loader.WorkerCount(worker), loader.BatchSize(batchSize), loader.SaveAppliedTS(destDBType == "tidb"), loader.SetloopBackSyncInfo(info))
if queryHistogramVec != nil {
opts = append(opts, loader.Metrics(&loader.MetricsGroup{
QueryHistogramVec: queryHistogramVec,
EventCounterVec: nil,
}))
}

if cfg.SyncMode != 0 {
mode := loader.SyncMode(cfg.SyncMode)
opts = append(opts, loader.SyncModeOption(mode))

if mode == loader.SyncPartialColumn {
var oldMode, newMode string
oldMode, newMode, err = relaxSQLMode(db)
if err != nil {
return nil, errors.Trace(err)
}

if newMode != oldMode {
db.Close()
db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, &newMode)
if err != nil {
return nil, errors.Trace(err)
}
}
}
}

loader, err := loader.NewLoader(db, opts...)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -69,6 +93,29 @@ func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, w
return s, nil
}

// relaxSQLMode queries the session sql_mode through db and derives a relaxed
// mode from it by dropping "STRICT_TRANS_TABLES".
//
// It returns the mode as read from the database (oldMode) and the relaxed
// mode (newMode); the two are equal when the strict flag is absent. If the
// query or scan fails, both modes are empty and err is set.
func relaxSQLMode(db *sql.DB) (oldMode string, newMode string, err error) {
	const strictMode = "STRICT_TRANS_TABLES"

	if scanErr := db.QueryRow("SELECT @@SESSION.sql_mode;").Scan(&oldMode); scanErr != nil {
		return "", "", errors.Trace(scanErr)
	}

	if !strings.Contains(oldMode, strictMode) {
		return oldMode, oldMode, nil
	}

	// Modes are joined by "," (e.g. "mode1,mode2"). Strip the flag together
	// with its trailing comma first, then with its leading comma, and finally
	// on its own, so no dangling separator is left behind.
	newMode = oldMode
	for _, pattern := range []string{strictMode + ",", "," + strictMode, strictMode} {
		newMode = strings.Replace(newMode, pattern, "", -1)
	}

	return oldMode, newMode, nil
}

// SetSafeMode make the MysqlSyncer to use safe mode or not
func (m *MysqlSyncer) SetSafeMode(mode bool) {
m.loader.SetSafeMode(mode)
Expand All @@ -80,7 +127,6 @@ func (m *MysqlSyncer) Sync(item *Item) error {
if err != nil {
return errors.Trace(err)
}

txn.Metadata = item

select {
Expand Down
26 changes: 26 additions & 0 deletions drainer/sync/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,29 @@ func (s *mysqlSuite) TestMySQLSyncerAvoidBlock(c *check.C) {
c.Fatal("mysql syncer hasn't synced item in 1s after some error occurs in loader")
}
}

// TestRelaxSQLMode feeds relaxSQLMode a mocked session sql_mode and checks
// that it reports the mode unchanged as oldMode and returns newMode with
// STRICT_TRANS_TABLES removed, wherever the flag sits in the list.
func (s *mysqlSuite) TestRelaxSQLMode(c *check.C) {
	type modeCase struct {
		oldMode string
		newMode string
	}

	cases := []modeCase{
		{oldMode: "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE", newMode: "ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE"},
		{oldMode: "ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE,STRICT_TRANS_TABLES", newMode: "ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE"},
		{oldMode: "STRICT_TRANS_TABLES", newMode: ""},
		{oldMode: "ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE", newMode: "ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE"},
	}

	for _, tc := range cases {
		db, mock, err := sqlmock.New()
		c.Assert(err, check.IsNil)

		// The mocked query returns a single row holding the mode under test.
		result := sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).AddRow(tc.oldMode)
		mock.ExpectQuery("SELECT @@SESSION.sql_mode;").WillReturnRows(result)

		gotOld, gotNew, err := relaxSQLMode(db)
		c.Assert(err, check.IsNil)
		c.Assert(gotOld, check.Equals, tc.oldMode)
		c.Assert(gotNew, check.Equals, tc.newMode)
	}
}
2 changes: 1 addition & 1 deletion drainer/sync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *syncerSuite) SetUpTest(c *check.C) {
createDB = oldCreateDB
}()

mysql, err := NewMysqlSyncer(cfg, infoGetter, 1, 1, nil, nil, "mysql")
mysql, err := NewMysqlSyncer(cfg, infoGetter, 1, 1, nil, nil, "mysql", nil)
c.Assert(err, check.IsNil)
s.syncers = append(s.syncers, mysql)

Expand Down
1 change: 1 addition & 0 deletions drainer/sync/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type DBConfig struct {
Password string `toml:"password" json:"password"`
// if EncryptedPassword is not empty, Password will be ignore.
EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"`
SyncMode int `toml:"sync-mode" json:"sync-mode"`
Port int `toml:"port" json:"port"`
Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
BinlogFileDir string `toml:"dir" json:"dir"`
Expand Down
58 changes: 54 additions & 4 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
package drainer

import (
"reflect"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
"github.com/pingcap/tidb-binlog/pkg/loader"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -46,6 +50,8 @@ type Syncer struct {

filter *filter.Filter

loopbackSync *loopbacksync.LoopBackSync

// last time we successfully sync binlog item to downstream
lastSyncTime time.Time

Expand All @@ -70,6 +76,7 @@ func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) (
ignoreDBs = strings.Split(cfg.IgnoreSchemas, ",")
}
syncer.filter = filter.NewFilter(ignoreDBs, cfg.IgnoreTables, cfg.DoDBs, cfg.DoTables)
syncer.loopbackSync = loopbacksync.NewLoopBackSyncInfo(cfg.ChannelID, cfg.LoopbackControl, cfg.SyncDDL)

var err error
// create schema
Expand All @@ -78,15 +85,15 @@ func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) (
return nil, errors.Trace(err)
}

syncer.dsyncer, err = createDSyncer(cfg, syncer.schema)
syncer.dsyncer, err = createDSyncer(cfg, syncer.schema, syncer.loopbackSync)
if err != nil {
return nil, errors.Trace(err)
}

return syncer, nil
}

func createDSyncer(cfg *SyncerConfig, schema *Schema) (dsyncer dsync.Syncer, err error) {
func createDSyncer(cfg *SyncerConfig, schema *Schema, info *loopbacksync.LoopBackSync) (dsyncer dsync.Syncer, err error) {
switch cfg.DestDBType {
case "kafka":
dsyncer, err = dsync.NewKafka(cfg.To, schema)
Expand All @@ -104,7 +111,7 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema) (dsyncer dsync.Syncer, err
return nil, errors.Annotate(err, "fail to create flash dsyncer")
}
case "mysql", "tidb":
dsyncer, err = dsync.NewMysqlSyncer(cfg.To, schema, cfg.WorkerCount, cfg.TxnBatch, queryHistogramVec, cfg.StrSQLMode, cfg.DestDBType)
dsyncer, err = dsync.NewMysqlSyncer(cfg.To, schema, cfg.WorkerCount, cfg.TxnBatch, queryHistogramVec, cfg.StrSQLMode, cfg.DestDBType, info)
if err != nil {
return nil, errors.Annotate(err, "fail to create mysql dsyncer")
}
Expand Down Expand Up @@ -348,6 +355,15 @@ ForLoop:
err = errors.Annotate(err, "handlePreviousDDLJobIfNeed failed")
break ForLoop
}
var isFilterTransaction = false
var err1 error
if s.loopbackSync != nil && s.loopbackSync.LoopbackControl {
isFilterTransaction, err1 = loopBackStatus(binlog, preWrite, s.schema, s.loopbackSync)
if err1 != nil {
err = errors.Annotate(err1, "analyze transaction failed")
break ForLoop
}
}

var ignore bool
ignore, err = filterTable(preWrite, s.filter, s.schema)
Expand All @@ -356,7 +372,7 @@ ForLoop:
break ForLoop
}

if !ignore {
if !ignore && !isFilterTransaction {
s.addDMLEventMetrics(preWrite.GetMutations())
beginTime := time.Now()
lastAddComitTS = binlog.GetCommitTs()
Expand All @@ -374,6 +390,11 @@ ForLoop:
// DDL (with version 10, commit ts 100) -> DDL (with version 9, commit ts 101) would never happen
s.schema.addJob(b.job)

if !s.cfg.SyncDDL {
log.Info("Syncer skips DDL", zap.String("sql", b.job.Query), zap.Int64("ts", b.GetCommitTs()), zap.Bool("SyncDDL", s.cfg.SyncDDL))
continue
}

log.Debug("get DDL", zap.Int64("SchemaVersion", b.job.BinlogInfo.SchemaVersion))
lastDDLSchemaVersion = b.job.BinlogInfo.SchemaVersion

Expand Down Expand Up @@ -436,6 +457,35 @@ ForLoop:
return cerr
}

// findLoopBackMark reports whether any DML in dmls writes to the loopback
// mark table with a channel id equal to info.ChannelID.
//
// DMLs on other tables, and mark-table DMLs that carry no channel id value,
// are skipped. An error is returned when a channel id value is present but
// is not an int64.
func findLoopBackMark(dmls []*loader.DML, info *loopbacksync.LoopBackSync) (bool, error) {
	for _, dml := range dmls {
		qualified := dml.Database + "." + dml.Table
		if !strings.EqualFold(qualified, loopbacksync.MarkTableName) {
			continue
		}

		raw, found := dml.Values[loopbacksync.ChannelID]
		if !found {
			continue
		}

		id, isInt64 := raw.(int64)
		if !isInt64 {
			return false, errors.Errorf("wrong type of channelID: %s", reflect.TypeOf(raw))
		}
		if id == info.ChannelID {
			return true, nil
		}
	}
	return false, nil
}

// loopBackStatus translates the binlog and its prewrite value into a
// transaction and reports whether that transaction carries a loopback mark
// matching info (see findLoopBackMark). Translation errors are returned
// traced.
func loopBackStatus(binlog *pb.Binlog, prewriteValue *pb.PrewriteValue, infoGetter translator.TableInfoGetter, info *loopbacksync.LoopBackSync) (bool, error) {
	// The schema and table name arguments are intentionally left empty here.
	txn, err := translator.TiBinlogToTxn(infoGetter, "", "", binlog, prewriteValue)
	if err != nil {
		return false, errors.Trace(err)
	}

	return findLoopBackMark(txn.DMLs, info)
}

// filterTable may drop some table mutation in `PrewriteValue`
// Return true if all table mutations are dropped.
func filterTable(pv *pb.PrewriteValue, filter *filter.Filter, schema *Schema) (ignore bool, err error) {
Expand Down
Loading

0 comments on commit a50a36f

Please sign in to comment.