From 508f58e908696097461d37bcec6fa203b4c3cf2d Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Wed, 12 Jan 2022 21:05:42 -0600 Subject: [PATCH] This is an automated cherry-pick of #1093 Signed-off-by: ti-chi-bot --- .github/pull_request_template.md | 12 +++++++++++- Makefile | 2 +- drainer/sync/pb.go | 3 ++- drainer/translator/pb.go | 24 ++++++++++++++---------- drainer/translator/pb_test.go | 32 ++++++++++++++++++++++++++++++++ drainer/translator/translator.go | 14 +++++++------- pkg/loader/load.go | 13 +------------ pkg/loader/load_test.go | 18 ------------------ pkg/util/util.go | 28 ++++++++++++++++++++++++++++ pkg/util/util_test.go | 24 ++++++++++++++++++++++++ tests/sequence/run.sh | 2 +- 11 files changed, 121 insertions(+), 51 deletions(-) diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 664953aac..8c0280e71 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -35,4 +35,14 @@ Related changes - Need to cherry-pick to the release branch - Need to update the documentation - Need to update the `tidb-ansible` repository - - Need to be included in the release note \ No newline at end of file +<<<<<<< HEAD + - Need to be included in the release note +======= + - Need to be included in the release note + +### Release note + + + +- No release note +>>>>>>> 194d4ac1 (pb: update pb parser to avoid drainer failure (#1093)) diff --git a/Makefile b/Makefile index fa23aa53f..ad6ce401a 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ GOVERSION := "`go version`" ARCH := "`uname -s`" LINUX := "Linux" MAC := "Darwin" -PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto' +PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto' | sed '/github.com\/pingcap\/tidb-binlog\/bin/d' PACKAGES := $$($(PACKAGE_LIST)) PACKAGE_DIRECTORIES := $(PACKAGE_LIST) | sed 's|github.com/pingcap/$(PROJECT)/||' FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor' | grep -vE 'binlog.pb.go') diff --git a/drainer/sync/pb.go b/drainer/sync/pb.go index 160f3253f..9f439f65b 100644 --- a/drainer/sync/pb.go +++ b/drainer/sync/pb.go @@ -19,10 +19,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + tb "github.com/pingcap/tipb/go-binlog" + "github.com/pingcap/tidb-binlog/drainer/translator" "github.com/pingcap/tidb-binlog/pkg/binlogfile" pb "github.com/pingcap/tidb-binlog/proto/binlog" - tb "github.com/pingcap/tipb/go-binlog" ) var _ Syncer = &pbSyncer{} diff --git a/drainer/translator/pb.go b/drainer/translator/pb.go index 92df4700d..462727d3b 100644 --- a/drainer/translator/pb.go +++ b/drainer/translator/pb.go @@ -21,15 +21,22 @@ import ( "github.com/golang/protobuf/proto" "github.com/pingcap/errors" +<<<<<<< HEAD "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/tidb-binlog/pkg/util" pb "github.com/pingcap/tidb-binlog/proto/binlog" +======= + "github.com/pingcap/tidb/parser/model" +>>>>>>> 194d4ac1 (pb: update pb parser to avoid drainer failure (#1093)) "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" tipb "github.com/pingcap/tipb/go-binlog" + + "github.com/pingcap/tidb-binlog/pkg/util" + pb "github.com/pingcap/tidb-binlog/proto/binlog" ) // TiBinlogToPbBinlog translate the binlog format @@ -40,16 +47,13 @@ func TiBinlogToPbBinlog(infoGetter TableInfoGetter, schema string, table string, if tiBinlog.DdlJobId > 0 { // DDL sql := string(tiBinlog.GetDdlQuery()) - stmt, err := getParser().ParseOneStmt(sql, "", "") - if err != nil { - return nil, errors.Trace(err) - } - - _, isCreateDatabase := stmt.(*ast.CreateDatabaseStmt) - if isCreateDatabase { - sql += ";" - } else { - sql = fmt.Sprintf("use %s; %s;", quoteName(schema), sql) + if len(schema) > 0 { + isCreateDatabase := util.IsCreateDatabaseDDL(sql, sqlMode) + if isCreateDatabase { + sql += ";" + } else { + sql = fmt.Sprintf("use %s; %s;", quoteName(schema), sql) + } } pbBinlog.Tp = pb.BinlogType_DDL diff --git a/drainer/translator/pb_test.go b/drainer/translator/pb_test.go index ac4d1823d..8b7e438f3 100644 --- a/drainer/translator/pb_test.go +++ b/drainer/translator/pb_test.go @@ -17,10 +17,16 @@ import ( "fmt" "github.com/pingcap/check" +<<<<<<< HEAD "github.com/pingcap/parser/mysql" pb "github.com/pingcap/tidb-binlog/proto/binlog" +======= + "github.com/pingcap/tidb/parser/mysql" +>>>>>>> 194d4ac1 (pb: update pb parser to avoid drainer failure (#1093)) "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" + + pb "github.com/pingcap/tidb-binlog/proto/binlog" ) type testPbSuite struct { @@ -55,6 +61,32 @@ func (t *testPbSuite) TestDDL(c *check.C) { CommitTs: t.TiBinlog.GetCommitTs(), DdlQuery: []byte(expected), }) + + // test PB shouldn't reports an error when meeting unsupported DDL + t.TiBinlog.DdlQuery = []byte("invalid sql") + pbBinog, err = TiBinlogToPbBinlog(t, t.Schema, t.Table, t.TiBinlog, nil) + c.Assert(err, check.IsNil) + + c.Log("get ddl: ", string(pbBinog.GetDdlQuery())) + expected = fmt.Sprintf("use `%s`; %s;", t.Schema, string(t.TiBinlog.GetDdlQuery())) + c.Assert(pbBinog, check.DeepEquals, &pb.Binlog{ + Tp: pb.BinlogType_DDL, + CommitTs: t.TiBinlog.GetCommitTs(), + DdlQuery: []byte(expected), + }) + + // test PB shouldn't reports an error when meeting unsupported DDL + t.TiBinlog.DdlQuery = []byte("invalid sql") + t.Schema = "" + pbBinog, err = TiBinlogToPbBinlog(t, t.Schema, t.Table, t.TiBinlog, nil) + c.Assert(err, check.IsNil) + + c.Log("get ddl: ", string(pbBinog.GetDdlQuery())) + c.Assert(pbBinog, check.DeepEquals, &pb.Binlog{ + Tp: pb.BinlogType_DDL, + CommitTs: t.TiBinlog.GetCommitTs(), + DdlQuery: t.TiBinlog.GetDdlQuery(), + }) } func (t *testPbSuite) testDML(c *check.C, tp pb.EventType) { diff --git a/drainer/translator/translator.go b/drainer/translator/translator.go index 6752c5f51..25346f3d7 100644 --- a/drainer/translator/translator.go +++ b/drainer/translator/translator.go @@ -19,15 +19,22 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" +<<<<<<< HEAD "github.com/pingcap/parser" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-binlog/pkg/util" +======= + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" +>>>>>>> 194d4ac1 (pb: update pb parser to avoid drainer failure (#1093)) "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" "go.uber.org/zap" + + "github.com/pingcap/tidb-binlog/pkg/util" ) var sqlMode mysql.SQLMode @@ -37,13 +44,6 @@ func SetSQLMode(mode mysql.SQLMode) { sqlMode = mode } -func getParser() (p *parser.Parser) { - p = parser.New() - p.SetSQLMode(sqlMode) - - return -} - func insertRowToDatums(table *model.TableInfo, row []byte) (datums map[int64]types.Datum, err error) { colsTypeMap := util.ToColumnTypeMap(table.Columns) diff --git a/pkg/loader/load.go b/pkg/loader/load.go index 538e88128..900023f6c 100644 --- a/pkg/loader/load.go +++ b/pkg/loader/load.go @@ -376,17 +376,6 @@ func needRefreshTableInfo(sql string) bool { return true } -func isCreateDatabaseDDL(sql string) bool { - stmt, err := parser.New().ParseOneStmt(sql, "", "") - if err != nil { - log.Error("parse sql failed", zap.String("sql", sql), zap.Error(err)) - return false - } - - _, isCreateDatabase := stmt.(*ast.CreateDatabaseStmt) - return isCreateDatabase -} - func (s *loaderImpl) execDDL(ddl *DDL) error { log.Debug("exec ddl", zap.Reflect("ddl", ddl)) if ddl.ShouldSkip { @@ -399,7 +388,7 @@ func (s *loaderImpl) execDDL(ddl *DDL) error { return err } - if len(ddl.Database) > 0 && !isCreateDatabaseDDL(ddl.SQL) { + if len(ddl.Database) > 0 && !util.IsCreateDatabaseDDL(ddl.SQL, tmysql.ModeNone) { _, err = tx.Exec(fmt.Sprintf("use %s;", quoteName(ddl.Database))) if err != nil { if rbErr := tx.Rollback(); rbErr != nil { diff --git a/pkg/loader/load_test.go b/pkg/loader/load_test.go index 7cccc448a..1d37b7b54 100644 --- a/pkg/loader/load_test.go +++ b/pkg/loader/load_test.go @@ -320,24 +320,6 @@ func (s *getTblInfoSuite) TestShouldCacheResult(c *check.C) { c.Assert(nCalled, check.Equals, 1) } -type isCreateDBDDLSuite struct{} - -var _ = check.Suite(&isCreateDBDDLSuite{}) - -func (s *isCreateDBDDLSuite) TestInvalidSQL(c *check.C) { - c.Assert(isCreateDatabaseDDL("INSERT INTO Y a b c;"), check.IsFalse) -} - -func (s *isCreateDBDDLSuite) TestNonCreateDBSQL(c *check.C) { - c.Assert(isCreateDatabaseDDL("SELECT 1;"), check.IsFalse) - c.Assert(isCreateDatabaseDDL(`INSERT INTO tbl(id, name) VALUES(1, "test";`), check.IsFalse) -} - -func (s *isCreateDBDDLSuite) TestCreateDBSQL(c *check.C) { - c.Assert(isCreateDatabaseDDL("CREATE DATABASE test;"), check.IsTrue) - c.Assert(isCreateDatabaseDDL("create database `db2`;"), check.IsTrue) -} - type needRefreshTableInfoSuite struct{} var _ = check.Suite(&needRefreshTableInfoSuite{}) diff --git a/pkg/util/util.go b/pkg/util/util.go index 1385da44f..2aa19c131 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -26,14 +26,28 @@ import ( "github.com/BurntSushi/toml" "github.com/pingcap/errors" "github.com/pingcap/log" +<<<<<<< HEAD "github.com/pingcap/parser/model" "github.com/pingcap/tidb-binlog/pkg/flags" "github.com/pingcap/tidb-binlog/pkg/security" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/types" +======= + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/types" + _ "github.com/pingcap/tidb/types/parser_driver" // for parser driver + "github.com/tikv/client-go/v2/oracle" +>>>>>>> 194d4ac1 (pb: update pb parser to avoid drainer failure (#1093)) pd "github.com/tikv/pd/client" "go.uber.org/zap" + + "github.com/pingcap/tidb-binlog/pkg/flags" + "github.com/pingcap/tidb-binlog/pkg/security" ) // DefaultIP get a default non local ip, err is not nil, ip return 127.0.0.1 @@ -333,3 +347,17 @@ func WriteFileAtomic(filename string, data []byte, perm os.FileMode) error { return os.Rename(f.Name(), filename) } + +// IsCreateDatabaseDDL checks whether ddl is a create database statement +func IsCreateDatabaseDDL(sql string, sqlMode mysql.SQLMode) bool { + p := parser.New() + p.SetSQLMode(sqlMode) + stmt, err := p.ParseOneStmt(sql, "", "") + if err != nil { + log.Error("parse sql failed", zap.String("sql", sql), zap.Error(err)) + return false + } + + _, isCreateDatabase := stmt.(*ast.CreateDatabaseStmt) + return isCreateDatabase +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 4c118abf5..4ca7a959e 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -22,13 +22,18 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/log" +<<<<<<< HEAD "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-binlog/pkg/security" +======= +>>>>>>> 194d4ac1 (pb: update pb parser to avoid drainer failure (#1093)) "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/types" pd "github.com/tikv/pd/client" "go.uber.org/zap/zapcore" + + "github.com/pingcap/tidb-binlog/pkg/security" ) // Hook up gocheck into the "go test" runner. @@ -340,3 +345,22 @@ func (s *waitUntilTimeoutSuit) TestGoAndAbortGoroutine(c *C) { }, time.Second) c.Assert(called, IsTrue) } + +type isCreateDBDDLSuite struct{} + +var _ = Suite(&isCreateDBDDLSuite{}) + +func (s *isCreateDBDDLSuite) TestInvalidSQL(c *C) { + c.Assert(IsCreateDatabaseDDL("INSERT INTO Y a b c;", mysql.ModeNone), IsFalse) + c.Assert(IsCreateDatabaseDDL("Invalid SQL", mysql.ModeNone), IsFalse) +} + +func (s *isCreateDBDDLSuite) TestNonCreateDBSQL(c *C) { + c.Assert(IsCreateDatabaseDDL("SELECT 1;", mysql.ModeNone), IsFalse) + c.Assert(IsCreateDatabaseDDL(`INSERT INTO tbl(id, name) VALUES(1, "test";`, mysql.ModeNone), IsFalse) +} + +func (s *isCreateDBDDLSuite) TestCreateDBSQL(c *C) { + c.Assert(IsCreateDatabaseDDL("CREATE DATABASE test;", mysql.ModeNone), IsTrue) + c.Assert(IsCreateDatabaseDDL("create database `db2`;", mysql.ModeNone), IsTrue) +} diff --git a/tests/sequence/run.sh b/tests/sequence/run.sh index 41985fbf2..0ca678dfd 100755 --- a/tests/sequence/run.sh +++ b/tests/sequence/run.sh @@ -14,7 +14,7 @@ run_sql 'CREATE TABLE seq.table_name (id INT DEFAULT NEXT VALUE FOR seq.sequence run_sql 'INSERT INTO seq.table_name(val) values(10);' -sleep 3 +sleep 5 down_run_sql 'SELECT count(*), sum(id), sum(val) FROM seq.table_name;' check_contains 'count(*): 1'