Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Refactor worker for high available DM #424

Merged
merged 26 commits into from
Dec 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
66d7520
add interface for create worker
Little-Wallace Dec 11, 2019
4a33af9
add interface for mysql task
Little-Wallace Dec 12, 2019
c0a6081
refactor config
Little-Wallace Dec 12, 2019
4f49772
refactor worker
Little-Wallace Dec 13, 2019
562a8e6
Merge branch 'ha-dev' of github.com:pingcap/dm into my-ha-dev
Little-Wallace Dec 13, 2019
c77ae5d
fix test
Little-Wallace Dec 13, 2019
d3a7b30
fix ut config
Little-Wallace Dec 16, 2019
c1bbe85
fix fmt
Little-Wallace Dec 16, 2019
dbf2b1a
fix master api
Little-Wallace Dec 16, 2019
f035b9a
pass ctl
Little-Wallace Dec 16, 2019
3c8ab65
fix worker config init
Little-Wallace Dec 16, 2019
b581be3
fix worker test
Little-Wallace Dec 17, 2019
4cd141c
Merge branch 'ha-dev' of github.com:pingcap/dm into my-ha-dev
Little-Wallace Dec 17, 2019
74d35e9
refactor master for create mysql task
Little-Wallace Dec 17, 2019
f8cc19e
fix worker stop
Little-Wallace Dec 17, 2019
f978732
try when keepalive fail
Little-Wallace Dec 17, 2019
a6dded5
fix close bug
Little-Wallace Dec 18, 2019
f483837
HA worker
Little-Wallace Dec 18, 2019
94de741
Merge branch 'ha-dev' of github.com:pingcap/dm into my-ha-dev
Little-Wallace Dec 18, 2019
0048974
fix error format
Little-Wallace Dec 18, 2019
6438c5a
refactor start task
Little-Wallace Dec 18, 2019
27c213b
refact Task field name
Little-Wallace Dec 18, 2019
3b23198
remove useless refresh API
Little-Wallace Dec 19, 2019
0b47f9c
refactor worker
Little-Wallace Dec 20, 2019
0b88ae9
rename workerconfig to mysqlconfig
Little-Wallace Dec 20, 2019
f3d2521
fix client
Little-Wallace Dec 20, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,9 @@ ErrWorkerGetVersionFromKV,[code=40045:class=dm-worker:scope=internal:level=high]
ErrWorkerSaveVersionToKV,[code=40046:class=dm-worker:scope=internal:level=high],"save version %v into levelDB with key %v"
ErrWorkerVerAutoDowngrade,[code=40047:class=dm-worker:scope=internal:level=high],"the previous version %s is newer than current %s, automatic downgrade is not supported now, please handle it manually"
ErrWorkerStartService,[code=40048:class=dm-worker:scope=internal:level=high],"start server"
ErrWorkerNoStart,[code=40070:class=dm-worker:scope=internal:level=high],"worker has not started"
ErrWorkerAlreadyClosed,[code=40049:class=dm-worker:scope=internal:level=high],"worker already closed"
ErrWorkerAlreadyStart,[code=40071:class=dm-worker:scope=internal:level=high],"worker already started"
ErrWorkerNotRunningStage,[code=40050:class=dm-worker:scope=internal:level=high],"current stage is not running not valid"
ErrWorkerNotPausedStage,[code=40051:class=dm-worker:scope=internal:level=high],"current stage is not paused not valid"
ErrWorkerUpdateTaskStage,[code=40052:class=dm-worker:scope=internal:level=high],"can only update task on Paused stage, but current stage is %s"
Expand Down
62 changes: 62 additions & 0 deletions dm/config/checker_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package config

import (
"encoding/json"
"time"
)

// Backoff related constants
var (
DefaultCheckInterval = 5 * time.Second
DefaultBackoffRollback = 5 * time.Minute
DefaultBackoffMin = 1 * time.Second
DefaultBackoffMax = 5 * time.Minute
DefaultBackoffJitter = true
DefaultBackoffFactor float64 = 2
)

// Duration is used to hold a time.Duration field
type Duration struct {
time.Duration
}

// MarshalText hacks to satisfy the encoding.TextMarshaler interface
func (d Duration) MarshalText() ([]byte, error) {
return []byte(d.Duration.String()), nil
}

// UnmarshalText hacks to satisfy the encoding.TextUnmarshaler interface
func (d Duration) UnmarshalText(text []byte) error {
var err error
d.Duration, err = time.ParseDuration(string(text))
return err
}

// MarshalJSON hacks to satisfy the json.Marshaler interface
func (d *Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(&struct {
Duration string `json:"Duration"`
}{
d.Duration.String(),
})
}

// CheckerConfig is configuration used for TaskStatusChecker
type CheckerConfig struct {
CheckEnable bool `toml:"check-enable" json:"check-enable"`
BackoffRollback Duration `toml:"backoff-rollback" json:"backoff-rollback"`
BackoffMax Duration `toml:"backoff-max" json:"backoff-max"`
// unexpose config
CheckInterval Duration `json:"-"`
BackoffMin Duration `json:"-"`
BackoffJitter bool `json:"-"`
BackoffFactor float64 `json:"-"`
}

// Adjust sets default value for field: CheckInterval/BackoffMin/BackoffJitter/BackoffFactor
func (cc *CheckerConfig) Adjust() {
cc.CheckInterval = Duration{Duration: DefaultCheckInterval}
cc.BackoffMin = Duration{Duration: DefaultBackoffMin}
cc.BackoffJitter = DefaultBackoffJitter
cc.BackoffFactor = DefaultBackoffFactor
}
280 changes: 280 additions & 0 deletions dm/config/mysql_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
package config

import (
"bytes"
"context"
"database/sql"
"encoding/json"
"github.com/BurntSushi/toml"
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/tracing"
"github.com/pingcap/dm/pkg/utils"
"github.com/siddontang/go-mysql/mysql"
"math"
"math/rand"
"strings"
"time"
)

const (
// dbReadTimeout is readTimeout for DB connection in adjust
dbReadTimeout = "30s"
// dbGetTimeout is timeout for getting some information from DB
dbGetTimeout = 30 * time.Second

// the default base(min) server id generated by random
defaultBaseServerID = math.MaxUint32 / 10
)

// PurgeConfig is the configuration for Purger
type PurgeConfig struct {
Interval int64 `toml:"interval" json:"interval"` // check whether need to purge at this @Interval (seconds)
Expires int64 `toml:"expires" json:"expires"` // if file's modified time is older than @Expires (hours), then it can be purged
RemainSpace int64 `toml:"remain-space" json:"remain-space"` // if remain space in @RelayBaseDir less than @RemainSpace (GB), then it can be purged
}

// MysqlConfig is the configuration for Worker
type MysqlConfig struct {
EnableGTID bool `toml:"enable-gtid" json:"enable-gtid"`
AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"`
RelayDir string `toml:"relay-dir" json:"relay-dir"`
MetaDir string `toml:"meta-dir" json:"meta-dir"`
Flavor string `toml:"flavor" json:"flavor"`
Charset string `toml:"charset" json:"charset"`

// relay synchronous starting point (if specified)
RelayBinLogName string `toml:"relay-binlog-name" json:"relay-binlog-name"`
RelayBinlogGTID string `toml:"relay-binlog-gtid" json:"relay-binlog-gtid"`

SourceID string `toml:"source-id" json:"source-id"`
From DBConfig `toml:"from" json:"from"`

// config items for purger
Purge PurgeConfig `toml:"purge" json:"purge"`

// config items for task status checker
Checker CheckerConfig `toml:"checker" json:"checker"`

// config items for tracer
Tracer tracing.Config `toml:"tracer" json:"tracer"`

// id of the worker on which this task run
ServerID uint32 `toml:"server-id" json:"server-id"`
}

// NewWorkerConfig creates a new base config for worker.
func NewWorkerConfig() *MysqlConfig {
c := &MysqlConfig{
RelayDir: "relay-dir",
Purge: PurgeConfig{
Interval: 60 * 60,
Expires: 0,
RemainSpace: 15,
},
Checker: CheckerConfig{
CheckEnable: true,
BackoffRollback: Duration{DefaultBackoffRollback},
BackoffMax: Duration{DefaultBackoffMax},
},
Tracer: tracing.Config{
Enable: false,
TracerAddr: "",
BatchSize: 20,
Checksum: false,
},
}
c.adjust()
return c
}

// Clone clones a config
func (c *MysqlConfig) Clone() *MysqlConfig {
clone := &MysqlConfig{}
*clone = *c
return clone
}

// Toml returns TOML format representation of config
func (c *MysqlConfig) Toml() (string, error) {
var b bytes.Buffer

err := toml.NewEncoder(&b).Encode(c)
if err != nil {
log.L().Error("fail to marshal config to toml", log.ShortError(err))
}

return b.String(), nil
}

// Parse parses flag definitions from the argument list.
func (c *MysqlConfig) Parse(content string) error {
// Parse first to get config file.
metaData, err := toml.Decode(content, c)
return c.check(&metaData, err)
}

func (c *MysqlConfig) String() string {
cfg, err := json.Marshal(c)
if err != nil {
log.L().Error("fail to marshal config to json", log.ShortError(err))
}
return string(cfg)
}

func (c *MysqlConfig) adjust() {
c.From.Adjust()
c.Checker.Adjust()
}

// Verify verifies the config
func (c *MysqlConfig) Verify() error {
if len(c.SourceID) == 0 {
return terror.ErrWorkerNeedSourceID.Generate()
}
if len(c.SourceID) > MaxSourceIDLength {
return terror.ErrWorkerTooLongSourceID.Generate(c.SourceID, MaxSourceIDLength)
}

var err error
if len(c.RelayBinLogName) > 0 {
if !binlog.VerifyFilename(c.RelayBinLogName) {
return terror.ErrWorkerRelayBinlogName.Generate(c.RelayBinLogName)
}
}
if len(c.RelayBinlogGTID) > 0 {
_, err = gtid.ParserGTID(c.Flavor, c.RelayBinlogGTID)
if err != nil {
return terror.WithClass(terror.Annotatef(err, "relay-binlog-gtid %s", c.RelayBinlogGTID), terror.ClassDMWorker)
}
}

_, err = c.DecryptPassword()
if err != nil {
return err
}

return nil
}

// DecryptPassword returns a decrypted config replica in config
func (c *MysqlConfig) DecryptPassword() (*MysqlConfig, error) {
clone := c.Clone()
var (
pswdFrom string
err error
)
if len(clone.From.Password) > 0 {
pswdFrom, err = utils.Decrypt(clone.From.Password)
if err != nil {
return nil, terror.WithClass(err, terror.ClassDMWorker)
}
}
clone.From.Password = pswdFrom
return clone, nil
}

// GenerateDBConfig creates DBConfig for DB
func (c *MysqlConfig) GenerateDBConfig() (*DBConfig, error) {
// decrypt password
clone, err := c.DecryptPassword()
if err != nil {
return nil, err
}
from := &clone.From
from.RawDBCfg = DefaultRawDBConfig().SetReadTimeout(dbReadTimeout)
return from, nil
}

// Adjust flavor and serverid of MysqlConfig
func (c *MysqlConfig) Adjust(db *sql.DB) (err error) {
c.From.Adjust()
c.Checker.Adjust()

if c.Flavor == "" || c.ServerID == 0 {
ctx, cancel := context.WithTimeout(context.Background(), dbGetTimeout)
defer cancel()

err = c.AdjustFlavor(ctx, db)
if err != nil {
return err
}

err = c.AdjustServerID(ctx, db)
if err != nil {
return err
}
}

return nil
}

// AdjustFlavor adjust Flavor from DB
func (c *MysqlConfig) AdjustFlavor(ctx context.Context, db *sql.DB) (err error) {
if c.Flavor != "" {
switch c.Flavor {
case mysql.MariaDBFlavor, mysql.MySQLFlavor:
return nil
default:
return terror.ErrNotSupportedFlavor.Generate(c.Flavor)
}
}

c.Flavor, err = utils.GetFlavor(ctx, db)
if ctx.Err() != nil {
err = terror.Annotatef(err, "time cost to get flavor info exceeds %s", dbGetTimeout)
}
return terror.WithScope(err, terror.ScopeUpstream)
}

// AdjustServerID adjust server id from DB
func (c *MysqlConfig) AdjustServerID(ctx context.Context, db *sql.DB) error {
if c.ServerID != 0 {
return nil
}

serverIDs, err := utils.GetAllServerID(ctx, db)
if ctx.Err() != nil {
err = terror.Annotatef(err, "time cost to get server-id info exceeds %s", dbGetTimeout)
}
if err != nil {
return terror.WithScope(err, terror.ScopeUpstream)
}

for i := 0; i < 5; i++ {
randomValue := uint32(rand.Intn(100000))
randomServerID := defaultBaseServerID + randomValue
if _, ok := serverIDs[randomServerID]; ok {
continue
}

c.ServerID = randomServerID
return nil
}

return terror.ErrInvalidServerID.Generatef("can't find a random available server ID")
}

// LoadFromFile loads config from file.
func (c *MysqlConfig) LoadFromFile(path string) error {
metaData, err := toml.DecodeFile(path, c)
return c.check(&metaData, err)
}

func (c *MysqlConfig) check(metaData *toml.MetaData, err error) error {
if err != nil {
return terror.ErrWorkerDecodeConfigFromFile.Delegate(err)
}
undecoded := metaData.Undecoded()
if len(undecoded) > 0 && err == nil {
var undecodedItems []string
for _, item := range undecoded {
undecodedItems = append(undecodedItems, item.String())
}
return terror.ErrWorkerUndecodedItemFromFile.Generate(strings.Join(undecodedItems, ","))
}
c.adjust()
return nil
}
2 changes: 1 addition & 1 deletion dm/ctl/common/operate_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ func OperateRelay(op pb.RelayOp, workers []string) (*pb.OperateWorkerRelayRespon
cli := MasterClient()
return cli.OperateWorkerRelayTask(ctx, &pb.OperateWorkerRelayRequest{
Op: op,
Workers: workers,
Sources: workers,
})
}
2 changes: 1 addition & 1 deletion dm/ctl/common/operate_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ func OperateTask(op pb.TaskOp, name string, workers []string) (*pb.OperateTaskRe
return cli.OperateTask(ctx, &pb.OperateTaskRequest{
Op: op,
Name: name,
Workers: workers,
Sources: workers,
})
}
2 changes: 1 addition & 1 deletion dm/ctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func NewRootCmd() *cobra.Command {
master.NewUpdateTaskCmd(),
master.NewQueryStatusCmd(),
master.NewQueryErrorCmd(),
master.NewRefreshWorkerTasks(),
master.NewSQLReplaceCmd(),
master.NewSQLSkipCmd(),
master.NewSQLInjectCmd(),
Expand All @@ -76,6 +75,7 @@ func NewRootCmd() *cobra.Command {
master.NewUpdateRelayCmd(),
master.NewPurgeRelayCmd(),
master.NewMigrateRelayCmd(),
master.NewOperateMysqlWorkerCmd(),
)
return cmd
}
Expand Down
2 changes: 1 addition & 1 deletion dm/ctl/master/break_ddl_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func breakDDLLockFunc(cmd *cobra.Command, _ []string) {
defer cancel()
cli := common.MasterClient()
resp, err := cli.BreakWorkerDDLLock(ctx, &pb.BreakWorkerDDLLockRequest{
Workers: workers,
Sources: workers,
Task: taskName,
RemoveLockID: removeLockID,
ExecDDL: exec,
Expand Down
Loading