Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: save exitSafeModeLoc in checkpoint, load it, upgrade strategy #988

Merged
merged 26 commits into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5b1d743
save my work
lance6716 Sep 3, 2020
a285f79
Merge branch 'master' of https://github.com/pingcap/dm into save-in-c…
lance6716 Sep 4, 2020
5df81a9
save work
lance6716 Sep 4, 2020
aa07165
Merge branch 'master' into save-in-checkpoint
lance6716 Sep 4, 2020
e090e09
fix hound
lance6716 Sep 4, 2020
e466c38
fix SQL
lance6716 Sep 4, 2020
df384da
fix tests
lance6716 Sep 5, 2020
37b5b84
Merge branch 'master' into save-in-checkpoint
lance6716 Sep 5, 2020
2bf5e6d
test checkpoint
lance6716 Sep 7, 2020
eeeea4e
fix test, add integration test
lance6716 Sep 7, 2020
64dd8cc
Merge branch 'master' into save-in-checkpoint
lance6716 Sep 7, 2020
c91b8f5
test removeMetadata
lance6716 Sep 7, 2020
adbadb9
revert some behaviour
lance6716 Sep 7, 2020
f0d07d2
print metadata in `query-status`
lance6716 Sep 7, 2020
e6d0634
Merge branch 'master' of https://github.com/pingcap/dm into save-in-c…
lance6716 Sep 8, 2020
10e9358
address comment
lance6716 Sep 8, 2020
4f4be9b
address comment
lance6716 Sep 8, 2020
556ffaf
address comment
lance6716 Sep 8, 2020
fa401af
address comment (and make it build)
lance6716 Sep 8, 2020
22da4f9
Merge branch 'save-in-checkpoint' of github.com:lance6716/dm into sav…
lance6716 Sep 8, 2020
d4705aa
address comment
lance6716 Sep 8, 2020
e618c76
address comment
lance6716 Sep 8, 2020
3ee0d8c
address comment
lance6716 Sep 8, 2020
f4b3f53
Merge branch 'master' into save-in-checkpoint
GMHDBJD Sep 9, 2020
77dd424
Merge branch 'master' into save-in-checkpoint
GMHDBJD Sep 10, 2020
2146cea
Merge branch 'master' into save-in-checkpoint
lance6716 Sep 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ErrDBInvalidConn,[code=10003:class=database:scope=not-set:level=high], "Message:
ErrDBUnExpect,[code=10004:class=database:scope=not-set:level=high], "Message: unexpect database error: %s"
ErrDBQueryFailed,[code=10005:class=database:scope=not-set:level=high], "Message: query statement failed: %s"
ErrDBExecuteFailed,[code=10006:class=database:scope=not-set:level=high], "Message: execute statement failed: %s"
ErrParseMydumperMeta,[code=11001:class=functional:scope=internal:level=high], "Message: parse mydumper metadata error: %s"
ErrParseMydumperMeta,[code=11001:class=functional:scope=internal:level=high], "Message: parse mydumper metadata error: %s, metadata: %s"
ErrGetFileSize,[code=11002:class=functional:scope=internal:level=high], "Message: get file %s size"
ErrDropMultipleTables,[code=11003:class=functional:scope=internal:level=high], "Message: not allowed operation: drop multiple tables in one statement, Workaround: It is recommended to include only one DDL operation in a statement executed upstream. Please manually handle it using dmctl (skipping the DDL statement or replacing the DDL statement with a specified DDL statement). For details, see https://docs.pingcap.com/tidb-data-migration/stable/skip-or-replace-abnormal-sql-statements"
ErrRenameMultipleTables,[code=11004:class=functional:scope=internal:level=high], "Message: not allowed operation: rename multiple tables in one statement, Workaround: It is recommended to include only one DDL operation in a statement executed upstream. Please manually handle it using dmctl (skipping the DDL statement or replacing the DDL statement with a specified DDL statement). For details, see https://docs.pingcap.com/tidb-data-migration/stable/skip-or-replace-abnormal-sql-statements"
Expand Down
3 changes: 0 additions & 3 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"strings"
"time"

"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

Expand Down Expand Up @@ -242,8 +241,6 @@ type SyncerConfig struct {
EnableGTID bool `yaml:"enable-gtid" toml:"enable-gtid" json:"enable-gtid"`
DisableCausality bool `yaml:"disable-detect" toml:"disable-detect" json:"disable-detect"`
SafeMode bool `yaml:"safe-mode" toml:"safe-mode" json:"safe-mode"`
// when dump unit can't run consistent dump, enable safe mode until pass exit location of dumping
SafeModeExitLoc *binlog.Location `yaml:"-" toml:"-" json:"-"`
// deprecated, use `ansi-quotes` in top level config instead
EnableANSIQuotes bool `yaml:"enable-ansi-quotes" toml:"enable-ansi-quotes" json:"enable-ansi-quotes"`
}
Expand Down
7 changes: 6 additions & 1 deletion dm/master/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ func (s *Server) bootstrap(ctx context.Context) error {
}
}

err := upgrade.TryUpgrade(s.etcdClient)
uctx := upgrade.Context{
Context: ctx,
SubTaskConfigs: s.scheduler.GetSubTaskCfgs(),
}
err := upgrade.TryUpgrade(s.etcdClient, uctx)

if err != nil {
return err
}
Expand Down
20 changes: 20 additions & 0 deletions dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,26 @@ func (s *Scheduler) GetSubTaskCfgsByTask(task string) map[string]*config.SubTask
return cloneM
}

// GetSubTaskCfgs gets all subconfig, return nil when error happens
func (s *Scheduler) GetSubTaskCfgs() map[string]map[string]config.SubTaskConfig {
s.mu.RLock()
defer s.mu.RUnlock()
clone := make(map[string]map[string]config.SubTaskConfig, len(s.subTaskCfgs))
for task, m := range s.subTaskCfgs {
clone2 := make(map[string]config.SubTaskConfig, len(m))
for source, cfg := range m {
cfg2, err := cfg.Clone()
if err != nil {
return nil
}
clone2[source] = *cfg2
}
clone[task] = clone2
}

return clone
}

// AddWorker adds the information of the DM-worker when registering a new instance.
// This only adds the information of the DM-worker,
// in order to know whether it's online (ready to handle works),
Expand Down
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ workaround = ""
tags = ["not-set", "high"]

[error.DM-functional-11001]
message = "parse mydumper metadata error: %s"
message = "parse mydumper metadata error: %s, metadata: %s"
description = ""
workaround = ""
tags = ["internal", "high"]
Expand Down
7 changes: 6 additions & 1 deletion loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -1274,8 +1275,12 @@ func (l *Loader) getMydumpMetadata() error {
metafile := filepath.Join(l.cfg.LoaderConfig.Dir, "metadata")
loc, _, err := dumpling.ParseMetaData(metafile, l.cfg.Flavor)
if err != nil {
toPrint, err2 := ioutil.ReadFile(metafile)
if err2 != nil {
toPrint = []byte(err2.Error())
}
l.logCtx.L().Error("fail to parse dump metadata", log.ShortError(err))
return err
return terror.ErrParseMydumperMeta.Generate(err, toPrint)
}

l.metaBinlog.Set(loc.Position.String())
Expand Down
20 changes: 10 additions & 10 deletions pkg/dumpling/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ import (

"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/terror"
)

// ParseMetaData parses mydumper's output meta file and returns binlog location.
// since v2.0.0, dumpling maybe configured to output master status after connection pool is established,
// we return this location as well.
func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, error) {
invalidErr := fmt.Errorf("file %s invalid format", filename)
fd, err := os.Open(filename)
if err != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err)
return nil, nil, err
}
defer fd.Close()

Expand Down Expand Up @@ -89,7 +89,7 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location,
if err2 == io.EOF {
break
} else if err2 != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err2)
return nil, nil, err2
}
line = strings.TrimSpace(line)
if len(line) == 0 {
Expand All @@ -99,14 +99,14 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location,
switch line {
case "SHOW MASTER STATUS:":
if err3 := parsePosAndGTID(&pos, &gtidStr); err3 != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err3)
return nil, nil, err3
}
case "SHOW SLAVE STATUS:":
// ref: https://github.com/maxbube/mydumper/blob/master/mydumper.c#L434
for {
line, err3 := br.ReadString('\n')
if err3 != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err3)
return nil, nil, err3
}
line = strings.TrimSpace(line)
if len(line) == 0 {
Expand All @@ -116,20 +116,20 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location,
case "SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */":
useLocation2 = true
if err3 := parsePosAndGTID(&pos2, &gtidStr2); err3 != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err3)
return nil, nil, err3
}
default:
// do nothing for Started dump, Finished dump...
}
}

if len(pos.Name) == 0 || pos.Pos == uint32(0) {
return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename))
return nil, nil, invalidErr
}

gset, err := gtid.ParserGTID(flavor, gtidStr)
if err != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename))
return nil, nil, invalidErr
}
loc = &binlog.Location{
Position: pos,
Expand All @@ -138,11 +138,11 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location,

if useLocation2 {
if len(pos2.Name) == 0 || pos2.Pos == uint32(0) {
return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename))
return nil, nil, invalidErr
}
gset2, err := gtid.ParserGTID(flavor, gtidStr2)
if err != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename))
return nil, nil, invalidErr
}
loc2 = &binlog.Location{
Position: pos2,
Expand Down
2 changes: 1 addition & 1 deletion pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ var (
ErrDBExecuteFailed = New(codeDBExecuteFailed, ClassDatabase, ScopeNotSet, LevelHigh, "execute statement failed: %s", "")

// Functional error
ErrParseMydumperMeta = New(codeParseMydumperMeta, ClassFunctional, ScopeInternal, LevelHigh, "parse mydumper metadata error: %s", "")
ErrParseMydumperMeta = New(codeParseMydumperMeta, ClassFunctional, ScopeInternal, LevelHigh, "parse mydumper metadata error: %s, metadata: %s", "")
ErrGetFileSize = New(codeGetFileSize, ClassFunctional, ScopeInternal, LevelHigh, "get file %s size", "")
ErrDropMultipleTables = New(codeDropMultipleTables, ClassFunctional, ScopeInternal, LevelHigh, "not allowed operation: drop multiple tables in one statement", "It is recommended to include only one DDL operation in a statement executed upstream. Please manually handle it using dmctl (skipping the DDL statement or replacing the DDL statement with a specified DDL statement). For details, see https://docs.pingcap.com/tidb-data-migration/stable/skip-or-replace-abnormal-sql-statements")
ErrRenameMultipleTables = New(codeRenameMultipleTables, ClassFunctional, ScopeInternal, LevelHigh, "not allowed operation: rename multiple tables in one statement", "It is recommended to include only one DDL operation in a statement executed upstream. Please manually handle it using dmctl (skipping the DDL statement or replacing the DDL statement with a specified DDL statement). For details, see https://docs.pingcap.com/tidb-data-migration/stable/skip-or-replace-abnormal-sql-statements")
Expand Down
102 changes: 97 additions & 5 deletions pkg/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,48 @@

package upgrade

import "go.etcd.io/etcd/clientv3"
import (
"context"
"fmt"

"github.com/pingcap/tidb-tools/pkg/dbutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/cputil"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/utils"
)

var (
// upgrades records all functions used to upgrade from one version to the later version.
upgrades = []func(cli *clientv3.Client) error{
upgrades = []func(cli *clientv3.Client, uctx Context) error{
upgradeToVer1,
upgradeToVer2,
}
)

// Context is used to pass something to TryUpgrade
// NOTE that zero value of Context is nil, be aware of nil-dereference
type Context struct {
context.Context
SubTaskConfigs map[string]map[string]config.SubTaskConfig
}

// NewUpgradeContext creates a Context, avoid nil Context member
func NewUpgradeContext() Context {
return Context{
Context: context.Background(),
SubTaskConfigs: make(map[string]map[string]config.SubTaskConfig),
}
}

// TryUpgrade tries to upgrade the cluster from an older version to a new version.
// This methods should have no side effects even calling multiple times.
func TryUpgrade(cli *clientv3.Client) error {
func TryUpgrade(cli *clientv3.Client, uctx Context) error {
// 1. get previous version from etcd.
preVer, _, err := GetVersion(cli)
if err != nil {
Expand Down Expand Up @@ -52,7 +82,7 @@ func TryUpgrade(cli *clientv3.Client) error {

// 4. do upgrade operations.
for _, upgrade := range upgrades {
err = upgrade(cli)
err = upgrade(cli, uctx)
if err != nil {
return err
}
Expand All @@ -65,6 +95,68 @@ func TryUpgrade(cli *clientv3.Client) error {

// upgradeToVer1 does upgrade operations from Ver0 to Ver1.
// in fact, this do nothing now, and just for demonstration.
func upgradeToVer1(cli *clientv3.Client) error {
func upgradeToVer1(cli *clientv3.Client, uctx Context) error {
return nil
}

// upgradeToVer2 does upgrade operations from Ver1 to Ver2 (v2.0.0-rc.3) to upgrade syncer checkpoint schema
// TODO: determine v2.0.0-rc.3 or another version in above line
func upgradeToVer2(cli *clientv3.Client, uctx Context) error {
upgradeTaskName := "upgradeToVer2"
logger := log.L().WithFields(zap.String("task", upgradeTaskName))
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved

if uctx.SubTaskConfigs == nil {
logger.Info("no downstream DB, skipping")
return nil
}

// tableName -> DBConfig
dbConfigs := map[string]config.DBConfig{}
for task, m := range uctx.SubTaskConfigs {
for sourceID, subCfg := range m {
tableName := dbutil.TableName(subCfg.MetaSchema, cputil.SyncerCheckpoint(subCfg.Name))
subCfg2, err := subCfg.DecryptPassword()
if err != nil {
log.L().Error("subconfig error when upgrading", zap.String("task", task),
zap.String("source id", sourceID), zap.String("subtask config", subCfg.String()), zap.Error(err))
return err
}
dbConfigs[tableName] = subCfg2.To
}
}

toClose := make([]*conn.BaseDB, 0, len(dbConfigs))
defer func() {
for _, db := range toClose {
db.Close()
}
}()
for tableName, cfg := range dbConfigs {
targetDB, err := conn.DefaultDBProvider.Apply(cfg)
if err != nil {
logger.Error("target DB error when upgrading", zap.String("table name", tableName))
return err
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
}
toClose = append(toClose, targetDB)
// try to add columns.
// NOTE: ignore already exists error to continue the process.
queries := []string{
fmt.Sprintf(`ALTER TABLE %s ADD COLUMN exit_safe_binlog_name VARCHAR(128) DEFAULT '' AFTER binlog_gtid`, tableName),
fmt.Sprintf(`ALTER TABLE %s ADD COLUMN exit_safe_binlog_pos INT UNSIGNED DEFAULT 0 AFTER exit_safe_binlog_name`, tableName),
fmt.Sprintf(`ALTER TABLE %s ADD COLUMN exit_safe_binlog_gtid TEXT AFTER exit_safe_binlog_pos`, tableName),
}
tctx := tcontext.NewContext(uctx.Context, logger)
dbConn, err := targetDB.GetBaseConn(tctx.Ctx)
if err != nil {
logger.Error("skip target DB when upgrading", zap.String("table name", tableName))
return err
}
_, err = dbConn.ExecuteSQLWithIgnoreError(tctx, nil, upgradeTaskName, utils.IgnoreErrorCheckpoint, queries)
if err != nil {
logger.Error("error while adding column for checkpoint table", zap.String("table name", tableName))
return err
}
}

return nil
}
12 changes: 6 additions & 6 deletions pkg/upgrade/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (t *testForEtcd) TestTryUpgrade(c *C) {
upgrades = oldUpgrades
}()
mockVerNo := uint64(0)
upgrades = []func(cli *clientv3.Client) error{
func(cli *clientv3.Client) error {
upgrades = []func(cli *clientv3.Client, uctx Context) error{
func(cli *clientv3.Client, uctx Context) error {
mockVerNo = currentInternalNo + 1
return nil
},
Expand All @@ -70,15 +70,15 @@ func (t *testForEtcd) TestTryUpgrade(c *C) {
c.Assert(ver.NotSet(), IsTrue)

// try to upgrade, but do nothing except the current version recorded.
c.Assert(TryUpgrade(etcdTestCli), IsNil)
c.Assert(TryUpgrade(etcdTestCli, NewUpgradeContext()), IsNil)
ver, rev2, err := GetVersion(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(rev2, Greater, rev1)
c.Assert(ver, DeepEquals, CurrentVersion)
c.Assert(mockVerNo, Equals, uint64(0))

// try to upgrade again, do nothing because the version is the same.
c.Assert(TryUpgrade(etcdTestCli), IsNil)
c.Assert(TryUpgrade(etcdTestCli, NewUpgradeContext()), IsNil)
ver, rev3, err := GetVersion(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(rev3, Equals, rev2)
Expand All @@ -94,7 +94,7 @@ func (t *testForEtcd) TestTryUpgrade(c *C) {
CurrentVersion = newerVer

// try to upgrade, to a newer version, upgrade operations applied.
c.Assert(TryUpgrade(etcdTestCli), IsNil)
c.Assert(TryUpgrade(etcdTestCli, NewUpgradeContext()), IsNil)
ver, rev4, err := GetVersion(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(rev4, Greater, rev3)
Expand All @@ -103,7 +103,7 @@ func (t *testForEtcd) TestTryUpgrade(c *C) {

// try to upgrade, to an older version, do nothing.
CurrentVersion = oldCurrentVer
c.Assert(TryUpgrade(etcdTestCli), IsNil)
c.Assert(TryUpgrade(etcdTestCli, NewUpgradeContext()), IsNil)
ver, rev5, err := GetVersion(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(rev5, Equals, rev4)
Expand Down
2 changes: 1 addition & 1 deletion pkg/upgrade/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
// The current internal version number of the DM cluster used when upgrading from an older version.
// NOTE: +1 when a new incompatible version is introduced, so it's different from the release version.
// NOTE: it's the version of the cluster (= the version of DM-master leader now), other component versions are not recorded yet.
currentInternalNo uint64 = 1
currentInternalNo uint64 = 2
// The minimum internal version number of the DM cluster used when importing from v1.0.x.
minInternalNo uint64 = 0
)
Expand Down
Loading