Skip to content

Commit

Permalink
dm-master: embed etcd in DM-master (pingcap#344)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Nov 12, 2019
1 parent 333b750 commit 41275db
Show file tree
Hide file tree
Showing 14 changed files with 562 additions and 336 deletions.
4 changes: 4 additions & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,10 @@ ErrMasterOperRespNotSuccess,[code=38032:class=dm-master:scope=internal:level=hig
ErrMasterOperRequestTimeout,[code=38033:class=dm-master:scope=internal:level=high],"request is timeout, but request may be successful, please execute `query-status` to check status"
ErrMasterHandleHTTPApis,[code=38034:class=dm-master:scope=internal:level=high],"serve http apis to grpc"
ErrMasterHostPortNotValid,[code=38035:class=dm-master:scope=internal:level=high],"host:port '%s' not valid"
ErrMasterGetHostnameFail,[code=38036:class=dm-master:scope=internal:level=high],"get hostname fail"
ErrMasterGenEmbedEtcdConfigFail,[code=38037:class=dm-master:scope=internal:level=high],"generate config item %s for embed etcd fail"
ErrMasterStartEmbedEtcdFail,[code=38038:class=dm-master:scope=internal:level=high],"start embed etcd fail"
ErrMasterParseURLFail,[code=38039:class=dm-master:scope=internal:level=high],"parse URL %s fail"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium],"parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium],"'%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium],"toml decode file"
Expand Down
30 changes: 18 additions & 12 deletions cmd/dm-master/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"flag"
"fmt"
"os"
Expand All @@ -30,6 +31,7 @@ import (
)

func main() {
// 1. parse config
cfg := master.NewConfig()
err := cfg.Parse(os.Args[1:])
switch errors.Cause(err) {
Expand All @@ -41,6 +43,7 @@ func main() {
os.Exit(2)
}

// 2. init logger
err = log.InitLogger(&log.Config{
File: cfg.LogFile,
Level: strings.ToLower(cfg.LogLevel),
Expand All @@ -50,39 +53,42 @@ func main() {
os.Exit(2)
}

// 3. print process version information
utils.PrintInfo("dm-master", func() {
log.L().Info("", zap.Stringer("dm-master config", cfg))
})

// 4. start the server
ctx, cancel := context.WithCancel(context.Background())
server := master.NewServer(cfg)
err = server.Start(ctx)
if err != nil {
log.L().Error("fail to start dm-master", zap.Error(err))
os.Exit(2)
}

// 5. wait for stopping the process
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

server := master.NewServer(cfg)

go func() {
sig := <-sc
log.L().Info("got signal to exit", zap.Stringer("signal", sig))
server.Close()
cancel()
}()
<-ctx.Done()

err = server.Start()
if err != nil {
log.L().Error("fail to start dm-master", zap.Error(err))
}
// 6. close the server
server.Close()

log.L().Info("dm-master exit")

// 7. flush log
syncErr := log.L().Sync()
if syncErr != nil {
fmt.Fprintln(os.Stderr, "sync log failed", syncErr)
}

if err != nil || syncErr != nil {
os.Exit(1)
}
}
131 changes: 125 additions & 6 deletions dm/master/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,33 @@ import (
"flag"
"fmt"
"io/ioutil"
"net"
"net/url"
"os"
"strings"
"time"

"github.com/BurntSushi/toml"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

"github.com/BurntSushi/toml"
"go.uber.org/zap"
const (
defaultRPCTimeout = "30s"
defaultNamePrefix = "dm-master"
defaultDataDirPrefix = "default"
defaultPeerUrls = "http://127.0.0.1:8291"
)

// SampleConfigFile is sample config file of dm-master
// later we can read it from dm/master/dm-master.toml
// and assign it to SampleConfigFile while we build dm-master
var SampleConfigFile string

var defaultRPCTimeout = "30s"

// NewConfig creates a config for dm-master
func NewConfig() *Config {
cfg := &Config{}
Expand All @@ -51,6 +60,12 @@ func NewConfig() *Config {
fs.StringVar(&cfg.LogFile, "log-file", "", "log file path")
//fs.StringVar(&cfg.LogRotate, "log-rotate", "day", "log file rotate type, hour/day")

fs.StringVar(&cfg.Name, "name", "", "human-readable name for this DM-master member")
fs.StringVar(&cfg.DataDir, "data-dir", "", `path to the data directory (default "default.${name}")`)
fs.StringVar(&cfg.InitialCluster, "initial-cluster", "", fmt.Sprintf("initial cluster configuration for bootstrapping, e,g. dm-master=%s", defaultPeerUrls))
fs.StringVar(&cfg.PeerUrls, "peer-urls", defaultPeerUrls, "URLs for peer traffic")
fs.StringVar(&cfg.AdvertisePeerUrls, "advertise-peer-urls", "", `advertise URLs for peer traffic (default "${peer-urls}")`)

return cfg
}

Expand Down Expand Up @@ -90,6 +105,15 @@ type Config struct {

ConfigFile string `json:"config-file"`

// etcd relative config items
// NOTE: we use `MasterAddr` to generate `ClientUrls` and `AdvertiseClientUrls`
// NOTE: more items will be add when adding leader election
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
PeerUrls string `toml:"peer-urls" json:"peer-urls"`
AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"`
InitialCluster string `toml:"initial-cluster" json:"initial-cluster"`

printVersion bool
printSampleConfig bool
}
Expand Down Expand Up @@ -169,9 +193,15 @@ func (c *Config) configFromFile(path string) error {

// adjust adjusts configs
func (c *Config) adjust() error {
// MasterAddr's format may be "host:port" or ":port"
_, _, err := net.SplitHostPort(c.MasterAddr)
if err != nil {
return terror.ErrMasterHostPortNotValid.Delegate(err, c.MasterAddr)
}

c.DeployMap = make(map[string]string)
for _, item := range c.Deploy {
if err := item.Verify(); err != nil {
if err = item.Verify(); err != nil {
return err
}

Expand Down Expand Up @@ -202,7 +232,37 @@ func (c *Config) adjust() error {
c.RPCRateBurst = DefaultBurst
}

return nil
if c.Name == "" {
var hostname string
hostname, err = os.Hostname()
if err != nil {
return terror.ErrMasterGetHostnameFail.Delegate(err)
}
c.Name = fmt.Sprintf("%s-%s", defaultNamePrefix, hostname)
}

if c.DataDir == "" {
c.DataDir = fmt.Sprintf("%s.%s", defaultDataDirPrefix, c.Name)
}

if c.PeerUrls == "" {
c.PeerUrls = defaultPeerUrls
}

if c.AdvertisePeerUrls == "" {
c.AdvertisePeerUrls = defaultPeerUrls
}

if c.InitialCluster == "" {
items := strings.Split(c.AdvertisePeerUrls, ",")
for i, item := range items {
items[i] = fmt.Sprintf("%s=%s", c.Name, item)
}
c.InitialCluster = strings.Join(items, ",")
}

_, err = c.genEmbedEtcdConfig() // verify embed etcd config
return err
}

// UpdateConfigFile write config to local file
Expand All @@ -228,3 +288,62 @@ func (c *Config) Reload() error {

return c.adjust()
}

// genEmbedEtcdConfig generates the configuration needed by embed etcd.
func (c *Config) genEmbedEtcdConfig() (*embed.Config, error) {
cfg := embed.NewConfig()
cfg.Name = c.Name
cfg.Dir = c.DataDir

// reuse the previous master-addr as the client listening URL.
cURL, err := parseURLs(c.MasterAddr)
if err != nil {
return nil, terror.ErrMasterGenEmbedEtcdConfigFail.Delegate(err, "invalid master-addr")
}
cfg.LCUrls = cURL
cfg.ACUrls = cURL

cfg.LPUrls, err = parseURLs(c.PeerUrls)
if err != nil {
return nil, terror.ErrMasterGenEmbedEtcdConfigFail.Delegate(err, "invalid peer-urls")
}

cfg.APUrls, err = parseURLs(c.AdvertisePeerUrls)
if err != nil {
return nil, terror.ErrMasterGenEmbedEtcdConfigFail.Delegate(err, "invalid advertise-peer-urls")
}

cfg.InitialCluster = c.InitialCluster

return cfg, nil
}

// parseURLs parse a string into multiple urls.
// if the URL in the string without protocol scheme, use `http` as the default.
// if no IP exists in the address, `0.0.0.0` is used.
func parseURLs(s string) ([]url.URL, error) {
if s == "" {
return nil, nil
}

items := strings.Split(s, ",")
urls := make([]url.URL, 0, len(items))
for _, item := range items {
u, err := url.Parse(item)
// tolerate valid `master-addr`, but invalid URL format, like:
// `:8261`: missing protocol scheme
// `127.0.0.1:8261`: first path segment in URL cannot contain colon
if err != nil && (strings.Contains(err.Error(), "missing protocol scheme") ||
strings.Contains(err.Error(), "first path segment in URL cannot contain colon")) {
u, err = url.Parse("http://" + item)
}
if err != nil {
return nil, terror.ErrMasterParseURLFail.Delegate(err, item)
}
if strings.Index(u.Host, ":") == 0 {
u.Host = "0.0.0.0" + u.Host
}
urls = append(urls, *u)
}
return urls, nil
}
Loading

0 comments on commit 41275db

Please sign in to comment.