From a3ee2bea5e23f5a04190224b0330ed28c08913eb Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 10 Sep 2020 11:36:03 +0800 Subject: [PATCH] *: save exitSafeModeLoc in checkpoint, load it, upgrade strategy (#988) --- _utils/terror_gen/errors_release.txt | 2 +- dm/config/task.go | 3 - dm/master/bootstrap.go | 7 +- dm/master/scheduler/scheduler.go | 20 +++++ errors.toml | 2 +- loader/loader.go | 7 +- pkg/dumpling/utils.go | 20 ++--- pkg/terror/error_list.go | 2 +- pkg/upgrade/upgrade.go | 102 ++++++++++++++++++++-- pkg/upgrade/upgrade_test.go | 12 +-- pkg/upgrade/version.go | 2 +- pkg/utils/util.go | 18 ++++ pkg/v1dbschema/schema.go | 19 +---- syncer/checkpoint.go | 123 ++++++++++++++++++++++----- syncer/checkpoint_test.go | 55 ++++++++++-- syncer/mode.go | 4 +- syncer/syncer.go | 10 ++- syncer/syncer_test.go | 57 +++++++++++-- tests/import_v10x/run.sh | 5 ++ 19 files changed, 382 insertions(+), 88 deletions(-) diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index e7199a7f57..29b8585aa9 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -4,7 +4,7 @@ ErrDBInvalidConn,[code=10003:class=database:scope=not-set:level=high], "Message: ErrDBUnExpect,[code=10004:class=database:scope=not-set:level=high], "Message: unexpect database error: %s" ErrDBQueryFailed,[code=10005:class=database:scope=not-set:level=high], "Message: query statement failed: %s" ErrDBExecuteFailed,[code=10006:class=database:scope=not-set:level=high], "Message: execute statement failed: %s" -ErrParseMydumperMeta,[code=11001:class=functional:scope=internal:level=high], "Message: parse mydumper metadata error: %s" +ErrParseMydumperMeta,[code=11001:class=functional:scope=internal:level=high], "Message: parse mydumper metadata error: %s, metadata: %s" ErrGetFileSize,[code=11002:class=functional:scope=internal:level=high], "Message: get file %s size" ErrDropMultipleTables,[code=11003:class=functional:scope=internal:level=high], "Message: not allowed operation: drop multiple tables in one statement, Workaround: It is recommended to include only one DDL operation in a statement executed upstream. Please manually handle it using dmctl (skipping the DDL statement or replacing the DDL statement with a specified DDL statement). For details, see https://docs.pingcap.com/tidb-data-migration/stable/skip-or-replace-abnormal-sql-statements" ErrRenameMultipleTables,[code=11004:class=functional:scope=internal:level=high], "Message: not allowed operation: rename multiple tables in one statement, Workaround: It is recommended to include only one DDL operation in a statement executed upstream. Please manually handle it using dmctl (skipping the DDL statement or replacing the DDL statement with a specified DDL statement). 
For details, see https://docs.pingcap.com/tidb-data-migration/stable/skip-or-replace-abnormal-sql-statements" diff --git a/dm/config/task.go b/dm/config/task.go index e02b91e791..9ae394838f 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -21,7 +21,6 @@ import ( "strings" "time" - "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" @@ -242,8 +241,6 @@ type SyncerConfig struct { EnableGTID bool `yaml:"enable-gtid" toml:"enable-gtid" json:"enable-gtid"` DisableCausality bool `yaml:"disable-detect" toml:"disable-detect" json:"disable-detect"` SafeMode bool `yaml:"safe-mode" toml:"safe-mode" json:"safe-mode"` - // when dump unit can't run consistent dump, enable safe mode until pass exit location of dumping - SafeModeExitLoc *binlog.Location `yaml:"-" toml:"-" json:"-"` // deprecated, use `ansi-quotes` in top level config instead EnableANSIQuotes bool `yaml:"enable-ansi-quotes" toml:"enable-ansi-quotes" json:"enable-ansi-quotes"` } diff --git a/dm/master/bootstrap.go b/dm/master/bootstrap.go index 565331c33c..a1accc2b71 100644 --- a/dm/master/bootstrap.go +++ b/dm/master/bootstrap.go @@ -58,7 +58,12 @@ func (s *Server) bootstrap(ctx context.Context) error { } } - err := upgrade.TryUpgrade(s.etcdClient) + uctx := upgrade.Context{ + Context: ctx, + SubTaskConfigs: s.scheduler.GetSubTaskCfgs(), + } + err := upgrade.TryUpgrade(s.etcdClient, uctx) + if err != nil { return err } diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index 59e81b0705..472b973014 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -660,6 +660,26 @@ func (s *Scheduler) GetSubTaskCfgsByTask(task string) map[string]*config.SubTask return cloneM } +// GetSubTaskCfgs gets all subtask configs; it returns nil when an error happens +func (s *Scheduler) GetSubTaskCfgs() map[string]map[string]config.SubTaskConfig { + s.mu.RLock() + defer s.mu.RUnlock() + clone := make(map[string]map[string]config.SubTaskConfig, len(s.subTaskCfgs)) + for task, m := range s.subTaskCfgs { + clone2 := make(map[string]config.SubTaskConfig, len(m)) + for source, cfg := range m { + cfg2, err := cfg.Clone() + if err != nil { + return nil + } + clone2[source] = *cfg2 + } + clone[task] = clone2 + } + + return clone +} + // AddWorker adds the information of the DM-worker when registering a new instance. 
// This only adds the information of the DM-worker, // in order to know whether it's online (ready to handle works), diff --git a/errors.toml b/errors.toml index 01977fb645..13beeb19c3 100644 --- a/errors.toml +++ b/errors.toml @@ -35,7 +35,7 @@ workaround = "" tags = ["not-set", "high"] [error.DM-functional-11001] -message = "parse mydumper metadata error: %s" +message = "parse mydumper metadata error: %s, metadata: %s" description = "" workaround = "" tags = ["internal", "high"] diff --git a/loader/loader.go b/loader/loader.go index 0cd05e9e20..67b2b1a883 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -20,6 +20,7 @@ import ( "encoding/hex" "fmt" "io" + "io/ioutil" "os" "path/filepath" "strings" @@ -1274,8 +1275,12 @@ func (l *Loader) getMydumpMetadata() error { metafile := filepath.Join(l.cfg.LoaderConfig.Dir, "metadata") loc, _, err := dumpling.ParseMetaData(metafile, l.cfg.Flavor) if err != nil { + toPrint, err2 := ioutil.ReadFile(metafile) + if err2 != nil { + toPrint = []byte(err2.Error()) + } l.logCtx.L().Error("fail to parse dump metadata", log.ShortError(err)) - return err + return terror.ErrParseMydumperMeta.Generate(err, toPrint) } l.metaBinlog.Set(loc.Position.String()) diff --git a/pkg/dumpling/utils.go b/pkg/dumpling/utils.go index efe8f7e25c..0879dc3aaa 100644 --- a/pkg/dumpling/utils.go +++ b/pkg/dumpling/utils.go @@ -25,16 +25,16 @@ import ( "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/gtid" - "github.com/pingcap/dm/pkg/terror" ) // ParseMetaData parses mydumper's output meta file and returns binlog location. // since v2.0.0, dumpling maybe configured to output master status after connection pool is established, // we return this location as well. func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, error) { + invalidErr := fmt.Errorf("file %s invalid format", filename) fd, err := os.Open(filename) if err != nil { - return nil, nil, terror.ErrParseMydumperMeta.Generate(err) + return nil, nil, err } defer fd.Close() @@ -89,7 +89,7 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, if err2 == io.EOF { break } else if err2 != nil { - return nil, nil, terror.ErrParseMydumperMeta.Generate(err2) + return nil, nil, err2 } line = strings.TrimSpace(line) if len(line) == 0 { @@ -99,14 +99,14 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, switch line { case "SHOW MASTER STATUS:": if err3 := parsePosAndGTID(&pos, >idStr); err3 != nil { - return nil, nil, terror.ErrParseMydumperMeta.Generate(err3) + return nil, nil, err3 } case "SHOW SLAVE STATUS:": // ref: https://github.com/maxbube/mydumper/blob/master/mydumper.c#L434 for { line, err3 := br.ReadString('\n') if err3 != nil { - return nil, nil, terror.ErrParseMydumperMeta.Generate(err3) + return nil, nil, err3 } line = strings.TrimSpace(line) if len(line) == 0 { @@ -116,7 +116,7 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, case "SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */": useLocation2 = true if err3 := parsePosAndGTID(&pos2, >idStr2); err3 != nil { - return nil, nil, terror.ErrParseMydumperMeta.Generate(err3) + return nil, nil, err3 } default: // do nothing for Started dump, Finished dump... 
@@ -124,12 +124,12 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, } if len(pos.Name) == 0 || pos.Pos == uint32(0) { - return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) + return nil, nil, invalidErr } gset, err := gtid.ParserGTID(flavor, gtidStr) if err != nil { - return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) + return nil, nil, invalidErr } loc = &binlog.Location{ Position: pos, @@ -138,11 +138,11 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, if useLocation2 { if len(pos2.Name) == 0 || pos2.Pos == uint32(0) { - return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) + return nil, nil, invalidErr } gset2, err := gtid.ParserGTID(flavor, gtidStr2) if err != nil { - return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) + return nil, nil, invalidErr } loc2 = &binlog.Location{ Position: pos2, diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index 3a1c8bf2c7..83e4938848 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -607,7 +607,7 @@ var ( ErrDBExecuteFailed = New(codeDBExecuteFailed, ClassDatabase, ScopeNotSet, LevelHigh, "execute statement failed: %s", "") // Functional error - ErrParseMydumperMeta = New(codeParseMydumperMeta, ClassFunctional, ScopeInternal, LevelHigh, "parse mydumper metadata error: %s", "") + ErrParseMydumperMeta = New(codeParseMydumperMeta, ClassFunctional, ScopeInternal, LevelHigh, "parse mydumper metadata error: %s, metadata: %s", "") ErrGetFileSize = New(codeGetFileSize, ClassFunctional, ScopeInternal, LevelHigh, "get file %s size", "") ErrDropMultipleTables = New(codeDropMultipleTables, ClassFunctional, ScopeInternal, LevelHigh, "not allowed operation: drop multiple tables in one statement", "It is recommended to include only one DDL operation in a statement executed upstream. Please manually handle it using dmctl (skipping the DDL statement or replacing the DDL statement with a specified DDL statement). For details, see https://docs.pingcap.com/tidb-data-migration/stable/skip-or-replace-abnormal-sql-statements") ErrRenameMultipleTables = New(codeRenameMultipleTables, ClassFunctional, ScopeInternal, LevelHigh, "not allowed operation: rename multiple tables in one statement", "It is recommended to include only one DDL operation in a statement executed upstream. Please manually handle it using dmctl (skipping the DDL statement or replacing the DDL statement with a specified DDL statement). For details, see https://docs.pingcap.com/tidb-data-migration/stable/skip-or-replace-abnormal-sql-statements") diff --git a/pkg/upgrade/upgrade.go b/pkg/upgrade/upgrade.go index f0abf6391f..2fcdda7a1a 100644 --- a/pkg/upgrade/upgrade.go +++ b/pkg/upgrade/upgrade.go @@ -13,18 +13,48 @@ package upgrade -import "go.etcd.io/etcd/clientv3" +import ( + "context" + "fmt" + + "github.com/pingcap/tidb-tools/pkg/dbutil" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" + + "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/pkg/conn" + tcontext "github.com/pingcap/dm/pkg/context" + "github.com/pingcap/dm/pkg/cputil" + "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/utils" +) var ( // upgrades records all functions used to upgrade from one version to the later version. 
- upgrades = []func(cli *clientv3.Client) error{ + upgrades = []func(cli *clientv3.Client, uctx Context) error{ upgradeToVer1, + upgradeToVer2, } ) +// Context is used to pass extra information to TryUpgrade +// NOTE that the zero value of Context contains a nil context.Context, be aware of nil dereference +type Context struct { + context.Context + SubTaskConfigs map[string]map[string]config.SubTaskConfig +} + +// NewUpgradeContext creates a Context and avoids nil members +func NewUpgradeContext() Context { + return Context{ + Context: context.Background(), + SubTaskConfigs: make(map[string]map[string]config.SubTaskConfig), + } +} + // TryUpgrade tries to upgrade the cluster from an older version to a new version. // This methods should have no side effects even calling multiple times. -func TryUpgrade(cli *clientv3.Client) error { +func TryUpgrade(cli *clientv3.Client, uctx Context) error { // 1. get previous version from etcd. preVer, _, err := GetVersion(cli) if err != nil { @@ -52,7 +82,7 @@ func TryUpgrade(cli *clientv3.Client) error { // 4. do upgrade operations. for _, upgrade := range upgrades { - err = upgrade(cli) + err = upgrade(cli, uctx) if err != nil { return err } @@ -65,6 +95,68 @@ func TryUpgrade(cli *clientv3.Client) error { // upgradeToVer1 does upgrade operations from Ver0 to Ver1. // in fact, this do nothing now, and just for demonstration. -func upgradeToVer1(cli *clientv3.Client) error { +func upgradeToVer1(cli *clientv3.Client, uctx Context) error { + return nil +} + +// upgradeToVer2 does upgrade operations from Ver1 to Ver2 (v2.0.0-rc.3), which upgrades the syncer checkpoint schema + // TODO: determine v2.0.0-rc.3 or another version in above line +func upgradeToVer2(cli *clientv3.Client, uctx Context) error { + upgradeTaskName := "upgradeToVer2" + logger := log.L().WithFields(zap.String("task", upgradeTaskName)) + + if uctx.SubTaskConfigs == nil { + logger.Info("no downstream DB, skipping") + return nil + } + + // tableName -> DBConfig + dbConfigs := map[string]config.DBConfig{} + for task, m := range uctx.SubTaskConfigs { + for sourceID, subCfg := range m { + tableName := dbutil.TableName(subCfg.MetaSchema, cputil.SyncerCheckpoint(subCfg.Name)) + subCfg2, err := subCfg.DecryptPassword() + if err != nil { + log.L().Error("subconfig error when upgrading", zap.String("task", task), + zap.String("source id", sourceID), zap.String("subtask config", subCfg.String()), zap.Error(err)) + return err + } + dbConfigs[tableName] = subCfg2.To + } + } + + toClose := make([]*conn.BaseDB, 0, len(dbConfigs)) + defer func() { + for _, db := range toClose { + db.Close() + } + }() + for tableName, cfg := range dbConfigs { + targetDB, err := conn.DefaultDBProvider.Apply(cfg) + if err != nil { + logger.Error("target DB error when upgrading", zap.String("table name", tableName)) + return err + } + toClose = append(toClose, targetDB) + // try to add columns. + // NOTE: ignore already exists error to continue the process. 
+ queries := []string{ + fmt.Sprintf(`ALTER TABLE %s ADD COLUMN exit_safe_binlog_name VARCHAR(128) DEFAULT '' AFTER binlog_gtid`, tableName), + fmt.Sprintf(`ALTER TABLE %s ADD COLUMN exit_safe_binlog_pos INT UNSIGNED DEFAULT 0 AFTER exit_safe_binlog_name`, tableName), + fmt.Sprintf(`ALTER TABLE %s ADD COLUMN exit_safe_binlog_gtid TEXT AFTER exit_safe_binlog_pos`, tableName), + } + tctx := tcontext.NewContext(uctx.Context, logger) + dbConn, err := targetDB.GetBaseConn(tctx.Ctx) + if err != nil { + logger.Error("skip target DB when upgrading", zap.String("table name", tableName)) + return err + } + _, err = dbConn.ExecuteSQLWithIgnoreError(tctx, nil, upgradeTaskName, utils.IgnoreErrorCheckpoint, queries) + if err != nil { + logger.Error("error while adding column for checkpoint table", zap.String("table name", tableName)) + return err + } + } + return nil } diff --git a/pkg/upgrade/upgrade_test.go b/pkg/upgrade/upgrade_test.go index 6d3c87e884..af520880dd 100644 --- a/pkg/upgrade/upgrade_test.go +++ b/pkg/upgrade/upgrade_test.go @@ -56,8 +56,8 @@ func (t *testForEtcd) TestTryUpgrade(c *C) { upgrades = oldUpgrades }() mockVerNo := uint64(0) - upgrades = []func(cli *clientv3.Client) error{ - func(cli *clientv3.Client) error { + upgrades = []func(cli *clientv3.Client, uctx Context) error{ + func(cli *clientv3.Client, uctx Context) error { mockVerNo = currentInternalNo + 1 return nil }, @@ -70,7 +70,7 @@ func (t *testForEtcd) TestTryUpgrade(c *C) { c.Assert(ver.NotSet(), IsTrue) // try to upgrade, but do nothing except the current version recorded. - c.Assert(TryUpgrade(etcdTestCli), IsNil) + c.Assert(TryUpgrade(etcdTestCli, NewUpgradeContext()), IsNil) ver, rev2, err := GetVersion(etcdTestCli) c.Assert(err, IsNil) c.Assert(rev2, Greater, rev1) @@ -78,7 +78,7 @@ func (t *testForEtcd) TestTryUpgrade(c *C) { c.Assert(mockVerNo, Equals, uint64(0)) // try to upgrade again, do nothing because the version is the same. - c.Assert(TryUpgrade(etcdTestCli), IsNil) + c.Assert(TryUpgrade(etcdTestCli, NewUpgradeContext()), IsNil) ver, rev3, err := GetVersion(etcdTestCli) c.Assert(err, IsNil) c.Assert(rev3, Equals, rev2) @@ -94,7 +94,7 @@ func (t *testForEtcd) TestTryUpgrade(c *C) { CurrentVersion = newerVer // try to upgrade, to a newer version, upgrade operations applied. - c.Assert(TryUpgrade(etcdTestCli), IsNil) + c.Assert(TryUpgrade(etcdTestCli, NewUpgradeContext()), IsNil) ver, rev4, err := GetVersion(etcdTestCli) c.Assert(err, IsNil) c.Assert(rev4, Greater, rev3) @@ -103,7 +103,7 @@ func (t *testForEtcd) TestTryUpgrade(c *C) { // try to upgrade, to an older version, do nothing. CurrentVersion = oldCurrentVer - c.Assert(TryUpgrade(etcdTestCli), IsNil) + c.Assert(TryUpgrade(etcdTestCli, NewUpgradeContext()), IsNil) ver, rev5, err := GetVersion(etcdTestCli) c.Assert(err, IsNil) c.Assert(rev5, Equals, rev4) diff --git a/pkg/upgrade/version.go b/pkg/upgrade/version.go index e68897c894..60aae32976 100644 --- a/pkg/upgrade/version.go +++ b/pkg/upgrade/version.go @@ -28,7 +28,7 @@ const ( // The current internal version number of the DM cluster used when upgrading from an older version. // NOTE: +1 when a new incompatible version is introduced, so it's different from the release version. // NOTE: it's the version of the cluster (= the version of DM-master leader now), other component versions are not recorded yet. - currentInternalNo uint64 = 1 + currentInternalNo uint64 = 2 // The minimum internal version number of the DM cluster used when importing from v1.0.x. 
minInternalNo uint64 = 0 ) diff --git a/pkg/utils/util.go b/pkg/utils/util.go index e5665c1221..0e9de9bb04 100644 --- a/pkg/utils/util.go +++ b/pkg/utils/util.go @@ -22,7 +22,9 @@ import ( "strings" "time" + gmysql "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" + "github.com/pingcap/tidb/errno" "github.com/siddontang/go-mysql/mysql" "github.com/pingcap/dm/dm/pb" @@ -162,6 +164,22 @@ func IsContextCanceledError(err error) bool { return errors.Cause(err) == context.Canceled } +// IgnoreErrorCheckpoint is used in checkpoint update +func IgnoreErrorCheckpoint(err error) bool { + err = errors.Cause(err) // check the original error + mysqlErr, ok := err.(*gmysql.MySQLError) + if !ok { + return false + } + + switch mysqlErr.Number { + case errno.ErrDupFieldName: + return true + default: + return false + } +} + // IsBuildInSkipDDL return true when checked sql that will be skipped for syncer func IsBuildInSkipDDL(sql string) bool { return builtInSkipDDLPatterns.FindStringIndex(sql) != nil diff --git a/pkg/v1dbschema/schema.go b/pkg/v1dbschema/schema.go index 5314cb654f..355d637b81 100644 --- a/pkg/v1dbschema/schema.go +++ b/pkg/v1dbschema/schema.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" ) // UpdateSchema updates the DB schema from v1.0.x to v2.0.x, including: @@ -109,7 +110,8 @@ func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskN fmt.Sprintf(`ALTER TABLE %s ADD COLUMN binlog_gtid TEXT AFTER binlog_pos`, tableName), fmt.Sprintf(`ALTER TABLE %s ADD COLUMN table_info JSON NOT NULL AFTER binlog_gtid`, tableName), } - _, err := dbConn.ExecuteSQLWithIgnoreError(tctx, nil, taskName, ignoreErrorCheckpoint, queries) + updateTaskName := "importFromV10x" + _, err := dbConn.ExecuteSQLWithIgnoreError(tctx, nil, updateTaskName, utils.IgnoreErrorCheckpoint, queries) if err != nil { return terror.Annotatef(err, "add columns for checkpoint table") } @@ -217,21 +219,6 @@ func setGlobalGTIDs(tctx *tcontext.Context, dbConn *conn.BaseConn, taskName, tab return err } -func ignoreErrorCheckpoint(err error) bool { - err = errors.Cause(err) // check the original error - mysqlErr, ok := err.(*mysql.MySQLError) - if !ok { - return false - } - - switch mysqlErr.Number { - case errno.ErrDupFieldName: - return true - default: - return false - } -} - func ignoreErrorOnlineDDL(err error) bool { err = errors.Cause(err) // check the original error mysqlErr, ok := err.(*mysql.MySQLError) diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index ed83a25790..fbb2578f29 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -18,6 +18,7 @@ import ( "database/sql" "encoding/json" "fmt" + "io/ioutil" "path" "sync" "time" @@ -213,6 +214,13 @@ type CheckPoint interface { // corresponding to Meta.Pos and Meta.GTID GlobalPoint() binlog.Location + // SaveSafeModeExitPoint saves the pointer to location which indicates safe mode exit + // this location is used when dump unit can't assure consistency + SaveSafeModeExitPoint(point *binlog.Location) + + // SafeModeExitPoint returns the location where safe mode could safely turn off after + SafeModeExitPoint() *binlog.Location + // TablePoint returns all table's stream checkpoint TablePoint() map[string]map[string]binlog.Location @@ -258,6 +266,14 @@ type RemoteCheckPoint struct { globalPoint *binlogPoint globalPointSaveTime time.Time + // safeModeExitPoint is set in RemoteCheckPoint.Load (from downstream DB) and LoadMeta 
(from metadata file). + // it is unset (set to nil) in RemoteCheckPoint.Clear, and when the syncer's stream passes its location. + // it is flushed along with globalPoint, which is done by Syncer.flushCheckPoints. + // this variable is mainly used to decide the status of safe mode, so it is accessed when + // - init safe mode + // - checking in sync and, if passed, unsetting it + safeModeExitPoint *binlog.Location + logCtx *tcontext.Context } @@ -317,6 +333,7 @@ func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error { cp.globalPoint = newBinlogPoint(binlog.NewLocation(cp.cfg.Flavor), binlog.NewLocation(cp.cfg.Flavor), nil, nil, cp.cfg.EnableGTID) cp.globalPointSaveTime = time.Time{} cp.points = make(map[string]map[string]*binlogPoint) + cp.safeModeExitPoint = nil return nil } @@ -349,6 +366,17 @@ func (cp *RemoteCheckPoint) saveTablePoint(sourceSchema, sourceTable string, loc } } +// SaveSafeModeExitPoint implements CheckPoint.SaveSafeModeExitPoint +// shouldn't be called concurrently (only called before the loop in Syncer.Run and in the loop to reset) +func (cp *RemoteCheckPoint) SaveSafeModeExitPoint(point *binlog.Location) { + cp.safeModeExitPoint = point +} + +// SafeModeExitPoint implements CheckPoint.SafeModeExitPoint +func (cp *RemoteCheckPoint) SafeModeExitPoint() *binlog.Location { + return cp.safeModeExitPoint +} + // DeleteTablePoint implements CheckPoint.DeleteTablePoint func (cp *RemoteCheckPoint) DeleteTablePoint(tctx *tcontext.Context, sourceSchema, sourceTable string) error { cp.Lock() @@ -460,7 +488,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl if cp.globalPoint.outOfDate() || cp.globalPointSaveTime.IsZero() { locationG := cp.GlobalPoint() - sqlG, argG := cp.genUpdateSQL(globalCpSchema, globalCpTable, locationG, nil, true) + sqlG, argG := cp.genUpdateSQL(globalCpSchema, globalCpTable, locationG, cp.safeModeExitPoint, nil, true) sqls = append(sqls, sqlG) args = append(args, argG) } @@ -481,7 +509,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl } location := point.MySQLLocation() - sql2, arg := cp.genUpdateSQL(schema, table, location, tiBytes, false) + sql2, arg := cp.genUpdateSQL(schema, table, location, nil, tiBytes, false) sqls = append(sqls, sql2) args = append(args, arg) @@ -628,6 +656,9 @@ func (cp *RemoteCheckPoint) createTable(tctx *tcontext.Context) error { binlog_name VARCHAR(128), binlog_pos INT UNSIGNED, binlog_gtid TEXT, + exit_safe_binlog_name VARCHAR(128) DEFAULT '', + exit_safe_binlog_pos INT UNSIGNED DEFAULT 0, + exit_safe_binlog_gtid TEXT, table_info JSON NOT NULL, is_global BOOLEAN, create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -645,7 +676,7 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context, schemaTracker *schema.T cp.Lock() defer cp.Unlock() - query := `SELECT cp_schema, cp_table, binlog_name, binlog_pos, binlog_gtid, table_info, is_global FROM ` + cp.tableName + ` WHERE id = ?` + query := `SELECT cp_schema, cp_table, binlog_name, binlog_pos, binlog_gtid, exit_safe_binlog_name, exit_safe_binlog_pos, exit_safe_binlog_gtid, table_info, is_global FROM ` + cp.tableName + ` WHERE id = ?` rows, err := cp.dbConn.querySQL(tctx, query, cp.id) defer func() { if rows != nil { @@ -663,18 +694,21 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context, schemaTracker *schema.T } // checkpoints in DB have higher priority - // if don't want to use checkpoint in DB, set `remove-previous-checkpoint` to `true` + // if don't want to use checkpoint in DB, set `remove-meta` to `true` var ( - 
cpSchema string - cpTable string - binlogName string - binlogPos uint32 - binlogGTIDSet sql.NullString - tiBytes []byte - isGlobal bool + cpSchema string + cpTable string + binlogName string + binlogPos uint32 + binlogGTIDSet sql.NullString + exitSafeBinlogName string + exitSafeBinlogPos uint32 + exitSafeBinlogGTIDSet sql.NullString + tiBytes []byte + isGlobal bool ) for rows.Next() { - err := rows.Scan(&cpSchema, &cpTable, &binlogName, &binlogPos, &binlogGTIDSet, &tiBytes, &isGlobal) + err := rows.Scan(&cpSchema, &cpTable, &binlogName, &binlogPos, &binlogGTIDSet, &exitSafeBinlogName, &exitSafeBinlogPos, &exitSafeBinlogGTIDSet, &tiBytes, &isGlobal) if err != nil { return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream) } @@ -696,6 +730,34 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context, schemaTracker *schema.T cp.globalPoint = newBinlogPoint(location.Clone(), location.Clone(), nil, nil, cp.cfg.EnableGTID) cp.logCtx.L().Info("fetch global checkpoint from DB", log.WrapStringerField("global checkpoint", cp.globalPoint)) } + + if cp.cfg.EnableGTID { + // gtid set default is "", but upgrade may cause NULL value + if exitSafeBinlogGTIDSet.Valid && exitSafeBinlogGTIDSet.String != "" { + gset2, err2 := gtid.ParserGTID(cp.cfg.Flavor, exitSafeBinlogGTIDSet.String) + if err2 != nil { + return err2 + } + exitSafeModeLoc := binlog.Location{ + Position: mysql.Position{ + Name: exitSafeBinlogName, + Pos: exitSafeBinlogPos, + }, + GTIDSet: gset2, + } + cp.SaveSafeModeExitPoint(&exitSafeModeLoc) + } + } else { + if exitSafeBinlogName != "" { + exitSafeModeLoc := binlog.Location{ + Position: mysql.Position{ + Name: exitSafeBinlogName, + Pos: exitSafeBinlogPos, + }, + } + cp.SaveSafeModeExitPoint(&exitSafeModeLoc) + } + } continue // skip global checkpoint } @@ -775,7 +837,7 @@ func (cp *RemoteCheckPoint) LoadMeta() error { cp.logCtx.L().Info("loaded checkpoints from meta", log.WrapStringerField("global checkpoint", cp.globalPoint)) } if safeModeExitLoc != nil { - cp.cfg.SafeModeExitLoc = safeModeExitLoc + cp.SaveSafeModeExitPoint(safeModeExitLoc) cp.logCtx.L().Info("set SafeModeExitLoc from meta", zap.Stringer("SafeModeExitLoc", safeModeExitLoc)) } @@ -783,16 +845,19 @@ func (cp *RemoteCheckPoint) LoadMeta() error { } // genUpdateSQL generates SQL and arguments for update checkpoint -func (cp *RemoteCheckPoint) genUpdateSQL(cpSchema, cpTable string, location binlog.Location, tiBytes []byte, isGlobal bool) (string, []interface{}) { +func (cp *RemoteCheckPoint) genUpdateSQL(cpSchema, cpTable string, location binlog.Location, safeModeExitLoc *binlog.Location, tiBytes []byte, isGlobal bool) (string, []interface{}) { // use `INSERT INTO ... ON DUPLICATE KEY UPDATE` rather than `REPLACE INTO` // to keep `create_time`, `update_time` correctly sql2 := `INSERT INTO ` + cp.tableName + ` - (id, cp_schema, cp_table, binlog_name, binlog_pos, binlog_gtid, table_info, is_global) VALUES - (?, ?, ?, ?, ?, ?, ?, ?) + (id, cp_schema, cp_table, binlog_name, binlog_pos, binlog_gtid, exit_safe_binlog_name, exit_safe_binlog_pos, exit_safe_binlog_gtid, table_info, is_global) VALUES + (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
ON DUPLICATE KEY UPDATE binlog_name = VALUES(binlog_name), binlog_pos = VALUES(binlog_pos), binlog_gtid = VALUES(binlog_gtid), + exit_safe_binlog_name = VALUES(exit_safe_binlog_name), + exit_safe_binlog_pos = VALUES(exit_safe_binlog_pos), + exit_safe_binlog_gtid = VALUES(exit_safe_binlog_gtid), table_info = VALUES(table_info), is_global = VALUES(is_global); ` @@ -806,14 +871,34 @@ func (cp *RemoteCheckPoint) genUpdateSQL(cpSchema, cpTable string, location binl tiBytes = []byte("null") } - args := []interface{}{cp.id, cpSchema, cpTable, location.Position.Name, location.Position.Pos, location.GTIDSetStr(), tiBytes, isGlobal} + var ( + exitSafeName string + exitSafePos uint32 + exitSafeGTIDStr string + ) + if safeModeExitLoc != nil { + exitSafeName = safeModeExitLoc.Position.Name + exitSafePos = safeModeExitLoc.Position.Pos + exitSafeGTIDStr = safeModeExitLoc.GTIDSetStr() + } + + args := []interface{}{cp.id, cpSchema, cpTable, location.Position.Name, location.Position.Pos, location.GTIDSetStr(), + exitSafeName, exitSafePos, exitSafeGTIDStr, tiBytes, isGlobal} return sql2, args } func (cp *RemoteCheckPoint) parseMetaData() (*binlog.Location, *binlog.Location, error) { // `metadata` is mydumper's output meta file name filename := path.Join(cp.cfg.Dir, "metadata") - cp.logCtx.L().Info("parsing metadata from file", zap.String("file", filename)) - return dumpling.ParseMetaData(filename, cp.cfg.Flavor) + loc, loc2, err := dumpling.ParseMetaData(filename, cp.cfg.Flavor) + if err != nil { + toPrint, err2 := ioutil.ReadFile(filename) + if err2 != nil { + toPrint = []byte(err2.Error()) + } + err = terror.ErrParseMydumperMeta.Generate(err, toPrint) + } + + return loc, loc2, err } diff --git a/syncer/checkpoint_test.go b/syncer/checkpoint_test.go index 42958267e4..a89afbc496 100644 --- a/syncer/checkpoint_test.go +++ b/syncer/checkpoint_test.go @@ -155,7 +155,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { cp.SaveGlobalPoint(binlog.Location{Position: pos1}) s.mock.ExpectBegin() - s.mock.ExpectExec("(162)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, "", []byte("null"), true).WillReturnResult(sqlmock.NewResult(0, 1)) + s.mock.ExpectExec("(162)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, "", "", 0, "", []byte("null"), true).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() err = cp.FlushPointsExcept(tctx, nil, nil, nil) c.Log(errors.ErrorStack(err)) @@ -196,7 +196,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { // flush + rollback s.mock.ExpectBegin() - s.mock.ExpectExec("(202)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos2.Name, pos2.Pos, "", []byte("null"), true).WillReturnResult(sqlmock.NewResult(0, 1)) + s.mock.ExpectExec("(202)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos2.Name, pos2.Pos, "", "", 0, "", []byte("null"), true).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() err = cp.FlushPointsExcept(tctx, nil, nil, nil) c.Assert(err, IsNil) @@ -208,8 +208,8 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { pos3 := pos2 pos3.Pos = pos2.Pos + 1000 // > pos2 to enable save cp.SaveGlobalPoint(binlog.Location{Position: pos3}) - columns := []string{"cp_schema", "cp_table", "binlog_name", "binlog_pos", "binlog_gtid", "table_info", "is_global"} - s.mock.ExpectQuery(loadCheckPointSQL).WithArgs(cpid).WillReturnRows(sqlmock.NewRows(columns).AddRow("", "", pos2.Name, pos2.Pos, "", []byte("null"), true)) + columns := []string{"cp_schema", "cp_table", 
"binlog_name", "binlog_pos", "binlog_gtid", "exit_safe_binlog_name", "exit_safe_binlog_pos", "exit_safe_binlog_gtid", "table_info", "is_global"} + s.mock.ExpectQuery(loadCheckPointSQL).WithArgs(cpid).WillReturnRows(sqlmock.NewRows(columns).AddRow("", "", pos2.Name, pos2.Pos, "", "", 0, "", []byte("null"), true)) err = cp.Load(tctx, s.tracker) c.Assert(err, IsNil) c.Assert(cp.GlobalPoint().Position, Equals, pos2) @@ -253,11 +253,11 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { c.Assert(err, IsNil) s.cfg.Mode = config.ModeAll s.cfg.Dir = dir - cp.LoadMeta() + c.Assert(cp.LoadMeta(), IsNil) // should flush because globalPointSaveTime is zero s.mock.ExpectBegin() - s.mock.ExpectExec("(202)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, "", []byte("null"), true).WillReturnResult(sqlmock.NewResult(0, 1)) + s.mock.ExpectExec("(202)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, "", "", 0, "", []byte("null"), true).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() err = cp.FlushPointsExcept(tctx, nil, nil, nil) c.Assert(err, IsNil) @@ -267,6 +267,45 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { c.Assert(cp.GlobalPoint().Position, Equals, pos1) c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos1) + s.mock.ExpectBegin() + s.mock.ExpectExec(clearCheckPointSQL).WithArgs(cpid).WillReturnResult(sqlmock.NewResult(0, 1)) + s.mock.ExpectCommit() + err = cp.Clear(tctx) + c.Assert(err, IsNil) + + // check dumpling write exitSafeModeLocation in metadata + err = ioutil.WriteFile(filename, []byte( + fmt.Sprintf(`SHOW MASTER STATUS: + Log: %s + Pos: %d + GTID: + +SHOW SLAVE STATUS: + Host: %s + Log: %s + Pos: %d + GTID: + +SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */ + Log: %s + Pos: %d + GTID: +`, pos1.Name, pos1.Pos, "slave_host", pos1.Name, pos1.Pos+1000, pos2.Name, pos2.Pos)), 0644) + c.Assert(err, IsNil) + c.Assert(cp.LoadMeta(), IsNil) + + // should flush because globalPointSaveTime is zero + s.mock.ExpectBegin() + s.mock.ExpectExec("(202)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, "", pos2.Name, pos2.Pos, "", []byte("null"), true).WillReturnResult(sqlmock.NewResult(0, 1)) + s.mock.ExpectCommit() + err = cp.FlushPointsExcept(tctx, nil, nil, nil) + c.Assert(err, IsNil) + s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil)) + err = cp.Load(tctx, s.tracker) + c.Assert(err, IsNil) + c.Assert(cp.GlobalPoint().Position, Equals, pos1) + c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos1) + c.Assert(cp.SafeModeExitPoint().Position, Equals, pos2) } func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { @@ -306,7 +345,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { // flush + rollback s.mock.ExpectBegin() - s.mock.ExpectExec("(284)?"+flushCheckPointSQL).WithArgs(cpid, schema, table, pos2.Name, pos2.Pos, "", sqlmock.AnyArg(), false).WillReturnResult(sqlmock.NewResult(0, 1)) + s.mock.ExpectExec("(284)?"+flushCheckPointSQL).WithArgs(cpid, schema, table, pos2.Name, pos2.Pos, "", "", 0, "", sqlmock.AnyArg(), false).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() err = cp.FlushPointsExcept(tctx, nil, nil, nil) c.Assert(err, IsNil) @@ -341,7 +380,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { // flush but except + rollback s.mock.ExpectBegin() - s.mock.ExpectExec("(320)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos2.Name, pos2.Pos, "", 
[]byte("null"), true).WillReturnResult(sqlmock.NewResult(0, 1)) + s.mock.ExpectExec("(320)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos2.Name, pos2.Pos, "", "", 0, "", []byte("null"), true).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() err = cp.FlushPointsExcept(tctx, [][]string{{schema, table}}, nil, nil) c.Assert(err, IsNil) diff --git a/syncer/mode.go b/syncer/mode.go index d3fd5e144c..5cebac536c 100644 --- a/syncer/mode.go +++ b/syncer/mode.go @@ -32,9 +32,9 @@ func (s *Syncer) enableSafeModeInitializationPhase(tctx *tcontext.Context, safeM safeMode.Add(tctx, 1) // add 1 but should no corresponding -1, so keeps enabled s.tctx.L().Info("enable safe-mode by config") } - if s.cfg.SafeModeExitLoc != nil { + if s.checkpoint.SafeModeExitPoint() != nil { safeMode.Add(tctx, 1) // enable and will revert after pass SafeModeExitLoc - s.tctx.L().Info("enable safe-mode because of inconsistent dump, will exit at", zap.Stringer("location", *s.cfg.SafeModeExitLoc)) + s.tctx.L().Info("enable safe-mode because of inconsistent dump, will exit at", zap.Stringer("location", *s.checkpoint.SafeModeExitPoint())) } go func() { diff --git a/syncer/syncer.go b/syncer/syncer.go index 1911f2f04e..548f90b043 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -845,10 +845,11 @@ func (s *Syncer) resetShardingGroup(schema, table string) { } // flushCheckPoints flushes previous saved checkpoint in memory to persistent storage, like TiDB -// we flush checkpoints in three cases: +// we flush checkpoints in four cases: // 1. DDL executed // 2. at intervals (and job executed) // 3. pausing / stopping the sync (driven by `s.flushJobs`) +// 4. IsFreshTask return true // but when error occurred, we can not flush checkpoint, otherwise data may lost // and except rejecting to flush the checkpoint, we also need to rollback the checkpoint saved before // this should be handled when `s.Run` returned @@ -1388,9 +1389,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } // check pass SafeModeExitLoc and try disable safe mode, but not in sharding or replacing error - if s.cfg.SafeModeExitLoc != nil && !s.isReplacingErr && shardingReSync == nil { - if binlog.CompareLocation(currentLocation, *s.cfg.SafeModeExitLoc, s.cfg.EnableGTID) >= 0 { - s.cfg.SafeModeExitLoc = nil + safeModeExitLoc := s.checkpoint.SafeModeExitPoint() + if safeModeExitLoc != nil && !s.isReplacingErr && shardingReSync == nil { + if binlog.CompareLocation(currentLocation, *safeModeExitLoc, s.cfg.EnableGTID) >= 0 { + s.checkpoint.SaveSafeModeExitPoint(nil) safeMode.Add(tctx, -1) } } diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 17f3828255..467107b359 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -17,6 +17,9 @@ import ( "context" "database/sql" "fmt" + "io/ioutil" + "os" + "path/filepath" "strings" "sync" "testing" @@ -119,14 +122,20 @@ func (mp *MockStreamProducer) generateStreamer(location binlog.Location) (stream } func (s *testSyncerSuite) SetUpSuite(c *C) { + loaderDir, err := ioutil.TempDir("", "loader") + c.Assert(err, IsNil) + loaderCfg := config.LoaderConfig{ + Dir: loaderDir, + } s.cfg = &config.SubTaskConfig{ - From: getDBConfigFromEnv(), - To: getDBConfigFromEnv(), - ServerID: 101, - MetaSchema: "test", - Name: "syncer_ut", - Mode: config.ModeIncrement, - Flavor: "mysql", + From: getDBConfigFromEnv(), + To: getDBConfigFromEnv(), + ServerID: 101, + MetaSchema: "test", + Name: "syncer_ut", + Mode: config.ModeIncrement, + Flavor: "mysql", + LoaderConfig: loaderCfg, } 
s.cfg.From.Adjust() s.cfg.To.Adjust() @@ -213,7 +222,9 @@ func (s *testSyncerSuite) resetEventsGenerator(c *C) { } } -func (s *testSyncerSuite) TearDownSuite(c *C) {} +func (s *testSyncerSuite) TearDownSuite(c *C) { + os.RemoveAll(s.cfg.Dir) +} func (s *testSyncerSuite) mockParser(db *sql.DB, mock sqlmock.Sqlmock) (*parser.Parser, error) { mock.ExpectQuery("SHOW VARIABLES LIKE"). @@ -1355,7 +1366,7 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { c.Assert(len(generatedEvents1), Equals, 19) safeModeExitLocation := binlog.NewLocation("") safeModeExitLocation.Position.Pos = generatedEvents1[18].Header.LogPos - syncer.cfg.SafeModeExitLoc = &safeModeExitLocation + syncer.checkpoint.SaveSafeModeExitPoint(&safeModeExitLocation) // check after safeModeExitLocation, safe mode is turned off events2 := mockBinlogEvents{ @@ -1457,6 +1468,34 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { c.Assert(failpoint.Disable("github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds"), IsNil) } +func (s *testSyncerSuite) TestRemoveMetadataIsFine(c *C) { + cfg, err := s.cfg.Clone() + c.Assert(err, IsNil) + cfg.Mode = config.ModeAll + syncer := NewSyncer(cfg, nil) + fresh, err := syncer.IsFreshTask(context.Background()) + c.Assert(err, IsNil) + c.Assert(fresh, IsTrue) + + filename := filepath.Join(s.cfg.Dir, "metadata") + err = ioutil.WriteFile(filename, []byte( + fmt.Sprintf("SHOW MASTER STATUS:\n\tLog: BAD METADATA")), 0644) + c.Assert(err, IsNil) + c.Assert(syncer.checkpoint.LoadMeta(), NotNil) + + err = ioutil.WriteFile(filename, []byte( + fmt.Sprintf("SHOW MASTER STATUS:\n\tLog: mysql-bin.000003\n\tPos: 1234\n\tGTID:\n\n")), 0644) + c.Assert(err, IsNil) + c.Assert(syncer.checkpoint.LoadMeta(), IsNil) + + c.Assert(os.Remove(filename), IsNil) + + // after successful LoadMeta, IsFreshTask should return false so don't load again + fresh, err = syncer.IsFreshTask(context.Background()) + c.Assert(err, IsNil) + c.Assert(fresh, IsFalse) +} + func executeSQLAndWait(expectJobNum int) { for i := 0; i < 10; i++ { time.Sleep(time.Second) diff --git a/tests/import_v10x/run.sh b/tests/import_v10x/run.sh index b1d2072da5..3ef4b0343d 100644 --- a/tests/import_v10x/run.sh +++ b/tests/import_v10x/run.sh @@ -79,6 +79,11 @@ function run() { "\"result\": true" 1 diff $cur/conf/task.yaml $WORK_DIR/task.yaml || exit 1 + + run_sql "show create table \`dm_meta\`.\`test_syncer_checkpoint\`" $TIDB_PORT $TIDB_PASSWORD + check_contains "\`exit_safe_binlog_name\` varchar(128) DEFAULT ''" + check_contains "\`exit_safe_binlog_pos\` int(10) unsigned DEFAULT 0" + check_contains "\`exit_safe_binlog_gtid\` text DEFAULT NULL" run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2