Skip to content

Commit

Permalink
worker: use random value as server-id if server-id is not set (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored and csuzhangxc committed Oct 25, 2019
1 parent 0f1f30b commit eecbead
Show file tree
Hide file tree
Showing 36 changed files with 310 additions and 78 deletions.
4 changes: 4 additions & 0 deletions cmd/dm-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package main
import (
"flag"
"fmt"
"math/rand"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/pingcap/dm/dm/worker"
"github.com/pingcap/dm/pkg/log"
Expand All @@ -30,6 +32,8 @@ import (
)

func main() {
rand.Seed(time.Now().UnixNano())

cfg := worker.NewConfig()
err := cfg.Parse(os.Args[1:])
switch errors.Cause(err) {
Expand Down
2 changes: 1 addition & 1 deletion dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type SubTaskConfig struct {
IgnoreCheckingItems []string `toml:"ignore-checking-items" json:"ignore-checking-items"`
// it represents a MySQL/MariaDB instance or a replica group
SourceID string `toml:"source-id" json:"source-id"`
ServerID int `toml:"server-id" json:"server-id"`
ServerID uint32 `toml:"server-id" json:"server-id"`
Flavor string `toml:"flavor" json:"flavor"`
MetaSchema string `toml:"meta-schema" json:"meta-schema"`
RemoveMeta bool `toml:"remove-meta" json:"remove-meta"`
Expand Down
111 changes: 87 additions & 24 deletions dm/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ package worker
import (
"bytes"
"context"
"database/sql"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"math"
"math/rand"
"strings"
"time"

Expand All @@ -39,18 +42,23 @@ import (
)

const (
// flavorReadTimeout is readTimeout for DB connection in adjustFlavor
flavorReadTimeout = "30s"
// flavorGetTimeout is timeout for getting version info from DB
flavorGetTimeout = 30 * time.Second
// dbReadTimeout is readTimeout for DB connection in adjust
dbReadTimeout = "30s"
// dbGetTimeout is timeout for getting some information from DB
dbGetTimeout = 30 * time.Second

// the default base(min) server id generated by random
defaultBaseServerID = math.MaxUint32 / 10
)

// SampleConfigFile is sample config file of dm-worker
// later we can read it from dm/worker/dm-worker.toml
// and assign it to SampleConfigFile while we build dm-worker
var SampleConfigFile string

var applyNewBaseDB = conn.DefaultDBProvider.Apply
var (
getAllServerIDFunc = utils.GetAllServerID
)

// NewConfig creates a new base config for worker.
func NewConfig() *Config {
Expand Down Expand Up @@ -94,7 +102,7 @@ type Config struct {
AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"`
RelayDir string `toml:"relay-dir" json:"relay-dir"`
MetaDir string `toml:"meta-dir" json:"meta-dir"`
ServerID int `toml:"server-id" json:"server-id"`
ServerID uint32 `toml:"server-id" json:"server-id"`
Flavor string `toml:"flavor" json:"flavor"`
Charset string `toml:"charset" json:"charset"`

Expand Down Expand Up @@ -199,9 +207,7 @@ func (c *Config) Parse(arguments []string) error {
// assign tracer id to source id
c.Tracer.Source = c.SourceID

c.From.Adjust()
c.Checker.adjust()
err = c.adjustFlavor()
err = c.adjust()
if err != nil {
return err
}
Expand Down Expand Up @@ -256,8 +262,52 @@ func (c *Config) configFromFile(path string) error {
return c.verify()
}

func (c *Config) adjust() error {
c.From.Adjust()
c.Checker.adjust()

if c.Flavor == "" || c.ServerID == 0 {
fromDB, err := c.createFromDB()
if err != nil {
return err
}
defer fromDB.Close()

ctx, cancel := context.WithTimeout(context.Background(), dbGetTimeout)
defer cancel()

err = c.adjustFlavor(ctx, fromDB.DB)
if err != nil {
return err
}

err = c.adjustServerID(ctx, fromDB.DB)
if err != nil {
return err
}
}

return nil
}

func (c *Config) createFromDB() (*conn.BaseDB, error) {
// decrypt password
clone, err := c.DecryptPassword()
if err != nil {
return nil, err
}
from := clone.From
from.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(dbReadTimeout)
fromDB, err := conn.DefaultDBProvider.Apply(from)
if err != nil {
return nil, terror.WithScope(err, terror.ScopeUpstream)
}

return fromDB, nil
}

// adjustFlavor adjusts flavor through querying from given database
func (c *Config) adjustFlavor() error {
func (c *Config) adjustFlavor(ctx context.Context, db *sql.DB) (err error) {
if c.Flavor != "" {
switch c.Flavor {
case mysql.MariaDBFlavor, mysql.MySQLFlavor:
Expand All @@ -266,26 +316,39 @@ func (c *Config) adjustFlavor() error {
return terror.ErrNotSupportedFlavor.Generate(c.Flavor)
}
}
// decrypt password
clone, err := c.DecryptPassword()
if err != nil {
return err

c.Flavor, err = utils.GetFlavor(ctx, db)
if ctx.Err() != nil {
err = terror.Annotatef(err, "time cost to get flavor info exceeds %s", dbGetTimeout)
}
return terror.WithScope(err, terror.ScopeUpstream)
}

func (c *Config) adjustServerID(ctx context.Context, db *sql.DB) error {
if c.ServerID != 0 {
return nil
}

serverIDs, err := getAllServerIDFunc(ctx, db)
if ctx.Err() != nil {
err = terror.Annotatef(err, "time cost to get server-id info exceeds %s", dbGetTimeout)
}
from := clone.From
from.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(flavorReadTimeout)
fromDB, err := applyNewBaseDB(from)
if err != nil {
return terror.WithScope(err, terror.ScopeUpstream)
}
defer fromDB.Close()

ctx, cancel := context.WithTimeout(context.Background(), flavorGetTimeout)
defer cancel()
c.Flavor, err = utils.GetFlavor(ctx, fromDB.DB)
if ctx.Err() != nil {
err = terror.Annotatef(err, "time cost to get flavor info exceeds %s", flavorGetTimeout)
for i := 0; i < 5; i++ {
randomValue := uint32(rand.Intn(100000))
randomServerID := defaultBaseServerID + randomValue
if _, ok := serverIDs[randomServerID]; ok {
continue
}

c.ServerID = randomServerID
return nil
}
return terror.WithScope(err, terror.ScopeUpstream)

return terror.ErrInvalidServerID.Generatef("can't find a random available server ID")
}

// UpdateConfigFile write configure to local file
Expand Down
51 changes: 35 additions & 16 deletions dm/worker/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,26 @@
package worker

import (
"context"
"database/sql"
"fmt"
"io/ioutil"
"path"
"strings"

"github.com/DATA-DOG/go-sqlmock"
sqlmock "github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/conn"
)

func (t *testServer) TestConfig(c *C) {
cfg := NewConfig()

c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml", "-relay-dir=./xx"}), IsNil)
c.Assert(cfg.RelayDir, Equals, "./xx")
c.Assert(cfg.ServerID, Equals, 101)
c.Assert(cfg.ServerID, Equals, uint32(101))

dir := c.MkDir()
cfg.ConfigFile = path.Join(dir, "dm-worker.toml")
Expand All @@ -41,7 +42,7 @@ func (t *testServer) TestConfig(c *C) {
clone1 := cfg.Clone()
c.Assert(cfg, DeepEquals, clone1)
clone1.ServerID = 100
c.Assert(cfg.ServerID, Equals, 101)
c.Assert(cfg.ServerID, Equals, uint32(101))

// test format
c.Assert(cfg.String(), Matches, `.*"server-id":101.*`)
Expand All @@ -55,10 +56,10 @@ func (t *testServer) TestConfig(c *C) {
// test update config file and reload
c.Assert(cfg.UpdateConfigFile(tomlStr), IsNil)
c.Assert(cfg.Reload(), IsNil)
c.Assert(cfg.ServerID, Equals, 100)
c.Assert(cfg.ServerID, Equals, uint32(100))
c.Assert(cfg.UpdateConfigFile(originCfgStr), IsNil)
c.Assert(cfg.Reload(), IsNil)
c.Assert(cfg.ServerID, Equals, 101)
c.Assert(cfg.ServerID, Equals, uint32(101))

// test decrypt password
clone1.From.Password = "1234"
Expand Down Expand Up @@ -170,10 +171,8 @@ func subtestFlavor(c *C, cfg *Config, sqlInfo, expectedFlavor, expectedError str
WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", sqlInfo))
mock.ExpectClose()
applyNewBaseDB = func(config config.DBConfig) (*conn.BaseDB, error) {
return &conn.BaseDB{DB: db}, nil
}
err = cfg.adjustFlavor()

err = cfg.adjustFlavor(context.Background(), db)
if expectedError == "" {
c.Assert(err, IsNil)
c.Assert(cfg.Flavor, Equals, expectedFlavor)
Expand All @@ -187,18 +186,38 @@ func (t *testServer) TestAdjustFlavor(c *C) {
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml", "-relay-dir=./xx"}), IsNil)

cfg.Flavor = "mariadb"
err := cfg.adjustFlavor()
err := cfg.adjustFlavor(context.Background(), nil)
c.Assert(err, IsNil)
c.Assert(cfg.Flavor, Equals, mysql.MariaDBFlavor)
cfg.Flavor = "MongoDB"
err = cfg.adjustFlavor()
err = cfg.adjustFlavor(context.Background(), nil)
c.Assert(err, ErrorMatches, ".*flavor MongoDB not supported")

var origApplyNewBaseDB = applyNewBaseDB
subtestFlavor(c, cfg, "10.4.8-MariaDB-1:10.4.8+maria~bionic", mysql.MariaDBFlavor, "")
subtestFlavor(c, cfg, "5.7.26-log", mysql.MySQLFlavor, "")
}

func (t *testServer) TestAdjustServerID(c *C) {
var originGetAllServerIDFunc = getAllServerIDFunc
defer func() {
applyNewBaseDB = origApplyNewBaseDB
getAllServerIDFunc = originGetAllServerIDFunc
}()
getAllServerIDFunc = getMockServerIDs

subtestFlavor(c, cfg, "10.4.8-MariaDB-1:10.4.8+maria~bionic", mysql.MariaDBFlavor, "")
subtestFlavor(c, cfg, "5.7.26-log", mysql.MySQLFlavor, "")
cfg := NewConfig()
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml", "-relay-dir=./xx"}), IsNil)

cfg.adjustServerID(context.Background(), nil)
c.Assert(cfg.ServerID, Equals, uint32(101))

cfg.ServerID = 0
cfg.adjustServerID(context.Background(), nil)
c.Assert(cfg.ServerID, Not(Equals), 0)
}

func getMockServerIDs(ctx context.Context, db *sql.DB) (map[uint32]struct{}, error) {
return map[uint32]struct{}{
1: {},
2: {},
}, nil
}
Loading

0 comments on commit eecbead

Please sign in to comment.