This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

dm-master: join a new member into an existing cluster (#350)
csuzhangxc authored Nov 14, 2019
1 parent fcaab27 commit 4abf519
Showing 13 changed files with 640 additions and 18 deletions.
_utils/terror_gen/errors_release.txt (7 changes: 4 additions & 3 deletions)
@@ -314,9 +314,10 @@ ErrMasterOperRequestTimeout,[code=38033:class=dm-master:scope=internal:level=hig
ErrMasterHandleHTTPApis,[code=38034:class=dm-master:scope=internal:level=high],"serve http apis to grpc"
ErrMasterHostPortNotValid,[code=38035:class=dm-master:scope=internal:level=high],"host:port '%s' not valid"
ErrMasterGetHostnameFail,[code=38036:class=dm-master:scope=internal:level=high],"get hostname fail"
ErrMasterGenEmbedEtcdConfigFail,[code=38037:class=dm-master:scope=internal:level=high],"generate config item %s for embed etcd fail"
ErrMasterStartEmbedEtcdFail,[code=38038:class=dm-master:scope=internal:level=high],"start embed etcd fail"
ErrMasterParseURLFail,[code=38039:class=dm-master:scope=internal:level=high],"parse URL %s fail"
ErrMasterGenEmbedEtcdConfigFail,[code=38037:class=dm-master:scope=internal:level=high],"fail to generate config item %s for embed etcd"
ErrMasterStartEmbedEtcdFail,[code=38038:class=dm-master:scope=internal:level=high],"fail to start embed etcd"
ErrMasterParseURLFail,[code=38039:class=dm-master:scope=internal:level=high],"fail to parse URL %s"
ErrMasterJoinEmbedEtcdFail,[code=38040:class=dm-master:scope=internal:level=high],"fail to join embed etcd: %s"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium],"parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium],"'%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium],"toml decode file"
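The new ErrMasterJoinEmbedEtcdFail template (code 38040) carries a single %s verb that is filled at the call sites added later in this commit. A minimal sketch of the two call shapes, assuming it sits in package master where fmt and the terror package are already imported; the helper name and its arguments are hypothetical:

// exampleJoinError is a hypothetical helper showing how the new 38040 code is
// raised: Generate formats the %s directly, Delegate additionally wraps an
// underlying error (e.g. from etcd/clientv3 or the filesystem).
func exampleJoinError(joinList string, underlying error) error {
    if underlying != nil {
        // roughly: "fail to join embed etcd: list member for <joinList>", wrapping `underlying`
        return terror.ErrMasterJoinEmbedEtcdFail.Delegate(underlying, fmt.Sprintf("list member for %s", joinList))
    }
    // roughly: "fail to join embed etcd: join self <joinList> is forbidden"
    return terror.ErrMasterJoinEmbedEtcdFail.Generate(fmt.Sprintf("join self %s is forbidden", joinList))
}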
dm/master/config.go (29 changes: 19 additions & 10 deletions)
@@ -35,10 +35,11 @@ import (
)

const (
defaultRPCTimeout = "30s"
defaultNamePrefix = "dm-master"
defaultDataDirPrefix = "default"
defaultPeerUrls = "http://127.0.0.1:8291"
defaultRPCTimeout = "30s"
defaultNamePrefix = "dm-master"
defaultDataDirPrefix = "default"
defaultPeerUrls = "http://127.0.0.1:8291"
defaultInitialClusterState = embed.ClusterStateFlagNew
)

// SampleConfigFile is sample config file of dm-master
@@ -65,6 +66,7 @@ func NewConfig() *Config {
fs.StringVar(&cfg.InitialCluster, "initial-cluster", "", fmt.Sprintf("initial cluster configuration for bootstrapping, e.g. dm-master=%s", defaultPeerUrls))
fs.StringVar(&cfg.PeerUrls, "peer-urls", defaultPeerUrls, "URLs for peer traffic")
fs.StringVar(&cfg.AdvertisePeerUrls, "advertise-peer-urls", "", `advertise URLs for peer traffic (default "${peer-urls}")`)
fs.StringVar(&cfg.Join, "join", "", `join to an existing cluster (usage: cluster's "${master-addr}" list, e.g. "127.0.0.1:8261,127.0.0.1:18261")`)

return cfg
}
@@ -108,11 +110,13 @@ type Config struct {
// etcd relative config items
// NOTE: we use `MasterAddr` to generate `ClientUrls` and `AdvertiseClientUrls`
// NOTE: more items will be added when adding leader election
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
PeerUrls string `toml:"peer-urls" json:"peer-urls"`
AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"`
InitialCluster string `toml:"initial-cluster" json:"initial-cluster"`
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
PeerUrls string `toml:"peer-urls" json:"peer-urls"`
AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"`
InitialCluster string `toml:"initial-cluster" json:"initial-cluster"`
InitialClusterState string `toml:"initial-cluster-state" json:"initial-cluster-state"`
Join string `toml:"join" json:"join"` // cluster's client address (endpoints), not peer address

printVersion bool
printSampleConfig bool
@@ -250,7 +254,7 @@ func (c *Config) adjust() error {
}

if c.AdvertisePeerUrls == "" {
c.AdvertisePeerUrls = defaultPeerUrls
c.AdvertisePeerUrls = c.PeerUrls
}

if c.InitialCluster == "" {
@@ -261,6 +265,10 @@
c.InitialCluster = strings.Join(items, ",")
}

if c.InitialClusterState == "" {
c.InitialClusterState = defaultInitialClusterState
}

_, err = c.genEmbedEtcdConfig() // verify embed etcd config
return err
}
@@ -314,6 +322,7 @@ func (c *Config) genEmbedEtcdConfig() (*embed.Config, error) {
}

cfg.InitialCluster = c.InitialCluster
cfg.ClusterState = c.InitialClusterState

return cfg, nil
}
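To make the new fields and defaults concrete, here is a minimal sketch of how a joining member's Config could be filled in. It assumes it sits inside package master; the helper name and the concrete addresses are illustrative, not part of this commit:

// buildJoinConfig is a hypothetical helper configuring a second DM-master that
// joins an existing cluster instead of bootstrapping a new one.
func buildJoinConfig() (*Config, error) {
    cfg := NewConfig()
    cfg.Name = "dm-master-2"                // example values only
    cfg.MasterAddr = ":18261"               // client address of this new member
    cfg.PeerUrls = "http://127.0.0.1:18291" // peer address of this new member
    cfg.Join = "127.0.0.1:8261"             // "${master-addr}" list of the existing cluster
    // adjust() fills the remaining defaults: AdvertisePeerUrls now falls back to
    // PeerUrls (instead of the fixed defaultPeerUrls) and InitialClusterState
    // defaults to embed.ClusterStateFlagNew; prepareJoinEtcd later switches it
    // to "existing" when the member actually joins.
    if err := cfg.adjust(); err != nil {
        return nil, err
    }
    return cfg, nil
}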
dm/master/config_test.go (5 changes: 5 additions & 0 deletions)
@@ -25,6 +25,7 @@ import (

capturer "github.com/kami-zh/go-capturer"
"github.com/pingcap/check"
"go.etcd.io/etcd/embed"

"github.com/pingcap/dm/pkg/terror"
)
@@ -133,6 +134,8 @@ func (t *testConfigSuite) TestConfig(c *check.C) {
c.Assert(cfg.PeerUrls, check.Equals, peerURLs)
c.Assert(cfg.AdvertisePeerUrls, check.Equals, advertisePeerURLs)
c.Assert(cfg.InitialCluster, check.Equals, initialCluster)
c.Assert(cfg.InitialClusterState, check.Equals, embed.ClusterStateFlagNew)
c.Assert(cfg.Join, check.Equals, "")
c.Assert(cfg.DeployMap, check.DeepEquals, deployMap)
c.Assert(cfg.String(), check.Matches, fmt.Sprintf("{.*master-addr\":\"%s\".*}", masterAddr))
}
@@ -228,6 +231,7 @@ func (t *testConfigSuite) TestGenEmbedEtcdConfig(c *check.C) {

cfg1 := NewConfig()
cfg1.MasterAddr = ":8261"
cfg1.InitialClusterState = embed.ClusterStateFlagExisting
c.Assert(cfg1.adjust(), check.IsNil)
etcdCfg, err := cfg1.genEmbedEtcdConfig()
c.Assert(err, check.IsNil)
@@ -238,6 +242,7 @@ func (t *testConfigSuite) TestGenEmbedEtcdConfig(c *check.C) {
c.Assert(etcdCfg.LPUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}})
c.Assert(etcdCfg.APUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}})
c.Assert(etcdCfg.InitialCluster, check.DeepEquals, fmt.Sprintf("dm-master-%s=http://127.0.0.1:8291", hostname))
c.Assert(etcdCfg.ClusterState, check.Equals, embed.ClusterStateFlagExisting)

cfg2 := *cfg1
cfg2.MasterAddr = "127.0.0.1\n:8261"
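A natural follow-up test, not part of this commit, could exercise the self-join guard added in dm/master/etcd.go; it needs no running etcd because that guard fires before any network call. The test name and its placement in this suite are assumptions:

func (t *testConfigSuite) TestJoinSelfForbidden(c *check.C) {
    cfg := NewConfig()
    cfg.MasterAddr = ":8261"
    c.Assert(cfg.adjust(), check.IsNil)

    // joining the cluster through our own master-addr must be rejected
    cfg.Join = cfg.MasterAddr
    c.Assert(prepareJoinEtcd(cfg), check.NotNil)
}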
dm/master/dm-master.toml (3 changes: 3 additions & 0 deletions)
@@ -22,6 +22,9 @@ advertise-peer-urls = "http://127.0.0.1:8291"
# initial cluster configuration for bootstrapping, e.g. dm-master=http://127.0.0.1:8291
initial-cluster = "dm-master=http://127.0.0.1:8291"

# join an existing DM-master cluster; a comma-separated list of the existing cluster's client endpoints ("${master-addr}"), e.g. "127.0.0.1:8261,127.0.0.1:18261"
join = ""

# rpc configuration
#
# rpc timeout is a positive number plus time unit. we use golang standard time
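The new key round-trips through the `toml:"join"` tag added to Config above. A standalone sketch of that decoding, assuming github.com/BurntSushi/toml is the decoder (the real loader in config.go is outside the shown hunks, and the mirror struct below exists only for illustration):

package main

import (
    "fmt"

    "github.com/BurntSushi/toml"
)

// joinConfig mirrors just two of the tagged fields from dm/master/config.go.
type joinConfig struct {
    AdvertisePeerUrls string `toml:"advertise-peer-urls"`
    Join              string `toml:"join"`
}

func main() {
    const data = `
advertise-peer-urls = "http://127.0.0.1:18291"
join = "127.0.0.1:8261,127.0.0.1:18261"
`
    var c joinConfig
    if _, err := toml.Decode(data, &c); err != nil {
        panic(err)
    }
    fmt.Println(c.Join) // 127.0.0.1:8261,127.0.0.1:18261
}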
dm/master/etcd.go (148 changes: 148 additions & 0 deletions)
@@ -14,18 +14,29 @@
package master

import (
"fmt"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"strings"
"time"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/pingcap/dm/pkg/etcdutil"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
)

const (
// etcdStartTimeout is the maximum time to wait for the embedded etcd to start.
etcdStartTimeout = time.Minute
// privateDirMode grants the owner permission to make/remove files inside the directory.
privateDirMode os.FileMode = 0700
)

// startEtcd starts an embedded etcd server.
@@ -59,3 +70,140 @@ func startEtcd(masterCfg *Config,
}
return e, nil
}

// prepareJoinEtcd prepares the config needed to join an existing cluster.
// adapted from https://github.com/pingcap/pd/blob/37efcb05f397f26c70cda8dd44acaa3061c92159/server/join/join.go#L44.
//
// when setting `initial-cluster` explicitly to bootstrap a new cluster:
// - if local persistent data exists, just restart the previous cluster (in fact, it's not bootstrapping).
// - if local persistent data does not exist, bootstrap the cluster as a new cluster.
//
// when setting `join` to join an existing cluster (without `initial-cluster` set):
// - if local persistent data exists (in fact, it's not a join):
//   - just restart if the `member` directory already exists (we joined before)
//   - otherwise read `initial-cluster` back from local persistent data to restart (just like bootstrapping)
// - if local persistent data does not exist:
//   1. fetch the member list from the cluster to check whether we can join now.
//   2. call `member add` to add this member's info into the cluster.
//   3. generate the config needed to join (`initial-cluster` and `initial-cluster-state`).
//   4. save `initial-cluster` in local persistent data for later restarts.
//
// NOTE: a member can't join another cluster after it has joined a previous one.
func prepareJoinEtcd(cfg *Config) error {
// no need to join
if cfg.Join == "" {
return nil
}

// trying to join ourselves is invalid
clientURLs := strings.Split(cfg.Join, ",")
for _, clientURL := range clientURLs {
if clientURL == cfg.MasterAddr {
return terror.ErrMasterJoinEmbedEtcdFail.Generate(fmt.Sprintf("join self %s is forbidden", cfg.Join))
}
}

// restart with previous data; no `InitialCluster` needs to be set
if isDataExist(filepath.Join(cfg.DataDir, "member")) {
cfg.InitialCluster = ""
cfg.InitialClusterState = embed.ClusterStateFlagExisting
return nil
}

// join using previously persisted join data, if any
joinFP := filepath.Join(cfg.DataDir, "join")
if s, err := ioutil.ReadFile(joinFP); err != nil {
if !os.IsNotExist(err) {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "read persistent join data")
}
} else {
cfg.InitialCluster = strings.TrimSpace(string(s))
cfg.InitialClusterState = embed.ClusterStateFlagExisting
log.L().Info("using persistent join data", zap.String("file", joinFP), zap.String("data", cfg.InitialCluster))
return nil
}

// without previous data, we need a client to contact the existing cluster.
client, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(cfg.Join, ","),
DialTimeout: etcdutil.DefaultDialTimeout,
})
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("create etcd client for %s", cfg.Join))
}
defer client.Close()

// `member list`
listResp, err := etcdutil.ListMembers(client)
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("list member for %s", cfg.Join))
}

// check members
for _, m := range listResp.Members {
if m.Name == "" { // the previous existing member without name (not complete the join operation)
// we can't generate `initial-cluster` correctly with empty member name,
// and if added a member but not started it to complete the join,
// the later join operation may encounter `etcdserver: re-configuration failed due to not enough started members`.
return terror.ErrMasterJoinEmbedEtcdFail.Generate("there is a member that has not joined successfully, continue the join or remove it")
}
if m.Name == cfg.Name {
// a failed DM-master re-joins the previous cluster.
return terror.ErrMasterJoinEmbedEtcdFail.Generate(fmt.Sprintf("missing data or joining a duplicate member %s", m.Name))
}
}

// `member add`: a new or previously removed DM-master joins an existing cluster.
addResp, err := etcdutil.AddMember(client, strings.Split(cfg.AdvertisePeerUrls, ","))
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("add member %s", cfg.AdvertisePeerUrls))
}

// generate `--initial-cluster`
ms := make([]string, 0, len(addResp.Members))
for _, m := range addResp.Members {
name := m.Name
if m.ID == addResp.Member.ID {
// the member that has only called `member add`
// but has not yet started to complete the join has an empty name,
// so we use the `name` from our own config instead.
name = cfg.Name
}
if name == "" {
// this should already be caught by the previous `member list` check when only one member is joining.
// if multiple join operations run concurrently, the behavior may be unexpected;
// check again here to reduce the surprise.
return terror.ErrMasterJoinEmbedEtcdFail.Generate("there is a member that has not joined successfully, continue the join or remove it")
}
for _, url := range m.PeerURLs {
ms = append(ms, fmt.Sprintf("%s=%s", name, url))
}
}
cfg.InitialCluster = strings.Join(ms, ",")
cfg.InitialClusterState = embed.ClusterStateFlagExisting

// save `--initial-cluster` as persistent data
if err = os.MkdirAll(cfg.DataDir, privateDirMode); err != nil && !os.IsExist(err) {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "make directory")
}
if err = ioutil.WriteFile(joinFP, []byte(cfg.InitialCluster), privateDirMode); err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "write persistent join data")
}

return nil
}

// isDataExist returns whether the directory exists and is non-empty (i.e. it contains data)
func isDataExist(d string) bool {
dir, err := os.Open(d)
if err != nil {
return false
}
defer dir.Close()

names, err := dir.Readdirnames(1) // reading one name is enough
if err != nil {
return false
}
return len(names) != 0
}
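The caller-side wiring is not among the hunks shown here (it belongs to the server startup path in the remaining changed files). A rough sketch of the expected call order, assuming package master; the function name is hypothetical and the actual startEtcd invocation is elided because its full parameter list is not visible in this diff:

// hypothetical startup wiring: join preparation must run before the embedded
// etcd is started, so that InitialCluster and InitialClusterState have already
// been rewritten for the "existing cluster" case.
func startMasterEtcd(cfg *Config) error {
    if err := prepareJoinEtcd(cfg); err != nil {
        return err
    }
    // startEtcd(cfg, ...) would launch the embedded etcd here; its remaining
    // parameters are not visible in this diff, so the call is only sketched.
    return nil
}

Persisting `initial-cluster` under the data dir (step 4 in the prepareJoinEtcd comment) is what lets a joined member restart later without contacting the cluster again.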