Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

dm-master: join a new member into an existing cluster #350

Merged
merged 34 commits into from
Nov 14, 2019
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
9893157
*: combine etcd server, gRPC API server and other HTTP servers
csuzhangxc Nov 2, 2019
025a3a8
*: fix tests; refine config item
csuzhangxc Nov 4, 2019
466550b
*: add test case for `parseURLs`
csuzhangxc Nov 4, 2019
f78762c
*: refine config; add more test cases
csuzhangxc Nov 4, 2019
3ad9ff2
tests: fix CI
csuzhangxc Nov 5, 2019
8c59390
Merge branch 'master' into dm-master-owner
csuzhangxc Nov 5, 2019
675a1c2
master: add more test cases for embed etcd config
csuzhangxc Nov 5, 2019
625a891
master: refine API tests
csuzhangxc Nov 5, 2019
cfb0096
master: refine code
csuzhangxc Nov 5, 2019
eb72b15
Merge branch 'master' into dm-master-owner
csuzhangxc Nov 6, 2019
6dfbe57
master: update sample config items oder
csuzhangxc Nov 6, 2019
3f5beac
Merge remote-tracking branch 'origin/dm-master-owner' into dm-master-…
csuzhangxc Nov 6, 2019
3d664b4
master: refine code
csuzhangxc Nov 6, 2019
139ef1a
*: prepare join embed etcd cluster
csuzhangxc Nov 7, 2019
31436e8
master: address comments
csuzhangxc Nov 11, 2019
fad73c9
*: add some tests for prepareJoinEtcd
csuzhangxc Nov 11, 2019
aee2173
*: add more tests for prepareJoinEtcd; tiny fix `AdvertisePeerUrls` a…
csuzhangxc Nov 12, 2019
abec6e2
master: address comment
csuzhangxc Nov 12, 2019
a67bcf9
master: address comments
csuzhangxc Nov 12, 2019
729a1ca
Merge remote-tracking branch 'remotes/origin/dm-master-owner' into dm…
csuzhangxc Nov 12, 2019
017f8e7
Merge remote-tracking branch 'remotes/origin/master' into dm-master-join
csuzhangxc Nov 12, 2019
6255adc
master: fix merge
csuzhangxc Nov 12, 2019
d30a8a7
*: go mod tidy
csuzhangxc Nov 12, 2019
aacb1ed
master: use random port for testing
csuzhangxc Nov 12, 2019
2a042e3
terror: update error message
csuzhangxc Nov 12, 2019
a388e33
master: update sample config
csuzhangxc Nov 12, 2019
6fa6bf2
master: update config test
csuzhangxc Nov 12, 2019
cdc9ef3
master: add more tests for prepareJoinEtcd
csuzhangxc Nov 12, 2019
5fd202f
master: join member into existing cluster
csuzhangxc Nov 12, 2019
ad716a2
Merge branch 'master' into dm-master-join
IANTHEREAL Nov 13, 2019
dcda7b2
master: address comments
csuzhangxc Nov 13, 2019
c9a413f
master: address comments
csuzhangxc Nov 13, 2019
0610cb6
Merge branch 'master' into dm-master-join
csuzhangxc Nov 14, 2019
b0f5332
master: address comments
csuzhangxc Nov 14, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,10 @@ ErrMasterOperRequestTimeout,[code=38033:class=dm-master:scope=internal:level=hig
ErrMasterHandleHTTPApis,[code=38034:class=dm-master:scope=internal:level=high],"serve http apis to grpc"
ErrMasterHostPortNotValid,[code=38035:class=dm-master:scope=internal:level=high],"host:port '%s' not valid"
ErrMasterGetHostnameFail,[code=38036:class=dm-master:scope=internal:level=high],"get hostname fail"
ErrMasterGenEmbedEtcdConfigFail,[code=38037:class=dm-master:scope=internal:level=high],"generate config item %s for embed etcd fail"
ErrMasterStartEmbedEtcdFail,[code=38038:class=dm-master:scope=internal:level=high],"start embed etcd fail"
ErrMasterParseURLFail,[code=38039:class=dm-master:scope=internal:level=high],"parse URL %s fail"
ErrMasterGenEmbedEtcdConfigFail,[code=38037:class=dm-master:scope=internal:level=high],"fail to generate config item %s for embed etcd"
ErrMasterStartEmbedEtcdFail,[code=38038:class=dm-master:scope=internal:level=high],"fail to start embed etcd"
ErrMasterParseURLFail,[code=38039:class=dm-master:scope=internal:level=high],"fail to parse URL %s"
ErrMasterJoinEmbedEtcdFail,[code=38040:class=dm-master:scope=internal:level=high],"fail to join embed etcd: %s"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium],"parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium],"'%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium],"toml decode file"
Expand Down
29 changes: 19 additions & 10 deletions dm/master/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ import (
)

const (
defaultRPCTimeout = "30s"
defaultNamePrefix = "dm-master"
defaultDataDirPrefix = "default"
defaultPeerUrls = "http://127.0.0.1:8291"
defaultRPCTimeout = "30s"
defaultNamePrefix = "dm-master"
defaultDataDirPrefix = "default"
defaultPeerUrls = "http://127.0.0.1:8291"
defaultInitialClusterState = embed.ClusterStateFlagNew
)

// SampleConfigFile is sample config file of dm-master
Expand All @@ -65,6 +66,7 @@ func NewConfig() *Config {
fs.StringVar(&cfg.InitialCluster, "initial-cluster", "", fmt.Sprintf("initial cluster configuration for bootstrapping, e,g. dm-master=%s", defaultPeerUrls))
fs.StringVar(&cfg.PeerUrls, "peer-urls", defaultPeerUrls, "URLs for peer traffic")
fs.StringVar(&cfg.AdvertisePeerUrls, "advertise-peer-urls", "", `advertise URLs for peer traffic (default "${peer-urls}")`)
fs.StringVar(&cfg.Join, "join", "", `join to an existing cluster (usage: cluster's "${advertise-client-urls}"`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems don't have config named advertise-client-urls

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a but, join should be the address of clients (endpoints), but use peer address before, fixed in b0f5332.


return cfg
}
Expand Down Expand Up @@ -108,11 +110,13 @@ type Config struct {
// etcd relative config items
// NOTE: we use `MasterAddr` to generate `ClientUrls` and `AdvertiseClientUrls`
// NOTE: more items will be add when adding leader election
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
PeerUrls string `toml:"peer-urls" json:"peer-urls"`
AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"`
InitialCluster string `toml:"initial-cluster" json:"initial-cluster"`
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
PeerUrls string `toml:"peer-urls" json:"peer-urls"`
AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"`
InitialCluster string `toml:"initial-cluster" json:"initial-cluster"`
InitialClusterState string `toml:"initial-cluster-state" json:"initial-cluster-state"`
Join string `toml:"join" json:"join"`

printVersion bool
printSampleConfig bool
Expand Down Expand Up @@ -250,7 +254,7 @@ func (c *Config) adjust() error {
}

if c.AdvertisePeerUrls == "" {
c.AdvertisePeerUrls = defaultPeerUrls
c.AdvertisePeerUrls = c.PeerUrls
}

if c.InitialCluster == "" {
Expand All @@ -261,6 +265,10 @@ func (c *Config) adjust() error {
c.InitialCluster = strings.Join(items, ",")
}

if c.InitialClusterState == "" {
c.InitialClusterState = defaultInitialClusterState
}

_, err = c.genEmbedEtcdConfig() // verify embed etcd config
return err
}
Expand Down Expand Up @@ -314,6 +322,7 @@ func (c *Config) genEmbedEtcdConfig() (*embed.Config, error) {
}

cfg.InitialCluster = c.InitialCluster
cfg.ClusterState = c.InitialClusterState

return cfg, nil
}
Expand Down
5 changes: 5 additions & 0 deletions dm/master/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

capturer "github.com/kami-zh/go-capturer"
"github.com/pingcap/check"
"go.etcd.io/etcd/embed"

"github.com/pingcap/dm/pkg/terror"
)
Expand Down Expand Up @@ -133,6 +134,8 @@ func (t *testConfigSuite) TestConfig(c *check.C) {
c.Assert(cfg.PeerUrls, check.Equals, peerURLs)
c.Assert(cfg.AdvertisePeerUrls, check.Equals, advertisePeerURLs)
c.Assert(cfg.InitialCluster, check.Equals, initialCluster)
c.Assert(cfg.InitialClusterState, check.Equals, embed.ClusterStateFlagNew)
c.Assert(cfg.Join, check.Equals, "")
c.Assert(cfg.DeployMap, check.DeepEquals, deployMap)
c.Assert(cfg.String(), check.Matches, fmt.Sprintf("{.*master-addr\":\"%s\".*}", masterAddr))
}
Expand Down Expand Up @@ -228,6 +231,7 @@ func (t *testConfigSuite) TestGenEmbedEtcdConfig(c *check.C) {

cfg1 := NewConfig()
cfg1.MasterAddr = ":8261"
cfg1.InitialClusterState = embed.ClusterStateFlagExisting
c.Assert(cfg1.adjust(), check.IsNil)
etcdCfg, err := cfg1.genEmbedEtcdConfig()
c.Assert(err, check.IsNil)
Expand All @@ -238,6 +242,7 @@ func (t *testConfigSuite) TestGenEmbedEtcdConfig(c *check.C) {
c.Assert(etcdCfg.LPUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}})
c.Assert(etcdCfg.APUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}})
c.Assert(etcdCfg.InitialCluster, check.DeepEquals, fmt.Sprintf("dm-master-%s=http://127.0.0.1:8291", hostname))
c.Assert(etcdCfg.ClusterState, check.Equals, embed.ClusterStateFlagExisting)

cfg2 := *cfg1
cfg2.MasterAddr = "127.0.0.1\n:8261"
Expand Down
3 changes: 3 additions & 0 deletions dm/master/dm-master.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ advertise-peer-urls = "http://127.0.0.1:8291"
# initial cluster configuration for bootstrapping, e,g. dm-master=http://127.0.0.1:8291
initial-cluster = "dm-master=http://127.0.0.1:8291"

# Join to an existing DM-master cluster, a string of existing cluster's endpoints.
join = ""

# rpc configuration
#
# rpc timeout is a positive number plus time unit. we use golang standard time
Expand Down
139 changes: 139 additions & 0 deletions dm/master/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,29 @@
package master

import (
"fmt"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"strings"
"time"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/pingcap/dm/pkg/etcdutil"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
)

const (
// time waiting for etcd to be started
etcdStartTimeout = time.Minute
// privateDirMode grants owner to make/remove files inside the directory.
privateDirMode os.FileMode = 0700
)

// startEtcd starts an embedded etcd server.
Expand Down Expand Up @@ -59,3 +70,131 @@ func startEtcd(masterCfg *Config,
}
return e, nil
}

// prepareJoinEtcd prepares config needed to join an existing cluster.
// learn from https://github.com/pingcap/pd/blob/37efcb05f397f26c70cda8dd44acaa3061c92159/server/join/join.go#L44.
//
// when setting `initial-cluster` explicitly to bootstrap a new cluster:
// - if local persistent data exist, just restart the previous cluster (in fact, it's not bootstrapping).
// - if local persistent data not exist, just bootstrap the cluster as a new cluster.
//
// when setting `join` to join an existing cluster (without `initial-cluster` set):
// - if local persistent data exists (in fact, it's not join):
// - just restart if `member` already exists (already joined before)
// - read `initial-cluster` back from local persistent data to restart (just like bootstrapping)
// - if local persistent data not exist:
// 1. fetch member list from the cluster to check if we can join now.
// 2. call `member add` to add the member info into the cluster.
// 3. generate config for join (`initial-cluster` and `initial-cluster-state`).
// 4. save `initial-cluster` in local persistent data for later restarting.
//
// NOTE: A member can't join to another cluster after it has joined a previous one.
func prepareJoinEtcd(cfg *Config) error {
// no need to join
if cfg.Join == "" {
return nil
}

// try to join self, invalid
if cfg.Join == cfg.AdvertisePeerUrls {
return terror.ErrMasterJoinEmbedEtcdFail.Generate(fmt.Sprintf("join self %s is forbidden", cfg.Join))
}

// restart with previous data, no `InitialCluster` need to set
if isDataExist(filepath.Join(cfg.DataDir, "member")) {
cfg.InitialCluster = ""
cfg.InitialClusterState = embed.ClusterStateFlagExisting
return nil
}

// join with persistent data
joinFP := filepath.Join(cfg.DataDir, "join")
if s, err := ioutil.ReadFile(joinFP); err != nil {
if !os.IsNotExist(err) {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "read persistent join data")
}
} else {
cfg.InitialCluster = strings.TrimSpace(string(s))
Copy link
Collaborator

@IANTHEREAL IANTHEREAL Nov 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please adding a log here shows that we use data of dir/join as cfg. InitialCluster .

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in c9a413f.

cfg.InitialClusterState = embed.ClusterStateFlagExisting
log.L().Info("using persistent join data", zap.String("file", joinFP), zap.String("data", cfg.InitialCluster))
return nil
}

// if without previous data, we need a client to contact with the existing cluster.
client, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(cfg.Join, ","),
DialTimeout: etcdutil.DefaultDialTimeout,
})
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("create etcd client for %s", cfg.Join))
}
defer client.Close()

// `member list`
listResp, err := etcdutil.ListMembers(client)
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("list member for %s", cfg.Join))
}

// check members
for _, m := range listResp.Members {
if m.Name == "" {
return terror.ErrMasterJoinEmbedEtcdFail.Generate("there is a member that has not joined successfully")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this means other member joined failed, but why prevent this member join? And how to fix this error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some comments to explain it, and update the error message in b0f5332.

later maybe we can embed etcd client in dmctl?

}
if m.Name == cfg.Name {
// a failed DM-master re-joins the previous cluster.
return terror.ErrMasterJoinEmbedEtcdFail.Generate(fmt.Sprintf("missing data or joining a duplicate member %s", m.Name))
}
}

// `member add`, a new/deleted DM-master joins to an existing cluster.
addResp, err := etcdutil.AddMember(client, strings.Split(cfg.AdvertisePeerUrls, ","))
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("add member %s", cfg.AdvertisePeerUrls))
}

// generate `--initial-cluster`
ms := make([]string, 0, len(addResp.Members))
for _, m := range addResp.Members {
name := m.Name
if m.ID == addResp.Member.ID {
// the member only called `member add`,
// but has not started the process to complete the join should have an empty name.
// so, we use the `name` in config instead.
name = cfg.Name
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would m.Name be empty?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be empty, I add some explanation in dcda7b2.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if name is empty, will it generate err at L142?

}
if name == "" {
return terror.ErrMasterJoinEmbedEtcdFail.Generate("there is a member that has not joined successfully")
}
for _, url := range m.PeerURLs {
ms = append(ms, fmt.Sprintf("%s=%s", name, url))
}
}
cfg.InitialCluster = strings.Join(ms, ",")
cfg.InitialClusterState = embed.ClusterStateFlagExisting

// save `--initial-cluster` in persist data
if err = os.MkdirAll(cfg.DataDir, privateDirMode); err != nil && !os.IsExist(err) {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "make directory")
}
if err = ioutil.WriteFile(joinFP, []byte(cfg.InitialCluster), privateDirMode); err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "write persistent join data")
}

return nil
}

// isDataExist returns whether the directory is empty (with data)
func isDataExist(d string) bool {
dir, err := os.Open(d)
if err != nil {
return false
}
defer dir.Close()

names, err := dir.Readdirnames(1) // read only one is enough
if err != nil {
return false
}
return len(names) != 0
}
Loading