Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

dm-master: join a new member into an existing cluster #350

Merged
merged 34 commits into from
Nov 14, 2019
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
9893157
*: combine etcd server, gRPC API server and other HTTP servers
csuzhangxc Nov 2, 2019
025a3a8
*: fix tests; refine config item
csuzhangxc Nov 4, 2019
466550b
*: add test case for `parseURLs`
csuzhangxc Nov 4, 2019
f78762c
*: refine config; add more test cases
csuzhangxc Nov 4, 2019
3ad9ff2
tests: fix CI
csuzhangxc Nov 5, 2019
8c59390
Merge branch 'master' into dm-master-owner
csuzhangxc Nov 5, 2019
675a1c2
master: add more test cases for embed etcd config
csuzhangxc Nov 5, 2019
625a891
master: refine API tests
csuzhangxc Nov 5, 2019
cfb0096
master: refine code
csuzhangxc Nov 5, 2019
eb72b15
Merge branch 'master' into dm-master-owner
csuzhangxc Nov 6, 2019
6dfbe57
master: update sample config items oder
csuzhangxc Nov 6, 2019
3f5beac
Merge remote-tracking branch 'origin/dm-master-owner' into dm-master-…
csuzhangxc Nov 6, 2019
3d664b4
master: refine code
csuzhangxc Nov 6, 2019
139ef1a
*: prepare join embed etcd cluster
csuzhangxc Nov 7, 2019
31436e8
master: address comments
csuzhangxc Nov 11, 2019
fad73c9
*: add some tests for prepareJoinEtcd
csuzhangxc Nov 11, 2019
aee2173
*: add more tests for prepareJoinEtcd; tiny fix `AdvertisePeerUrls` a…
csuzhangxc Nov 12, 2019
abec6e2
master: address comment
csuzhangxc Nov 12, 2019
a67bcf9
master: address comments
csuzhangxc Nov 12, 2019
729a1ca
Merge remote-tracking branch 'remotes/origin/dm-master-owner' into dm…
csuzhangxc Nov 12, 2019
017f8e7
Merge remote-tracking branch 'remotes/origin/master' into dm-master-join
csuzhangxc Nov 12, 2019
6255adc
master: fix merge
csuzhangxc Nov 12, 2019
d30a8a7
*: go mod tidy
csuzhangxc Nov 12, 2019
aacb1ed
master: use random port for testing
csuzhangxc Nov 12, 2019
2a042e3
terror: update error message
csuzhangxc Nov 12, 2019
a388e33
master: update sample config
csuzhangxc Nov 12, 2019
6fa6bf2
master: update config test
csuzhangxc Nov 12, 2019
cdc9ef3
master: add more tests for prepareJoinEtcd
csuzhangxc Nov 12, 2019
5fd202f
master: join member into existing cluster
csuzhangxc Nov 12, 2019
ad716a2
Merge branch 'master' into dm-master-join
IANTHEREAL Nov 13, 2019
dcda7b2
master: address comments
csuzhangxc Nov 13, 2019
c9a413f
master: address comments
csuzhangxc Nov 13, 2019
0610cb6
Merge branch 'master' into dm-master-join
csuzhangxc Nov 14, 2019
b0f5332
master: address comments
csuzhangxc Nov 14, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,10 @@ ErrMasterOperRequestTimeout,[code=38033:class=dm-master:scope=internal:level=hig
ErrMasterHandleHTTPApis,[code=38034:class=dm-master:scope=internal:level=high],"serve http apis to grpc"
ErrMasterHostPortNotValid,[code=38035:class=dm-master:scope=internal:level=high],"host:port '%s' not valid"
ErrMasterGetHostnameFail,[code=38036:class=dm-master:scope=internal:level=high],"get hostname fail"
ErrMasterGenEmbedEtcdConfigFail,[code=38037:class=dm-master:scope=internal:level=high],"generate config item %s for embed etcd fail"
ErrMasterStartEmbedEtcdFail,[code=38038:class=dm-master:scope=internal:level=high],"start embed etcd fail"
ErrMasterParseURLFail,[code=38039:class=dm-master:scope=internal:level=high],"parse URL %s fail"
ErrMasterGenEmbedEtcdConfigFail,[code=38037:class=dm-master:scope=internal:level=high],"fail to generate config item %s for embed etcd"
ErrMasterStartEmbedEtcdFail,[code=38038:class=dm-master:scope=internal:level=high],"fail to start embed etcd"
ErrMasterParseURLFail,[code=38039:class=dm-master:scope=internal:level=high],"fail to parse URL %s"
ErrMasterJoinEmbedEtcdFail,[code=38040:class=dm-master:scope=internal:level=high],"fail to join embed etcd: %s"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium],"parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium],"'%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium],"toml decode file"
Expand Down
29 changes: 19 additions & 10 deletions dm/master/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ import (
)

const (
defaultRPCTimeout = "30s"
defaultNamePrefix = "dm-master"
defaultDataDirPrefix = "default"
defaultPeerUrls = "http://127.0.0.1:8291"
defaultRPCTimeout = "30s"
defaultNamePrefix = "dm-master"
defaultDataDirPrefix = "default"
defaultPeerUrls = "http://127.0.0.1:8291"
defaultInitialClusterState = embed.ClusterStateFlagNew
)

// SampleConfigFile is sample config file of dm-master
Expand All @@ -65,6 +66,7 @@ func NewConfig() *Config {
fs.StringVar(&cfg.InitialCluster, "initial-cluster", "", fmt.Sprintf("initial cluster configuration for bootstrapping, e,g. dm-master=%s", defaultPeerUrls))
fs.StringVar(&cfg.PeerUrls, "peer-urls", defaultPeerUrls, "URLs for peer traffic")
fs.StringVar(&cfg.AdvertisePeerUrls, "advertise-peer-urls", "", `advertise URLs for peer traffic (default "${peer-urls}")`)
fs.StringVar(&cfg.Join, "join", "", `join to an existing cluster (usage: cluster's "${advertise-client-urls}"`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems don't have config named advertise-client-urls

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a but, join should be the address of clients (endpoints), but use peer address before, fixed in b0f5332.


return cfg
}
Expand Down Expand Up @@ -108,11 +110,13 @@ type Config struct {
// etcd relative config items
// NOTE: we use `MasterAddr` to generate `ClientUrls` and `AdvertiseClientUrls`
// NOTE: more items will be add when adding leader election
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
PeerUrls string `toml:"peer-urls" json:"peer-urls"`
AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"`
InitialCluster string `toml:"initial-cluster" json:"initial-cluster"`
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
PeerUrls string `toml:"peer-urls" json:"peer-urls"`
AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"`
InitialCluster string `toml:"initial-cluster" json:"initial-cluster"`
InitialClusterState string `toml:"initial-cluster-state" json:"initial-cluster-state"`
Join string `toml:"join" json:"join"`

printVersion bool
printSampleConfig bool
Expand Down Expand Up @@ -250,7 +254,7 @@ func (c *Config) adjust() error {
}

if c.AdvertisePeerUrls == "" {
c.AdvertisePeerUrls = defaultPeerUrls
c.AdvertisePeerUrls = c.PeerUrls
}

if c.InitialCluster == "" {
Expand All @@ -261,6 +265,10 @@ func (c *Config) adjust() error {
c.InitialCluster = strings.Join(items, ",")
}

if c.InitialClusterState == "" {
c.InitialClusterState = defaultInitialClusterState
}

_, err = c.genEmbedEtcdConfig() // verify embed etcd config
return err
}
Expand Down Expand Up @@ -314,6 +322,7 @@ func (c *Config) genEmbedEtcdConfig() (*embed.Config, error) {
}

cfg.InitialCluster = c.InitialCluster
cfg.ClusterState = c.InitialClusterState

return cfg, nil
}
Expand Down
5 changes: 5 additions & 0 deletions dm/master/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

capturer "github.com/kami-zh/go-capturer"
"github.com/pingcap/check"
"go.etcd.io/etcd/embed"

"github.com/pingcap/dm/pkg/terror"
)
Expand Down Expand Up @@ -133,6 +134,8 @@ func (t *testConfigSuite) TestConfig(c *check.C) {
c.Assert(cfg.PeerUrls, check.Equals, peerURLs)
c.Assert(cfg.AdvertisePeerUrls, check.Equals, advertisePeerURLs)
c.Assert(cfg.InitialCluster, check.Equals, initialCluster)
c.Assert(cfg.InitialClusterState, check.Equals, embed.ClusterStateFlagNew)
c.Assert(cfg.Join, check.Equals, "")
c.Assert(cfg.DeployMap, check.DeepEquals, deployMap)
c.Assert(cfg.String(), check.Matches, fmt.Sprintf("{.*master-addr\":\"%s\".*}", masterAddr))
}
Expand Down Expand Up @@ -228,6 +231,7 @@ func (t *testConfigSuite) TestGenEmbedEtcdConfig(c *check.C) {

cfg1 := NewConfig()
cfg1.MasterAddr = ":8261"
cfg1.InitialClusterState = embed.ClusterStateFlagExisting
c.Assert(cfg1.adjust(), check.IsNil)
etcdCfg, err := cfg1.genEmbedEtcdConfig()
c.Assert(err, check.IsNil)
Expand All @@ -238,6 +242,7 @@ func (t *testConfigSuite) TestGenEmbedEtcdConfig(c *check.C) {
c.Assert(etcdCfg.LPUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}})
c.Assert(etcdCfg.APUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}})
c.Assert(etcdCfg.InitialCluster, check.DeepEquals, fmt.Sprintf("dm-master-%s=http://127.0.0.1:8291", hostname))
c.Assert(etcdCfg.ClusterState, check.Equals, embed.ClusterStateFlagExisting)

cfg2 := *cfg1
cfg2.MasterAddr = "127.0.0.1\n:8261"
Expand Down
9 changes: 9 additions & 0 deletions dm/master/dm-master.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ advertise-peer-urls = "http://127.0.0.1:8291"
# initial cluster configuration for bootstrapping, e,g. dm-master=http://127.0.0.1:8291
initial-cluster = "dm-master=http://127.0.0.1:8291"

# Initial cluster state ("new" or "existing").
# Set to "new" for all members present during initial static or DNS bootstrapping.
# If this option is set to "existing", DM-master will attempt to join the existing cluster.
# If the wrong value is set, DM-master will attempt to start but fail safely.
initial-cluster-state = "new"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when dm-master restart, need update it to "existing"?

Copy link
Collaborator

@IANTHEREAL IANTHEREAL Nov 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also have doubts about the behavior of initial start, join start, and also the restart later, I think we better have a complete discussion. ( I don't the behavior of PD

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this section in sample config, and add some comments in dcda7b2.


# Join to an existing pd cluster, a string of existing cluster's endpoints.
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
join = ""

# rpc configuration
#
# rpc timeout is a positive number plus time unit. we use golang standard time
Expand Down
117 changes: 117 additions & 0 deletions dm/master/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,27 @@
package master

import (
"fmt"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"strings"
"time"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"google.golang.org/grpc"

"github.com/pingcap/dm/pkg/etcdutil"
"github.com/pingcap/dm/pkg/terror"
)

const (
// time waiting for etcd to be started
etcdStartTimeout = time.Minute
// privateDirMode grants owner to make/remove files inside the directory.
privateDirMode os.FileMode = 0700
)

// startEtcd starts an embedded etcd server.
Expand Down Expand Up @@ -59,3 +68,111 @@ func startEtcd(masterCfg *Config,
}
return e, nil
}

// prepareJoinEtcd prepares config needed to join an existing cluster.
// learn from https://github.com/pingcap/pd/blob/37efcb05f397f26c70cda8dd44acaa3061c92159/server/join/join.go#L44.
func prepareJoinEtcd(cfg *Config) error {
// no need to join
if cfg.Join == "" {
return nil
}

// try to join self, invalid
if cfg.Join == cfg.AdvertisePeerUrls {
return terror.ErrMasterJoinEmbedEtcdFail.Generate(fmt.Sprintf("join self %s is forbidden", cfg.Join))
}

// join with persistent data
joinFP := filepath.Join(cfg.DataDir, "join")
if _, err := os.Stat(joinFP); !os.IsNotExist(err) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe err is not nil but not NotExist error, do we need handle it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if not NotExist, ioutil.ReadFile in the next line should trigger it again, then we will handle it.
but I will re-write this block to make it clearer.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not NotExist == Exist 😆

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm re-writing, don't worry.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re-wrote in dcda7b2.

s, err := ioutil.ReadFile(joinFP)
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "read persistent join data")
}
cfg.InitialCluster = strings.TrimSpace(string(s))
Copy link
Collaborator

@IANTHEREAL IANTHEREAL Nov 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please adding a log here shows that we use data of dir/join as cfg. InitialCluster .

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in c9a413f.

cfg.InitialClusterState = embed.ClusterStateFlagExisting
return nil
}

// restart with previous data, no `InitialCluster` need to set
if isDataExist(filepath.Join(cfg.DataDir, "member")) {
Copy link
Collaborator

@IANTHEREAL IANTHEREAL Nov 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we check dir/member firstly? if it add member successfully, does the data exist in dir/member?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeh, it's better. addressed in dcda7b2.

cfg.InitialCluster = ""
cfg.InitialClusterState = embed.ClusterStateFlagExisting
return nil
}

// if without previous data, we need a client to contact with the existing cluster.
client, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(cfg.Join, ","),
DialTimeout: etcdutil.DefaultDialTimeout,
})
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("create etcd client for %s", cfg.Join))
}
defer client.Close()

// `member list`
listResp, err := etcdutil.ListMembers(client)
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("list member for %s", cfg.Join))
}

// check members
for _, m := range listResp.Members {
if m.Name == "" {
return terror.ErrMasterJoinEmbedEtcdFail.Generate("there is a member that has not joined successfully")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this means other member joined failed, but why prevent this member join? And how to fix this error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some comments to explain it, and update the error message in b0f5332.

later maybe we can embed etcd client in dmctl?

}
if m.Name == cfg.Name {
// a failed DM-master re-joins the previous cluster.
return terror.ErrMasterJoinEmbedEtcdFail.Generate(fmt.Sprintf("missing data or joining a duplicate member %s", m.Name))
}
}

// `member add`, a new/deleted DM-master joins to an existing cluster.
addResp, err := etcdutil.AddMember(client, strings.Split(cfg.AdvertisePeerUrls, ","))
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("add member %s", cfg.AdvertisePeerUrls))
}

// generate `--initial-cluster`
ms := make([]string, 0, len(addResp.Members))
for _, m := range addResp.Members {
name := m.Name
if m.ID == addResp.Member.ID {
name = cfg.Name
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would m.Name be empty?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be empty, I add some explanation in dcda7b2.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if name is empty, will it generate err at L142?

}
if name == "" {
return terror.ErrMasterJoinEmbedEtcdFail.Generate("there is a member that has not joined successfully")
}
for _, url := range m.PeerURLs {
ms = append(ms, fmt.Sprintf("%s=%s", name, url))
}
}
cfg.InitialCluster = strings.Join(ms, ",")
cfg.InitialClusterState = embed.ClusterStateFlagExisting

// save `--initial-cluster` in persist data
if err = os.MkdirAll(cfg.DataDir, privateDirMode); err != nil && !os.IsExist(err) {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "make directory")
}
if err = ioutil.WriteFile(joinFP, []byte(cfg.InitialCluster), privateDirMode); err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "write persistent join data")
}

return nil
}

// isDataExist returns whether the directory is empty (with data)
func isDataExist(d string) bool {
dir, err := os.Open(d)
if err != nil {
return false
}
defer dir.Close()

names, err := dir.Readdirnames(1) // read only one is enough
if err != nil {
return false
}
return len(names) != 0
}
Loading