From 043836894cc9756e44e732f6e5098bf1859d1550 Mon Sep 17 00:00:00 2001 From: Lonng Date: Mon, 1 Apr 2019 12:05:31 +0800 Subject: [PATCH 1/4] *: make drainer translator's SQL_MODE configurable --- cmd/drainer/drainer.toml | 3 +++ drainer/config.go | 10 ++++++++++ drainer/config_test.go | 4 +++- drainer/server.go | 2 +- drainer/syncer.go | 13 ++++++++----- drainer/translator/flash.go | 12 +++++++++--- drainer/translator/kafka.go | 3 ++- drainer/translator/mysql.go | 9 +++++++-- drainer/translator/pb.go | 10 +++++++--- drainer/translator/translator.go | 3 ++- drainer/translator/translator_test.go | 3 ++- 11 files changed, 54 insertions(+), 18 deletions(-) diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index e05a7369b..de6336aef 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -16,6 +16,9 @@ pd-urls = "http://127.0.0.1:2379" # Use the specified compressor to compress payload between pump and drainer compressor = "" +# Use the sql-mode to parse DDL statment +sql-mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION" + #[security] # Path of file that contains list of trusted SSL CAs for connection with cluster components. # ssl-ca = "/path/to/ca.pem" diff --git a/drainer/config.go b/drainer/config.go index 27a699c23..37632325c 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -15,6 +15,8 @@ import ( "github.com/BurntSushi/toml" "github.com/ngaut/log" "github.com/pingcap/errors" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb-binlog/drainer/executor" "github.com/pingcap/tidb-binlog/pkg/filter" "github.com/pingcap/tidb-binlog/pkg/flags" @@ -63,6 +65,8 @@ type Config struct { DataDir string `toml:"data-dir" json:"data-dir"` DetectInterval int `toml:"detect-interval" json:"detect-interval"` EtcdURLs string `toml:"pd-urls" json:"pd-urls"` + StrSQLMode string `toml:"sql-mode" json:"sql-mode"` + SQLMode mysql.SQLMode `toml:"-" json:"-"` LogFile string `toml:"log-file" json:"log-file"` LogRotate string `toml:"log-rotate" json:"log-rotate"` InitialCommitTS int64 `toml:"initial-commit-ts" json:"initial-commit-ts"` @@ -83,6 +87,7 @@ func NewConfig() *Config { cfg := &Config{ EtcdTimeout: defaultEtcdTimeout, SyncerCfg: new(SyncerConfig), + StrSQLMode: mysql.DefaultSQLMode, } cfg.FlagSet = flag.NewFlagSet("drainer", flag.ContinueOnError) fs := cfg.FlagSet @@ -158,6 +163,11 @@ func (cfg *Config) Parse(args []string) error { return errors.Trace(err) } + cfg.SQLMode, err = mysql.GetSQLMode(cfg.StrSQLMode) + if err != nil { + return errors.New("invalid config: `sql-mode` must be a valid SQL_MODE") + } + cfg.tls, err = cfg.Security.ToTLSConfig() if err != nil { return errors.Errorf("tls config %+v error %v", cfg.Security, err) diff --git a/drainer/config_test.go b/drainer/config_test.go index 094ebb78e..aa108ea1d 100644 --- a/drainer/config_test.go +++ b/drainer/config_test.go @@ -4,6 +4,7 @@ import ( "testing" . "github.com/pingcap/check" + "github.com/pingcap/parser/mysql" ) // Hook up gocheck into the "go test" runner. @@ -30,6 +31,8 @@ func (t *testDrainerSuite) TestConfig(c *C) { c.Assert(cfg.SyncerCfg.TxnBatch, Equals, 1) c.Assert(cfg.SyncerCfg.DestDBType, Equals, "mysql") c.Assert(cfg.SyncerCfg.To.Host, Equals, "127.0.0.1") + c.Assert(cfg.StrSQLMode, Equals, mysql.DefaultSQLMode) + c.Assert(cfg.SQLMode, Equals, mysql.ModeStrictTransTables|mysql.ModeNoEngineSubstitution) } func (t *testDrainerSuite) TestValidate(c *C) { @@ -52,4 +55,3 @@ func (t *testDrainerSuite) TestValidate(c *C) { err = cfg.validate() c.Assert(err, IsNil) } - diff --git a/drainer/server.go b/drainer/server.go index 2af49d1b7..eb03931b0 100644 --- a/drainer/server.go +++ b/drainer/server.go @@ -107,7 +107,7 @@ func NewServer(cfg *Config) (*Server, error) { checkpointTSOGauge.Set(float64(oracle.ExtractPhysical(uint64(cp.TS())))) - syncer, err := NewSyncer(ctx, cp, cfg.SyncerCfg) + syncer, err := NewSyncer(ctx, cp, cfg.SQLMode, cfg.SyncerCfg) if err != nil { return nil, errors.Trace(err) } diff --git a/drainer/syncer.go b/drainer/syncer.go index cb86336eb..3f0b86a25 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/pingcap/parser/mysql" "golang.org/x/net/context" "github.com/ngaut/log" @@ -34,7 +35,8 @@ type Syncer struct { schema *Schema cp checkpoint.CheckPoint - cfg *SyncerConfig + sqlMode mysql.SQLMode + cfg *SyncerConfig translator translator.SQLTranslator @@ -59,8 +61,9 @@ type Syncer struct { } // NewSyncer returns a Drainer instance -func NewSyncer(ctx context.Context, cp checkpoint.CheckPoint, cfg *SyncerConfig) (*Syncer, error) { +func NewSyncer(ctx context.Context, cp checkpoint.CheckPoint, sqlMode mysql.SQLMode, cfg *SyncerConfig) (*Syncer, error) { syncer := new(Syncer) + syncer.sqlMode = sqlMode syncer.cfg = cfg syncer.cp = cp syncer.input = make(chan *binlogItem, maxBinlogItemCount) @@ -130,13 +133,13 @@ func (s *Syncer) checkWait(job *job) bool { func (s *Syncer) enableSafeModeInitializationPhase() { // set safeMode to true and useInsert to flase at the first, and will use the config after 5 minutes. - s.translator.SetConfig(true) + s.translator.SetConfig(true, s.sqlMode) go func() { ctx, cancel := context.WithCancel(s.ctx) defer func() { cancel() - s.translator.SetConfig(s.cfg.SafeMode) + s.translator.SetConfig(s.cfg.SafeMode, s.sqlMode) }() select { @@ -428,7 +431,7 @@ func (s *Syncer) run(jobs []*model.Job) error { return errors.Trace(err) } - s.translator.SetConfig(s.cfg.SafeMode) + s.translator.SetConfig(s.cfg.SafeMode, s.sqlMode) go s.enableSafeModeInitializationPhase() for i := 0; i < s.cfg.WorkerCount; i++ { diff --git a/drainer/translator/flash.go b/drainer/translator/flash.go index cc888476a..baa6eba4b 100644 --- a/drainer/translator/flash.go +++ b/drainer/translator/flash.go @@ -11,6 +11,7 @@ import ( "github.com/pingcap/parser" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" + parsermysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-binlog/pkg/dml" "github.com/pingcap/tidb-binlog/pkg/util" "github.com/pingcap/tidb/mysql" @@ -19,14 +20,17 @@ import ( ) // flashTranslator translates TiDB binlog to flash sqls -type flashTranslator struct{} +type flashTranslator struct { + sqlMode parsermysql.SQLMode +} func init() { Register("flash", &flashTranslator{}) } // Config set the configuration -func (f *flashTranslator) SetConfig(bool) { +func (f *flashTranslator) SetConfig(_ bool, sqlMode parsermysql.SQLMode) { + f.sqlMode = sqlMode } func (f *flashTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows [][]byte, commitTS int64) ([]string, [][]string, [][]interface{}, error) { @@ -191,7 +195,9 @@ func (f *flashTranslator) GenDeleteSQLs(schema string, table *model.TableInfo, r func (f *flashTranslator) GenDDLSQL(sql string, schema string, commitTS int64) (string, error) { schema = strings.ToLower(schema) - stmt, err := parser.New().ParseOneStmt(sql, "", "") + ddlParser := parser.New() + ddlParser.SetSQLMode(f.sqlMode) + stmt, err := ddlParser.ParseOneStmt(sql, "", "") if err != nil { return "", errors.Trace(err) } diff --git a/drainer/translator/kafka.go b/drainer/translator/kafka.go index cd8d6da2a..c8ad879dd 100644 --- a/drainer/translator/kafka.go +++ b/drainer/translator/kafka.go @@ -9,6 +9,7 @@ import ( "github.com/ngaut/log" "github.com/pingcap/errors" "github.com/pingcap/parser/model" + parsermysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-binlog/pkg/util" obinlog "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog" "github.com/pingcap/tidb/mysql" @@ -24,7 +25,7 @@ func init() { Register("kafka", &kafkaTranslator{}) } -func (p *kafkaTranslator) SetConfig(bool) { +func (p *kafkaTranslator) SetConfig(bool, parsermysql.SQLMode) { // do nothing } diff --git a/drainer/translator/mysql.go b/drainer/translator/mysql.go index 854ec0128..948eb9dd7 100644 --- a/drainer/translator/mysql.go +++ b/drainer/translator/mysql.go @@ -13,6 +13,7 @@ import ( "github.com/pingcap/parser" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" + parsermysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-binlog/pkg/dml" "github.com/pingcap/tidb-binlog/pkg/util" "github.com/pingcap/tidb/mysql" @@ -27,6 +28,7 @@ const implicitColID = -1 type mysqlTranslator struct { // safeMode is a mode for translate sql, will translate update to delete and replace, and translate insert to replace. safeMode int32 + sqlMode parsermysql.SQLMode } func init() { @@ -34,12 +36,13 @@ func init() { Register("tidb", &mysqlTranslator{}) } -func (m *mysqlTranslator) SetConfig(safeMode bool) { +func (m *mysqlTranslator) SetConfig(safeMode bool, sqlMode parsermysql.SQLMode) { if safeMode { atomic.StoreInt32(&m.safeMode, 1) } else { atomic.StoreInt32(&m.safeMode, 0) } + m.sqlMode = sqlMode } func (m *mysqlTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows [][]byte, commitTS int64) ([]string, [][]string, [][]interface{}, error) { @@ -272,7 +275,9 @@ func (m *mysqlTranslator) genDeleteSQL(schema string, table *model.TableInfo, co } func (m *mysqlTranslator) GenDDLSQL(sql string, schema string, commitTS int64) (string, error) { - stmt, err := parser.New().ParseOneStmt(sql, "", "") + ddlParser := parser.New() + ddlParser.SetSQLMode(m.sqlMode) + stmt, err := ddlParser.ParseOneStmt(sql, "", "") if err != nil { return "", errors.Trace(err) } diff --git a/drainer/translator/pb.go b/drainer/translator/pb.go index 528149306..3f7932dde 100644 --- a/drainer/translator/pb.go +++ b/drainer/translator/pb.go @@ -9,6 +9,7 @@ import ( "github.com/pingcap/parser" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-binlog/pkg/util" pb "github.com/pingcap/tidb-binlog/proto/binlog" "github.com/pingcap/tidb/sessionctx/stmtctx" @@ -19,14 +20,15 @@ import ( // pbTranslator translates TiDB binlog to self-description protobuf type pbTranslator struct { + sqlMode mysql.SQLMode } func init() { Register("pb", &pbTranslator{}) } -func (p *pbTranslator) SetConfig(bool) { - // do nothing +func (p *pbTranslator) SetConfig(_ bool, sqlMode mysql.SQLMode) { + p.sqlMode = sqlMode } func (p *pbTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows [][]byte, commitTS int64) ([]string, [][]string, [][]interface{}, error) { @@ -182,7 +184,9 @@ func (p *pbTranslator) GenDeleteSQLs(schema string, table *model.TableInfo, rows } func (p *pbTranslator) GenDDLSQL(sql string, schema string, commitTS int64) (string, error) { - stmt, err := parser.New().ParseOneStmt(sql, "", "") + ddlParser := parser.New() + ddlParser.SetSQLMode(p.sqlMode) + stmt, err := ddlParser.ParseOneStmt(sql, "", "") if err != nil { return "", errors.Trace(err) } diff --git a/drainer/translator/translator.go b/drainer/translator/translator.go index f4fc95825..efa07c5a3 100644 --- a/drainer/translator/translator.go +++ b/drainer/translator/translator.go @@ -6,6 +6,7 @@ import ( "github.com/ngaut/log" "github.com/pingcap/errors" "github.com/pingcap/parser/model" + parsermysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-binlog/pkg/util" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/table" @@ -35,7 +36,7 @@ var providers = make(map[string]SQLTranslator) // SQLTranslator is the interface for translating TiDB binlog to target sqls type SQLTranslator interface { // Config set the configuration - SetConfig(safeMode bool) + SetConfig(safeMode bool, sqlMode parsermysql.SQLMode) // GenInsertSQLs generates the insert sqls GenInsertSQLs(schema string, table *model.TableInfo, rows [][]byte, commitTS int64) ([]string, [][]string, [][]interface{}, error) diff --git a/drainer/translator/translator_test.go b/drainer/translator/translator_test.go index ab32d0521..ec9645f4c 100644 --- a/drainer/translator/translator_test.go +++ b/drainer/translator/translator_test.go @@ -9,6 +9,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/parser/model" + parsermysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/tablecodec" @@ -57,7 +58,7 @@ func (t *testTranslatorSuite) TestTranslater(c *C) { } func testGenInsertSQLs(c *C, s SQLTranslator, safeMode bool) { - s.SetConfig(safeMode) + s.SetConfig(safeMode, parsermysql.ModeStrictTransTables|parsermysql.ModeNoEngineSubstitution) schema := "t" tables := []*model.TableInfo{testGenTable("normal"), testGenTable("hasPK"), testGenTable("hasID")} exceptedKeys := []int{3, 2, 1} From 20547cea43231821b2107244c85a836bd2e82ae5 Mon Sep 17 00:00:00 2001 From: Lonng Date: Mon, 1 Apr 2019 12:49:15 +0800 Subject: [PATCH 2/4] *: address comment --- cmd/drainer/drainer.toml | 6 +++--- drainer/config.go | 13 +++++++------ drainer/config_test.go | 4 ++-- drainer/server.go | 2 +- drainer/syncer.go | 18 ++++++++---------- 5 files changed, 21 insertions(+), 22 deletions(-) diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index de6336aef..ffc3f29c0 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -16,9 +16,6 @@ pd-urls = "http://127.0.0.1:2379" # Use the specified compressor to compress payload between pump and drainer compressor = "" -# Use the sql-mode to parse DDL statment -sql-mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION" - #[security] # Path of file that contains list of trusted SSL CAs for connection with cluster components. # ssl-ca = "/path/to/ca.pem" @@ -30,6 +27,9 @@ sql-mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION" # syncer Configuration. [syncer] +# Use the sql-mode to parse DDL statment +sql-mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION" + # disable sync these schema ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql" diff --git a/drainer/config.go b/drainer/config.go index 37632325c..0442b9ee5 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -45,6 +45,8 @@ var ( // SyncerConfig is the Syncer's configuration. type SyncerConfig struct { + StrSQLMode string `toml:"sql-mode" json:"sql-mode"` + SQLMode mysql.SQLMode `toml:"-" json:"-"` IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"` TxnBatch int `toml:"txn-batch" json:"txn-batch"` WorkerCount int `toml:"worker-count" json:"worker-count"` @@ -65,8 +67,6 @@ type Config struct { DataDir string `toml:"data-dir" json:"data-dir"` DetectInterval int `toml:"detect-interval" json:"detect-interval"` EtcdURLs string `toml:"pd-urls" json:"pd-urls"` - StrSQLMode string `toml:"sql-mode" json:"sql-mode"` - SQLMode mysql.SQLMode `toml:"-" json:"-"` LogFile string `toml:"log-file" json:"log-file"` LogRotate string `toml:"log-rotate" json:"log-rotate"` InitialCommitTS int64 `toml:"initial-commit-ts" json:"initial-commit-ts"` @@ -86,8 +86,9 @@ type Config struct { func NewConfig() *Config { cfg := &Config{ EtcdTimeout: defaultEtcdTimeout, - SyncerCfg: new(SyncerConfig), - StrSQLMode: mysql.DefaultSQLMode, + SyncerCfg: &SyncerConfig{ + StrSQLMode: mysql.DefaultSQLMode, + }, } cfg.FlagSet = flag.NewFlagSet("drainer", flag.ContinueOnError) fs := cfg.FlagSet @@ -163,9 +164,9 @@ func (cfg *Config) Parse(args []string) error { return errors.Trace(err) } - cfg.SQLMode, err = mysql.GetSQLMode(cfg.StrSQLMode) + cfg.SyncerCfg.SQLMode, err = mysql.GetSQLMode(cfg.SyncerCfg.StrSQLMode) if err != nil { - return errors.New("invalid config: `sql-mode` must be a valid SQL_MODE") + return errors.Annotate(err, "invalid config: `sql-mode` must be a valid SQL_MODE") } cfg.tls, err = cfg.Security.ToTLSConfig() diff --git a/drainer/config_test.go b/drainer/config_test.go index aa108ea1d..a4ff0514f 100644 --- a/drainer/config_test.go +++ b/drainer/config_test.go @@ -31,8 +31,8 @@ func (t *testDrainerSuite) TestConfig(c *C) { c.Assert(cfg.SyncerCfg.TxnBatch, Equals, 1) c.Assert(cfg.SyncerCfg.DestDBType, Equals, "mysql") c.Assert(cfg.SyncerCfg.To.Host, Equals, "127.0.0.1") - c.Assert(cfg.StrSQLMode, Equals, mysql.DefaultSQLMode) - c.Assert(cfg.SQLMode, Equals, mysql.ModeStrictTransTables|mysql.ModeNoEngineSubstitution) + c.Assert(cfg.SyncerCfg.StrSQLMode, Equals, mysql.DefaultSQLMode) + c.Assert(cfg.SyncerCfg.SQLMode, Equals, mysql.ModeStrictTransTables|mysql.ModeNoEngineSubstitution) } func (t *testDrainerSuite) TestValidate(c *C) { diff --git a/drainer/server.go b/drainer/server.go index eb03931b0..2af49d1b7 100644 --- a/drainer/server.go +++ b/drainer/server.go @@ -107,7 +107,7 @@ func NewServer(cfg *Config) (*Server, error) { checkpointTSOGauge.Set(float64(oracle.ExtractPhysical(uint64(cp.TS())))) - syncer, err := NewSyncer(ctx, cp, cfg.SQLMode, cfg.SyncerCfg) + syncer, err := NewSyncer(ctx, cp, cfg.SyncerCfg) if err != nil { return nil, errors.Trace(err) } diff --git a/drainer/syncer.go b/drainer/syncer.go index 3f0b86a25..6bad3b80e 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -7,20 +7,20 @@ import ( "sync" "time" - "github.com/pingcap/parser/mysql" "golang.org/x/net/context" "github.com/ngaut/log" "github.com/pingcap/errors" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/store/tikv/oracle" + pb "github.com/pingcap/tipb/go-binlog" + "github.com/pingcap/tidb-binlog/drainer/checkpoint" "github.com/pingcap/tidb-binlog/drainer/executor" "github.com/pingcap/tidb-binlog/drainer/translator" "github.com/pingcap/tidb-binlog/pkg/filter" "github.com/pingcap/tidb-binlog/pkg/loader" pkgsql "github.com/pingcap/tidb-binlog/pkg/sql" - "github.com/pingcap/tidb/store/tikv/oracle" - pb "github.com/pingcap/tipb/go-binlog" ) var ( @@ -35,8 +35,7 @@ type Syncer struct { schema *Schema cp checkpoint.CheckPoint - sqlMode mysql.SQLMode - cfg *SyncerConfig + cfg *SyncerConfig translator translator.SQLTranslator @@ -61,9 +60,8 @@ type Syncer struct { } // NewSyncer returns a Drainer instance -func NewSyncer(ctx context.Context, cp checkpoint.CheckPoint, sqlMode mysql.SQLMode, cfg *SyncerConfig) (*Syncer, error) { +func NewSyncer(ctx context.Context, cp checkpoint.CheckPoint, cfg *SyncerConfig) (*Syncer, error) { syncer := new(Syncer) - syncer.sqlMode = sqlMode syncer.cfg = cfg syncer.cp = cp syncer.input = make(chan *binlogItem, maxBinlogItemCount) @@ -133,13 +131,13 @@ func (s *Syncer) checkWait(job *job) bool { func (s *Syncer) enableSafeModeInitializationPhase() { // set safeMode to true and useInsert to flase at the first, and will use the config after 5 minutes. - s.translator.SetConfig(true, s.sqlMode) + s.translator.SetConfig(true, s.cfg.SQLMode) go func() { ctx, cancel := context.WithCancel(s.ctx) defer func() { cancel() - s.translator.SetConfig(s.cfg.SafeMode, s.sqlMode) + s.translator.SetConfig(s.cfg.SafeMode, s.cfg.SQLMode) }() select { @@ -431,7 +429,7 @@ func (s *Syncer) run(jobs []*model.Job) error { return errors.Trace(err) } - s.translator.SetConfig(s.cfg.SafeMode, s.sqlMode) + s.translator.SetConfig(s.cfg.SafeMode, s.cfg.SQLMode) go s.enableSafeModeInitializationPhase() for i := 0; i < s.cfg.WorkerCount; i++ { From 2acf74040fac963a5ff02f1a9646fd76706faaa1 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Mon, 1 Apr 2019 15:39:57 +0800 Subject: [PATCH 3/4] drainer/* Set the according sql-mode --- cmd/drainer/drainer.toml | 3 ++- drainer/executor/executor.go | 4 ++-- drainer/executor/mysql.go | 4 ++-- drainer/syncer.go | 2 +- drainer/util.go | 4 ++-- pkg/sql/sql.go | 15 +++++++++++++-- 6 files changed, 22 insertions(+), 10 deletions(-) diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index ffc3f29c0..64d7ac71c 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -27,7 +27,8 @@ compressor = "" # syncer Configuration. [syncer] -# Use the sql-mode to parse DDL statment +# Assume the upstream sql-mode, will use the same sql-mode to parse DDL statment, and set the same sql-mode at downstream +# when db-type is mysql sql-mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION" # disable sync these schema diff --git a/drainer/executor/executor.go b/drainer/executor/executor.go index 986636909..369fb1afe 100644 --- a/drainer/executor/executor.go +++ b/drainer/executor/executor.go @@ -15,10 +15,10 @@ type Executor interface { } // New returns the an Executor instance by given name -func New(name string, cfg *DBConfig) (Executor, error) { +func New(name string, cfg *DBConfig, sqlMode string) (Executor, error) { switch name { case "mysql", "tidb": - return newMysql(cfg) + return newMysql(cfg, sqlMode) case "pb": return newPB(cfg) case "flash": diff --git a/drainer/executor/mysql.go b/drainer/executor/mysql.go index e8316b5d0..cdd10b9c3 100644 --- a/drainer/executor/mysql.go +++ b/drainer/executor/mysql.go @@ -17,8 +17,8 @@ type mysqlExecutor struct { *baseError } -func newMysql(cfg *DBConfig) (Executor, error) { - db, err := pkgsql.OpenDB("mysql", cfg.Host, cfg.Port, cfg.User, cfg.Password) +func newMysql(cfg *DBConfig, sqlMode string) (Executor, error) { + db, err := pkgsql.OpenDBWithSQLMode("mysql", cfg.Host, cfg.Port, cfg.User, cfg.Password, sqlMode) if err != nil { return nil, errors.Trace(err) } diff --git a/drainer/syncer.go b/drainer/syncer.go index 6bad3b80e..bce2dcbc9 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -419,7 +419,7 @@ func (s *Syncer) run(jobs []*model.Job) error { return errors.Trace(err) } - s.executors, err = createExecutors(s.cfg.DestDBType, s.cfg.To, s.cfg.WorkerCount) + s.executors, err = createExecutors(s.cfg.DestDBType, s.cfg.To, s.cfg.WorkerCount, s.cfg.StrSQLMode) if err != nil { return errors.Trace(err) } diff --git a/drainer/util.go b/drainer/util.go index 3fbc5e9b0..edb092519 100644 --- a/drainer/util.go +++ b/drainer/util.go @@ -134,10 +134,10 @@ func closeExecutors(executors ...executor.Executor) { } } -func createExecutors(destDBType string, cfg *executor.DBConfig, count int) ([]executor.Executor, error) { +func createExecutors(destDBType string, cfg *executor.DBConfig, count int, sqlMODE string) ([]executor.Executor, error) { executors := make([]executor.Executor, 0, count) for i := 0; i < count; i++ { - executor, err := executor.New(destDBType, cfg) + executor, err := executor.New(destDBType, cfg, sqlMODE) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/sql/sql.go b/pkg/sql/sql.go index a7c2dd96c..712692eeb 100644 --- a/pkg/sql/sql.go +++ b/pkg/sql/sql.go @@ -4,6 +4,7 @@ import ( "database/sql" "fmt" "net" + "net/url" "strconv" "strings" "time" @@ -116,9 +117,14 @@ func ExecuteTxnWithHistogram(db *sql.DB, sqls []string, args [][]interface{}, hi return nil } -// OpenDB creates an instance of sql.DB. -func OpenDB(proto string, host string, port int, username string, password string) (*sql.DB, error) { +// OpenDBWithSQLMode creates an instance of sql.DB. +func OpenDBWithSQLMode(proto string, host string, port int, username string, password string, sqlMode string) (*sql.DB, error) { dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4,utf8&multiStatements=true", username, password, host, port) + if len(sqlMode) > 0 { + log.Info("set sql mode: ", sqlMode) + // same as "set sql_mode = ''" + dbDSN += "&sql_mode='" + url.QueryEscape(sqlMode) + "'" + } db, err := sql.Open(proto, dbDSN) if err != nil { return nil, errors.Trace(err) @@ -127,6 +133,11 @@ func OpenDB(proto string, host string, port int, username string, password strin return db, nil } +// OpenDB creates an instance of sql.DB. +func OpenDB(proto string, host string, port int, username string, password string) (*sql.DB, error) { + return OpenDBWithSQLMode(proto, host, port, username, password, "") +} + // IgnoreDDLError checks the error can be ignored or not. func IgnoreDDLError(err error) bool { errCode, ok := GetSQLErrCode(err) From 61d408b3e82c8d1e39aabe38ec7947b4a23935f3 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Mon, 1 Apr 2019 17:03:06 +0800 Subject: [PATCH 4/4] make everything same as before if no `sql-mode` in config file --- cmd/drainer/drainer.toml | 7 ++++--- drainer/config.go | 14 +++++++------- drainer/config_test.go | 5 +++-- drainer/executor/executor.go | 2 +- drainer/executor/mysql.go | 2 +- drainer/util.go | 2 +- pkg/sql/sql.go | 11 +++++------ 7 files changed, 22 insertions(+), 21 deletions(-) diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index 64d7ac71c..3fc9e5f1f 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -27,9 +27,10 @@ compressor = "" # syncer Configuration. [syncer] -# Assume the upstream sql-mode, will use the same sql-mode to parse DDL statment, and set the same sql-mode at downstream -# when db-type is mysql -sql-mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION" +# Assume the upstream sql-mode. +# If this is setted , will use the same sql-mode to parse DDL statment, and set the same sql-mode at downstream when db-type is mysql. +# If this is not setted, it will not set any sql-mode. +# sql-mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION" # disable sync these schema ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql" diff --git a/drainer/config.go b/drainer/config.go index 0442b9ee5..7ae8d1a0f 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -45,7 +45,7 @@ var ( // SyncerConfig is the Syncer's configuration. type SyncerConfig struct { - StrSQLMode string `toml:"sql-mode" json:"sql-mode"` + StrSQLMode *string `toml:"sql-mode" json:"sql-mode"` SQLMode mysql.SQLMode `toml:"-" json:"-"` IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"` TxnBatch int `toml:"txn-batch" json:"txn-batch"` @@ -86,9 +86,7 @@ type Config struct { func NewConfig() *Config { cfg := &Config{ EtcdTimeout: defaultEtcdTimeout, - SyncerCfg: &SyncerConfig{ - StrSQLMode: mysql.DefaultSQLMode, - }, + SyncerCfg: &SyncerConfig{}, } cfg.FlagSet = flag.NewFlagSet("drainer", flag.ContinueOnError) fs := cfg.FlagSet @@ -164,9 +162,11 @@ func (cfg *Config) Parse(args []string) error { return errors.Trace(err) } - cfg.SyncerCfg.SQLMode, err = mysql.GetSQLMode(cfg.SyncerCfg.StrSQLMode) - if err != nil { - return errors.Annotate(err, "invalid config: `sql-mode` must be a valid SQL_MODE") + if cfg.SyncerCfg.StrSQLMode != nil { + cfg.SyncerCfg.SQLMode, err = mysql.GetSQLMode(*cfg.SyncerCfg.StrSQLMode) + if err != nil { + return errors.Annotate(err, "invalid config: `sql-mode` must be a valid SQL_MODE") + } } cfg.tls, err = cfg.Security.ToTLSConfig() diff --git a/drainer/config_test.go b/drainer/config_test.go index a4ff0514f..ea7834ec1 100644 --- a/drainer/config_test.go +++ b/drainer/config_test.go @@ -31,8 +31,9 @@ func (t *testDrainerSuite) TestConfig(c *C) { c.Assert(cfg.SyncerCfg.TxnBatch, Equals, 1) c.Assert(cfg.SyncerCfg.DestDBType, Equals, "mysql") c.Assert(cfg.SyncerCfg.To.Host, Equals, "127.0.0.1") - c.Assert(cfg.SyncerCfg.StrSQLMode, Equals, mysql.DefaultSQLMode) - c.Assert(cfg.SyncerCfg.SQLMode, Equals, mysql.ModeStrictTransTables|mysql.ModeNoEngineSubstitution) + var strSQLMode *string + c.Assert(cfg.SyncerCfg.StrSQLMode, Equals, strSQLMode) + c.Assert(cfg.SyncerCfg.SQLMode, Equals, mysql.SQLMode(0)) } func (t *testDrainerSuite) TestValidate(c *C) { diff --git a/drainer/executor/executor.go b/drainer/executor/executor.go index 369fb1afe..7f5162402 100644 --- a/drainer/executor/executor.go +++ b/drainer/executor/executor.go @@ -15,7 +15,7 @@ type Executor interface { } // New returns the an Executor instance by given name -func New(name string, cfg *DBConfig, sqlMode string) (Executor, error) { +func New(name string, cfg *DBConfig, sqlMode *string) (Executor, error) { switch name { case "mysql", "tidb": return newMysql(cfg, sqlMode) diff --git a/drainer/executor/mysql.go b/drainer/executor/mysql.go index cdd10b9c3..31e269984 100644 --- a/drainer/executor/mysql.go +++ b/drainer/executor/mysql.go @@ -17,7 +17,7 @@ type mysqlExecutor struct { *baseError } -func newMysql(cfg *DBConfig, sqlMode string) (Executor, error) { +func newMysql(cfg *DBConfig, sqlMode *string) (Executor, error) { db, err := pkgsql.OpenDBWithSQLMode("mysql", cfg.Host, cfg.Port, cfg.User, cfg.Password, sqlMode) if err != nil { return nil, errors.Trace(err) diff --git a/drainer/util.go b/drainer/util.go index edb092519..4628457cb 100644 --- a/drainer/util.go +++ b/drainer/util.go @@ -134,7 +134,7 @@ func closeExecutors(executors ...executor.Executor) { } } -func createExecutors(destDBType string, cfg *executor.DBConfig, count int, sqlMODE string) ([]executor.Executor, error) { +func createExecutors(destDBType string, cfg *executor.DBConfig, count int, sqlMODE *string) ([]executor.Executor, error) { executors := make([]executor.Executor, 0, count) for i := 0; i < count; i++ { executor, err := executor.New(destDBType, cfg, sqlMODE) diff --git a/pkg/sql/sql.go b/pkg/sql/sql.go index 712692eeb..68674f8f2 100644 --- a/pkg/sql/sql.go +++ b/pkg/sql/sql.go @@ -118,16 +118,15 @@ func ExecuteTxnWithHistogram(db *sql.DB, sqls []string, args [][]interface{}, hi } // OpenDBWithSQLMode creates an instance of sql.DB. -func OpenDBWithSQLMode(proto string, host string, port int, username string, password string, sqlMode string) (*sql.DB, error) { +func OpenDBWithSQLMode(proto string, host string, port int, username string, password string, sqlMode *string) (*sql.DB, error) { dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4,utf8&multiStatements=true", username, password, host, port) - if len(sqlMode) > 0 { - log.Info("set sql mode: ", sqlMode) + if sqlMode != nil { // same as "set sql_mode = ''" - dbDSN += "&sql_mode='" + url.QueryEscape(sqlMode) + "'" + dbDSN += "&sql_mode='" + url.QueryEscape(*sqlMode) + "'" } db, err := sql.Open(proto, dbDSN) if err != nil { - return nil, errors.Trace(err) + return nil, errors.Annotatef(err, "dsn: %s", dbDSN) } return db, nil @@ -135,7 +134,7 @@ func OpenDBWithSQLMode(proto string, host string, port int, username string, pas // OpenDB creates an instance of sql.DB. func OpenDB(proto string, host string, port int, username string, password string) (*sql.DB, error) { - return OpenDBWithSQLMode(proto, host, port, username, password, "") + return OpenDBWithSQLMode(proto, host, port, username, password, nil) } // IgnoreDDLError checks the error can be ignored or not.