Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
*: save exitSafeModeLoc in checkpoint, load it, upgrade strategy (#988)…
Browse files Browse the repository at this point in the history
… (#1017)
  • Loading branch information
lance6716 authored Sep 10, 2020
1 parent c06229b commit 3db5dcc
Show file tree
Hide file tree
Showing 19 changed files with 382 additions and 88 deletions.
2 changes: 1 addition & 1 deletion _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ErrDBInvalidConn,[code=10003:class=database:scope=not-set:level=high], "Message:
ErrDBUnExpect,[code=10004:class=database:scope=not-set:level=high], "Message: unexpect database error: %s"
ErrDBQueryFailed,[code=10005:class=database:scope=not-set:level=high], "Message: query statement failed: %s"
ErrDBExecuteFailed,[code=10006:class=database:scope=not-set:level=high], "Message: execute statement failed: %s"
ErrParseMydumperMeta,[code=11001:class=functional:scope=internal:level=high], "Message: parse mydumper metadata error: %s"
ErrParseMydumperMeta,[code=11001:class=functional:scope=internal:level=high], "Message: parse mydumper metadata error: %s, metadata: %s"
ErrGetFileSize,[code=11002:class=functional:scope=internal:level=high], "Message: get file %s size"
ErrDropMultipleTables,[code=11003:class=functional:scope=internal:level=high], "Message: not allowed operation: drop multiple tables in one statement, Workaround: It is recommended to include only one DDL operation in a statement executed upstream. Please manually handle it using dmctl (skipping the DDL statement or replacing the DDL statement with a specified DDL statement). For details, see https://docs.pingcap.com/tidb-data-migration/stable/skip-or-replace-abnormal-sql-statements"
ErrRenameMultipleTables,[code=11004:class=functional:scope=internal:level=high], "Message: not allowed operation: rename multiple tables in one statement, Workaround: It is recommended to include only one DDL operation in a statement executed upstream. Please manually handle it using dmctl (skipping the DDL statement or replacing the DDL statement with a specified DDL statement). For details, see https://docs.pingcap.com/tidb-data-migration/stable/skip-or-replace-abnormal-sql-statements"
Expand Down
3 changes: 0 additions & 3 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"strings"
"time"

"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

Expand Down Expand Up @@ -242,8 +241,6 @@ type SyncerConfig struct {
EnableGTID bool `yaml:"enable-gtid" toml:"enable-gtid" json:"enable-gtid"`
DisableCausality bool `yaml:"disable-detect" toml:"disable-detect" json:"disable-detect"`
SafeMode bool `yaml:"safe-mode" toml:"safe-mode" json:"safe-mode"`
// when dump unit can't run consistent dump, enable safe mode until pass exit location of dumping
SafeModeExitLoc *binlog.Location `yaml:"-" toml:"-" json:"-"`
// deprecated, use `ansi-quotes` in top level config instead
EnableANSIQuotes bool `yaml:"enable-ansi-quotes" toml:"enable-ansi-quotes" json:"enable-ansi-quotes"`
}
Expand Down
7 changes: 6 additions & 1 deletion dm/master/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ func (s *Server) bootstrap(ctx context.Context) error {
}
}

err := upgrade.TryUpgrade(s.etcdClient)
uctx := upgrade.Context{
Context: ctx,
SubTaskConfigs: s.scheduler.GetSubTaskCfgs(),
}
err := upgrade.TryUpgrade(s.etcdClient, uctx)

if err != nil {
return err
}
Expand Down
20 changes: 20 additions & 0 deletions dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,26 @@ func (s *Scheduler) GetSubTaskCfgsByTask(task string) map[string]*config.SubTask
return cloneM
}

// GetSubTaskCfgs gets all subconfig, return nil when error happens
func (s *Scheduler) GetSubTaskCfgs() map[string]map[string]config.SubTaskConfig {
s.mu.RLock()
defer s.mu.RUnlock()
clone := make(map[string]map[string]config.SubTaskConfig, len(s.subTaskCfgs))
for task, m := range s.subTaskCfgs {
clone2 := make(map[string]config.SubTaskConfig, len(m))
for source, cfg := range m {
cfg2, err := cfg.Clone()
if err != nil {
return nil
}
clone2[source] = *cfg2
}
clone[task] = clone2
}

return clone
}

// AddWorker adds the information of the DM-worker when registering a new instance.
// This only adds the information of the DM-worker,
// in order to know whether it's online (ready to handle works),
Expand Down
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ workaround = ""
tags = ["not-set", "high"]

[error.DM-functional-11001]
message = "parse mydumper metadata error: %s"
message = "parse mydumper metadata error: %s, metadata: %s"
description = ""
workaround = ""
tags = ["internal", "high"]
Expand Down
7 changes: 6 additions & 1 deletion loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -1274,8 +1275,12 @@ func (l *Loader) getMydumpMetadata() error {
metafile := filepath.Join(l.cfg.LoaderConfig.Dir, "metadata")
loc, _, err := dumpling.ParseMetaData(metafile, l.cfg.Flavor)
if err != nil {
toPrint, err2 := ioutil.ReadFile(metafile)
if err2 != nil {
toPrint = []byte(err2.Error())
}
l.logCtx.L().Error("fail to parse dump metadata", log.ShortError(err))
return err
return terror.ErrParseMydumperMeta.Generate(err, toPrint)
}

l.metaBinlog.Set(loc.Position.String())
Expand Down
20 changes: 10 additions & 10 deletions pkg/dumpling/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ import (

"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/terror"
)

// ParseMetaData parses mydumper's output meta file and returns binlog location.
// since v2.0.0, dumpling maybe configured to output master status after connection pool is established,
// we return this location as well.
func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, error) {
invalidErr := fmt.Errorf("file %s invalid format", filename)
fd, err := os.Open(filename)
if err != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err)
return nil, nil, err
}
defer fd.Close()

Expand Down Expand Up @@ -89,7 +89,7 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location,
if err2 == io.EOF {
break
} else if err2 != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err2)
return nil, nil, err2
}
line = strings.TrimSpace(line)
if len(line) == 0 {
Expand All @@ -99,14 +99,14 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location,
switch line {
case "SHOW MASTER STATUS:":
if err3 := parsePosAndGTID(&pos, &gtidStr); err3 != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err3)
return nil, nil, err3
}
case "SHOW SLAVE STATUS:":
// ref: https://github.com/maxbube/mydumper/blob/master/mydumper.c#L434
for {
line, err3 := br.ReadString('\n')
if err3 != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err3)
return nil, nil, err3
}
line = strings.TrimSpace(line)
if len(line) == 0 {
Expand All @@ -116,20 +116,20 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location,
case "SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */":
useLocation2 = true
if err3 := parsePosAndGTID(&pos2, &gtidStr2); err3 != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err3)
return nil, nil, err3
}
default:
// do nothing for Started dump, Finished dump...
}
}

if len(pos.Name) == 0 || pos.Pos == uint32(0) {
return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename))
return nil, nil, invalidErr
}

gset, err := gtid.ParserGTID(flavor, gtidStr)
if err != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename))
return nil, nil, invalidErr
}
loc = &binlog.Location{
Position: pos,
Expand All @@ -138,11 +138,11 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location,

if useLocation2 {
if len(pos2.Name) == 0 || pos2.Pos == uint32(0) {
return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename))
return nil, nil, invalidErr
}
gset2, err := gtid.ParserGTID(flavor, gtidStr2)
if err != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename))
return nil, nil, invalidErr
}
loc2 = &binlog.Location{
Position: pos2,
Expand Down
2 changes: 1 addition & 1 deletion pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ var (
ErrDBExecuteFailed = New(codeDBExecuteFailed, ClassDatabase, ScopeNotSet, LevelHigh, "execute statement failed: %s", "")

// Functional error
ErrParseMydumperMeta = New(codeParseMydumperMeta, ClassFunctional, ScopeInternal, LevelHigh, "parse mydumper metadata error: %s", "")
ErrParseMydumperMeta = New(codeParseMydumperMeta, ClassFunctional, ScopeInternal, LevelHigh, "parse mydumper metadata error: %s, metadata: %s", "")
ErrGetFileSize = New(codeGetFileSize, ClassFunctional, ScopeInternal, LevelHigh, "get file %s size", "")
ErrDropMultipleTables = New(codeDropMultipleTables, ClassFunctional, ScopeInternal, LevelHigh, "not allowed operation: drop multiple tables in one statement", "It is recommended to include only one DDL operation in a statement executed upstream. Please manually handle it using dmctl (skipping the DDL statement or replacing the DDL statement with a specified DDL statement). For details, see https://docs.pingcap.com/tidb-data-migration/stable/skip-or-replace-abnormal-sql-statements")
ErrRenameMultipleTables = New(codeRenameMultipleTables, ClassFunctional, ScopeInternal, LevelHigh, "not allowed operation: rename multiple tables in one statement", "It is recommended to include only one DDL operation in a statement executed upstream. Please manually handle it using dmctl (skipping the DDL statement or replacing the DDL statement with a specified DDL statement). For details, see https://docs.pingcap.com/tidb-data-migration/stable/skip-or-replace-abnormal-sql-statements")
Expand Down
102 changes: 97 additions & 5 deletions pkg/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,48 @@

package upgrade

import "go.etcd.io/etcd/clientv3"
import (
"context"
"fmt"

"github.com/pingcap/tidb-tools/pkg/dbutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/cputil"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/utils"
)

var (
// upgrades records all functions used to upgrade from one version to the later version.
upgrades = []func(cli *clientv3.Client) error{
upgrades = []func(cli *clientv3.Client, uctx Context) error{
upgradeToVer1,
upgradeToVer2,
}
)

// Context is used to pass something to TryUpgrade
// NOTE that zero value of Context is nil, be aware of nil-dereference
type Context struct {
context.Context
SubTaskConfigs map[string]map[string]config.SubTaskConfig
}

// NewUpgradeContext creates a Context, avoid nil Context member
func NewUpgradeContext() Context {
return Context{
Context: context.Background(),
SubTaskConfigs: make(map[string]map[string]config.SubTaskConfig),
}
}

// TryUpgrade tries to upgrade the cluster from an older version to a new version.
// This methods should have no side effects even calling multiple times.
func TryUpgrade(cli *clientv3.Client) error {
func TryUpgrade(cli *clientv3.Client, uctx Context) error {
// 1. get previous version from etcd.
preVer, _, err := GetVersion(cli)
if err != nil {
Expand Down Expand Up @@ -52,7 +82,7 @@ func TryUpgrade(cli *clientv3.Client) error {

// 4. do upgrade operations.
for _, upgrade := range upgrades {
err = upgrade(cli)
err = upgrade(cli, uctx)
if err != nil {
return err
}
Expand All @@ -65,6 +95,68 @@ func TryUpgrade(cli *clientv3.Client) error {

// upgradeToVer1 does upgrade operations from Ver0 to Ver1.
// in fact, this do nothing now, and just for demonstration.
func upgradeToVer1(cli *clientv3.Client) error {
func upgradeToVer1(cli *clientv3.Client, uctx Context) error {
return nil
}

// upgradeToVer2 does upgrade operations from Ver1 to Ver2 (v2.0.0-rc.3) to upgrade syncer checkpoint schema
// TODO: determine v2.0.0-rc.3 or another version in above line
func upgradeToVer2(cli *clientv3.Client, uctx Context) error {
upgradeTaskName := "upgradeToVer2"
logger := log.L().WithFields(zap.String("task", upgradeTaskName))

if uctx.SubTaskConfigs == nil {
logger.Info("no downstream DB, skipping")
return nil
}

// tableName -> DBConfig
dbConfigs := map[string]config.DBConfig{}
for task, m := range uctx.SubTaskConfigs {
for sourceID, subCfg := range m {
tableName := dbutil.TableName(subCfg.MetaSchema, cputil.SyncerCheckpoint(subCfg.Name))
subCfg2, err := subCfg.DecryptPassword()
if err != nil {
log.L().Error("subconfig error when upgrading", zap.String("task", task),
zap.String("source id", sourceID), zap.String("subtask config", subCfg.String()), zap.Error(err))
return err
}
dbConfigs[tableName] = subCfg2.To
}
}

toClose := make([]*conn.BaseDB, 0, len(dbConfigs))
defer func() {
for _, db := range toClose {
db.Close()
}
}()
for tableName, cfg := range dbConfigs {
targetDB, err := conn.DefaultDBProvider.Apply(cfg)
if err != nil {
logger.Error("target DB error when upgrading", zap.String("table name", tableName))
return err
}
toClose = append(toClose, targetDB)
// try to add columns.
// NOTE: ignore already exists error to continue the process.
queries := []string{
fmt.Sprintf(`ALTER TABLE %s ADD COLUMN exit_safe_binlog_name VARCHAR(128) DEFAULT '' AFTER binlog_gtid`, tableName),
fmt.Sprintf(`ALTER TABLE %s ADD COLUMN exit_safe_binlog_pos INT UNSIGNED DEFAULT 0 AFTER exit_safe_binlog_name`, tableName),
fmt.Sprintf(`ALTER TABLE %s ADD COLUMN exit_safe_binlog_gtid TEXT AFTER exit_safe_binlog_pos`, tableName),
}
tctx := tcontext.NewContext(uctx.Context, logger)
dbConn, err := targetDB.GetBaseConn(tctx.Ctx)
if err != nil {
logger.Error("skip target DB when upgrading", zap.String("table name", tableName))
return err
}
_, err = dbConn.ExecuteSQLWithIgnoreError(tctx, nil, upgradeTaskName, utils.IgnoreErrorCheckpoint, queries)
if err != nil {
logger.Error("error while adding column for checkpoint table", zap.String("table name", tableName))
return err
}
}

return nil
}
12 changes: 6 additions & 6 deletions pkg/upgrade/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (t *testForEtcd) TestTryUpgrade(c *C) {
upgrades = oldUpgrades
}()
mockVerNo := uint64(0)
upgrades = []func(cli *clientv3.Client) error{
func(cli *clientv3.Client) error {
upgrades = []func(cli *clientv3.Client, uctx Context) error{
func(cli *clientv3.Client, uctx Context) error {
mockVerNo = currentInternalNo + 1
return nil
},
Expand All @@ -70,15 +70,15 @@ func (t *testForEtcd) TestTryUpgrade(c *C) {
c.Assert(ver.NotSet(), IsTrue)

// try to upgrade, but do nothing except the current version recorded.
c.Assert(TryUpgrade(etcdTestCli), IsNil)
c.Assert(TryUpgrade(etcdTestCli, NewUpgradeContext()), IsNil)
ver, rev2, err := GetVersion(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(rev2, Greater, rev1)
c.Assert(ver, DeepEquals, CurrentVersion)
c.Assert(mockVerNo, Equals, uint64(0))

// try to upgrade again, do nothing because the version is the same.
c.Assert(TryUpgrade(etcdTestCli), IsNil)
c.Assert(TryUpgrade(etcdTestCli, NewUpgradeContext()), IsNil)
ver, rev3, err := GetVersion(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(rev3, Equals, rev2)
Expand All @@ -94,7 +94,7 @@ func (t *testForEtcd) TestTryUpgrade(c *C) {
CurrentVersion = newerVer

// try to upgrade, to a newer version, upgrade operations applied.
c.Assert(TryUpgrade(etcdTestCli), IsNil)
c.Assert(TryUpgrade(etcdTestCli, NewUpgradeContext()), IsNil)
ver, rev4, err := GetVersion(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(rev4, Greater, rev3)
Expand All @@ -103,7 +103,7 @@ func (t *testForEtcd) TestTryUpgrade(c *C) {

// try to upgrade, to an older version, do nothing.
CurrentVersion = oldCurrentVer
c.Assert(TryUpgrade(etcdTestCli), IsNil)
c.Assert(TryUpgrade(etcdTestCli, NewUpgradeContext()), IsNil)
ver, rev5, err := GetVersion(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(rev5, Equals, rev4)
Expand Down
2 changes: 1 addition & 1 deletion pkg/upgrade/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
// The current internal version number of the DM cluster used when upgrading from an older version.
// NOTE: +1 when a new incompatible version is introduced, so it's different from the release version.
// NOTE: it's the version of the cluster (= the version of DM-master leader now), other component versions are not recorded yet.
currentInternalNo uint64 = 1
currentInternalNo uint64 = 2
// The minimum internal version number of the DM cluster used when importing from v1.0.x.
minInternalNo uint64 = 0
)
Expand Down
Loading

0 comments on commit 3db5dcc

Please sign in to comment.