Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

dm-master: embed etcd in DM-master #344

Merged
merged 16 commits into from
Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,10 @@ ErrMasterOperRespNotSuccess,[code=38032:class=dm-master:scope=internal:level=hig
ErrMasterOperRequestTimeout,[code=38033:class=dm-master:scope=internal:level=high],"request is timeout, but request may be successful, please execute `query-status` to check status"
ErrMasterHandleHTTPApis,[code=38034:class=dm-master:scope=internal:level=high],"serve http apis to grpc"
ErrMasterHostPortNotValid,[code=38035:class=dm-master:scope=internal:level=high],"host:port '%s' not valid"
ErrMasterGetHostnameFail,[code=38036:class=dm-master:scope=internal:level=high],"get hostname fail"
ErrMasterGenEmbedEtcdConfigFail,[code=38037:class=dm-master:scope=internal:level=high],"generate config item %s for embed etcd fail"
ErrMasterStartEmbedEtcdFail,[code=38038:class=dm-master:scope=internal:level=high],"start embed etcd fail"
ErrMasterParseURLFail,[code=38039:class=dm-master:scope=internal:level=high],"parse URL %s fail"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium],"parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium],"'%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium],"toml decode file"
Expand Down
30 changes: 18 additions & 12 deletions cmd/dm-master/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"flag"
"fmt"
"os"
Expand All @@ -30,6 +31,7 @@ import (
)

func main() {
// 1. parse config
cfg := master.NewConfig()
err := cfg.Parse(os.Args[1:])
switch errors.Cause(err) {
Expand All @@ -41,6 +43,7 @@ func main() {
os.Exit(2)
}

// 2. init logger
err = log.InitLogger(&log.Config{
File: cfg.LogFile,
Level: strings.ToLower(cfg.LogLevel),
Expand All @@ -50,39 +53,42 @@ func main() {
os.Exit(2)
}

// 3. print process version information
utils.PrintInfo("dm-master", func() {
log.L().Info("", zap.Stringer("dm-master config", cfg))
})

// 4. start the server
ctx, cancel := context.WithCancel(context.Background())
server := master.NewServer(cfg)
err = server.Start(ctx)
if err != nil {
log.L().Error("fail to start dm-master", zap.Error(err))
os.Exit(2)
}

// 5. wait for stopping the process
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

server := master.NewServer(cfg)

go func() {
sig := <-sc
log.L().Info("got signal to exit", zap.Stringer("signal", sig))
server.Close()
cancel()
}()
<-ctx.Done()

err = server.Start()
if err != nil {
log.L().Error("fail to start dm-master", zap.Error(err))
}
// 6. close the server
server.Close()

log.L().Info("dm-master exit")

// 7. flush log
syncErr := log.L().Sync()
if syncErr != nil {
fmt.Fprintln(os.Stderr, "sync log failed", syncErr)
}

if err != nil || syncErr != nil {
os.Exit(1)
}
}
131 changes: 125 additions & 6 deletions dm/master/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,33 @@ import (
"flag"
"fmt"
"io/ioutil"
"net"
"net/url"
"os"
"strings"
"time"

"github.com/BurntSushi/toml"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

"github.com/BurntSushi/toml"
"go.uber.org/zap"
const (
defaultRPCTimeout = "30s"
defaultNamePrefix = "dm-master"
defaultDataDirPrefix = "default"
defaultPeerUrls = "http://127.0.0.1:8291"
)

// SampleConfigFile is sample config file of dm-master
// later we can read it from dm/master/dm-master.toml
// and assign it to SampleConfigFile while we build dm-master
var SampleConfigFile string

var defaultRPCTimeout = "30s"

// NewConfig creates a config for dm-master
func NewConfig() *Config {
cfg := &Config{}
Expand All @@ -51,6 +60,12 @@ func NewConfig() *Config {
fs.StringVar(&cfg.LogFile, "log-file", "", "log file path")
//fs.StringVar(&cfg.LogRotate, "log-rotate", "day", "log file rotate type, hour/day")

fs.StringVar(&cfg.Name, "name", "", "human-readable name for this DM-master member")
fs.StringVar(&cfg.DataDir, "data-dir", "", `path to the data directory (default "default.${name}")`)
fs.StringVar(&cfg.InitialCluster, "initial-cluster", "", fmt.Sprintf("initial cluster configuration for bootstrapping, e,g. dm-master=%s", defaultPeerUrls))
fs.StringVar(&cfg.PeerUrls, "peer-urls", defaultPeerUrls, "URLs for peer traffic")
fs.StringVar(&cfg.AdvertisePeerUrls, "advertise-peer-urls", "", `advertise URLs for peer traffic (default "${peer-urls}")`)

return cfg
}

Expand Down Expand Up @@ -90,6 +105,15 @@ type Config struct {

ConfigFile string `json:"config-file"`

// etcd relative config items
// NOTE: we use `MasterAddr` to generate `ClientUrls` and `AdvertiseClientUrls`
// NOTE: more items will be add when adding leader election
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
PeerUrls string `toml:"peer-urls" json:"peer-urls"`
AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"`
InitialCluster string `toml:"initial-cluster" json:"initial-cluster"`

printVersion bool
printSampleConfig bool
}
Expand Down Expand Up @@ -169,9 +193,15 @@ func (c *Config) configFromFile(path string) error {

// adjust adjusts configs
func (c *Config) adjust() error {
// MasterAddr's format may be "host:port" or ":port"
_, _, err := net.SplitHostPort(c.MasterAddr)
if err != nil {
return terror.ErrMasterHostPortNotValid.Delegate(err, c.MasterAddr)
}

c.DeployMap = make(map[string]string)
for _, item := range c.Deploy {
if err := item.Verify(); err != nil {
if err = item.Verify(); err != nil {
return err
}

Expand Down Expand Up @@ -202,7 +232,37 @@ func (c *Config) adjust() error {
c.RPCRateBurst = DefaultBurst
}

return nil
if c.Name == "" {
var hostname string
hostname, err = os.Hostname()
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return terror.ErrMasterGetHostnameFail.Delegate(err)
}
c.Name = fmt.Sprintf("%s-%s", defaultNamePrefix, hostname)
}

if c.DataDir == "" {
c.DataDir = fmt.Sprintf("%s.%s", defaultDataDirPrefix, c.Name)
}

if c.PeerUrls == "" {
c.PeerUrls = defaultPeerUrls
}

if c.AdvertisePeerUrls == "" {
c.AdvertisePeerUrls = defaultPeerUrls
}

if c.InitialCluster == "" {
items := strings.Split(c.AdvertisePeerUrls, ",")
for i, item := range items {
items[i] = fmt.Sprintf("%s=%s", c.Name, item)
}
c.InitialCluster = strings.Join(items, ",")
}

_, err = c.genEmbedEtcdConfig() // verify embed etcd config
return err
}

// UpdateConfigFile write config to local file
Expand All @@ -228,3 +288,62 @@ func (c *Config) Reload() error {

return c.adjust()
}

// genEmbedEtcdConfig generates the configuration needed by embed etcd.
func (c *Config) genEmbedEtcdConfig() (*embed.Config, error) {
cfg := embed.NewConfig()
cfg.Name = c.Name
cfg.Dir = c.DataDir

// reuse the previous master-addr as the client listening URL.
cURL, err := parseURLs(c.MasterAddr)
if err != nil {
return nil, terror.ErrMasterGenEmbedEtcdConfigFail.Delegate(err, "invalid master-addr")
}
cfg.LCUrls = cURL
cfg.ACUrls = cURL

cfg.LPUrls, err = parseURLs(c.PeerUrls)
if err != nil {
return nil, terror.ErrMasterGenEmbedEtcdConfigFail.Delegate(err, "invalid peer-urls")
}

cfg.APUrls, err = parseURLs(c.AdvertisePeerUrls)
if err != nil {
return nil, terror.ErrMasterGenEmbedEtcdConfigFail.Delegate(err, "invalid advertise-peer-urls")
}

cfg.InitialCluster = c.InitialCluster

return cfg, nil
}

// parseURLs parse a string into multiple urls.
// if the URL in the string without protocol scheme, use `http` as the default.
// if no IP exists in the address, `0.0.0.0` is used.
func parseURLs(s string) ([]url.URL, error) {
if s == "" {
return nil, nil
}

items := strings.Split(s, ",")
urls := make([]url.URL, 0, len(items))
for _, item := range items {
u, err := url.Parse(item)
// tolerate valid `master-addr`, but invalid URL format, like:
// `:8261`: missing protocol scheme
// `127.0.0.1:8261`: first path segment in URL cannot contain colon
if err != nil && (strings.Contains(err.Error(), "missing protocol scheme") ||
strings.Contains(err.Error(), "first path segment in URL cannot contain colon")) {
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
u, err = url.Parse("http://" + item)
}
if err != nil {
return nil, terror.ErrMasterParseURLFail.Delegate(err, item)
}
if strings.Index(u.Host, ":") == 0 {
u.Host = "0.0.0.0" + u.Host
}
urls = append(urls, *u)
}
return urls, nil
}
Loading