-
Notifications
You must be signed in to change notification settings - Fork 188
dm-master: join a new member into an existing cluster #350
Changes from 31 commits
9893157
025a3a8
466550b
f78762c
3ad9ff2
8c59390
675a1c2
625a891
cfb0096
eb72b15
6dfbe57
3f5beac
3d664b4
139ef1a
31436e8
fad73c9
aee2173
abec6e2
a67bcf9
729a1ca
017f8e7
6255adc
d30a8a7
aacb1ed
2a042e3
a388e33
6fa6bf2
cdc9ef3
5fd202f
ad716a2
dcda7b2
c9a413f
0610cb6
b0f5332
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -14,18 +14,27 @@ | |||||
package master | ||||||
|
||||||
import ( | ||||||
"fmt" | ||||||
"io/ioutil" | ||||||
"net/http" | ||||||
"os" | ||||||
"path/filepath" | ||||||
"strings" | ||||||
"time" | ||||||
|
||||||
"go.etcd.io/etcd/clientv3" | ||||||
"go.etcd.io/etcd/embed" | ||||||
"google.golang.org/grpc" | ||||||
|
||||||
"github.com/pingcap/dm/pkg/etcdutil" | ||||||
"github.com/pingcap/dm/pkg/terror" | ||||||
) | ||||||
|
||||||
const ( | ||||||
// time waiting for etcd to be started | ||||||
etcdStartTimeout = time.Minute | ||||||
// privateDirMode grants owner to make/remove files inside the directory. | ||||||
privateDirMode os.FileMode = 0700 | ||||||
) | ||||||
|
||||||
// startEtcd starts an embedded etcd server. | ||||||
|
@@ -59,3 +68,130 @@ func startEtcd(masterCfg *Config, | |||||
} | ||||||
return e, nil | ||||||
} | ||||||
|
||||||
// prepareJoinEtcd prepares config needed to join an existing cluster. | ||||||
// learn from https://github.com/pingcap/pd/blob/37efcb05f397f26c70cda8dd44acaa3061c92159/server/join/join.go#L44. | ||||||
// | ||||||
// when setting `initial-cluster` explicitly to bootstrap a new cluster: | ||||||
// - if local persistent data exist, just restart the previous cluster (in fact, it's not bootstrapping). | ||||||
// - if local persistent data not exist, just bootstrap the cluster. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done in c9a413f |
||||||
// | ||||||
// when setting `join` to join an existing cluster (without `initial-cluster` set): | ||||||
// - if local persistent data exists (in fact, it's not join): | ||||||
// - just restart if `member` already exists (already joined before) | ||||||
// - read `initial-cluster` back from local persistent data to restart (just like bootstrapping) | ||||||
// - if local persistent data not exist: | ||||||
// 1. fetch member list from the cluster to check if we can join now. | ||||||
// 2. call `member add` to add the member info into the cluster. | ||||||
// 3. generate config for join (`initial-cluster` and `initial-cluster-state`). | ||||||
// 4. save `initial-cluster` in local persistent data for later restarting. | ||||||
// | ||||||
// NOTE: A member can't join to another cluster after it has joined a previous one. | ||||||
func prepareJoinEtcd(cfg *Config) error { | ||||||
// no need to join | ||||||
if cfg.Join == "" { | ||||||
return nil | ||||||
} | ||||||
|
||||||
// try to join self, invalid | ||||||
if cfg.Join == cfg.AdvertisePeerUrls { | ||||||
return terror.ErrMasterJoinEmbedEtcdFail.Generate(fmt.Sprintf("join self %s is forbidden", cfg.Join)) | ||||||
} | ||||||
|
||||||
// restart with previous data, no `InitialCluster` need to set | ||||||
if isDataExist(filepath.Join(cfg.DataDir, "member")) { | ||||||
cfg.InitialCluster = "" | ||||||
cfg.InitialClusterState = embed.ClusterStateFlagExisting | ||||||
return nil | ||||||
} | ||||||
|
||||||
// join with persistent data | ||||||
joinFP := filepath.Join(cfg.DataDir, "join") | ||||||
if s, err := ioutil.ReadFile(joinFP); err != nil { | ||||||
if !os.IsNotExist(err) { | ||||||
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "read persistent join data") | ||||||
} | ||||||
} else { | ||||||
cfg.InitialCluster = strings.TrimSpace(string(s)) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please adding a log here shows that we use data of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added in c9a413f. |
||||||
cfg.InitialClusterState = embed.ClusterStateFlagExisting | ||||||
return nil | ||||||
} | ||||||
|
||||||
// if without previous data, we need a client to contact with the existing cluster. | ||||||
client, err := clientv3.New(clientv3.Config{ | ||||||
Endpoints: strings.Split(cfg.Join, ","), | ||||||
DialTimeout: etcdutil.DefaultDialTimeout, | ||||||
}) | ||||||
if err != nil { | ||||||
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("create etcd client for %s", cfg.Join)) | ||||||
} | ||||||
defer client.Close() | ||||||
|
||||||
// `member list` | ||||||
listResp, err := etcdutil.ListMembers(client) | ||||||
if err != nil { | ||||||
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("list member for %s", cfg.Join)) | ||||||
} | ||||||
|
||||||
// check members | ||||||
for _, m := range listResp.Members { | ||||||
if m.Name == "" { | ||||||
return terror.ErrMasterJoinEmbedEtcdFail.Generate("there is a member that has not joined successfully") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this means other member joined failed, but why prevent this member join? And how to fix this error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add some comments to explain it, and update the error message in b0f5332. later maybe we can embed etcd client in dmctl? |
||||||
} | ||||||
if m.Name == cfg.Name { | ||||||
// a failed DM-master re-joins the previous cluster. | ||||||
return terror.ErrMasterJoinEmbedEtcdFail.Generate(fmt.Sprintf("missing data or joining a duplicate member %s", m.Name)) | ||||||
} | ||||||
} | ||||||
|
||||||
// `member add`, a new/deleted DM-master joins to an existing cluster. | ||||||
addResp, err := etcdutil.AddMember(client, strings.Split(cfg.AdvertisePeerUrls, ",")) | ||||||
if err != nil { | ||||||
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("add member %s", cfg.AdvertisePeerUrls)) | ||||||
} | ||||||
|
||||||
// generate `--initial-cluster` | ||||||
ms := make([]string, 0, len(addResp.Members)) | ||||||
for _, m := range addResp.Members { | ||||||
name := m.Name | ||||||
if m.ID == addResp.Member.ID { | ||||||
// the member only called `member add`, | ||||||
// but has not started the process to complete the join should have an empty name. | ||||||
// so, we use the `name` in config instead. | ||||||
name = cfg.Name | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It can be empty, I add some explanation in dcda7b2. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if name is empty, will it generate err at L142? |
||||||
} | ||||||
if name == "" { | ||||||
return terror.ErrMasterJoinEmbedEtcdFail.Generate("there is a member that has not joined successfully") | ||||||
} | ||||||
for _, url := range m.PeerURLs { | ||||||
ms = append(ms, fmt.Sprintf("%s=%s", name, url)) | ||||||
} | ||||||
} | ||||||
cfg.InitialCluster = strings.Join(ms, ",") | ||||||
cfg.InitialClusterState = embed.ClusterStateFlagExisting | ||||||
|
||||||
// save `--initial-cluster` in persist data | ||||||
if err = os.MkdirAll(cfg.DataDir, privateDirMode); err != nil && !os.IsExist(err) { | ||||||
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "make directory") | ||||||
} | ||||||
if err = ioutil.WriteFile(joinFP, []byte(cfg.InitialCluster), privateDirMode); err != nil { | ||||||
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "write persistent join data") | ||||||
} | ||||||
|
||||||
return nil | ||||||
} | ||||||
|
||||||
// isDataExist returns whether the directory is empty (with data) | ||||||
func isDataExist(d string) bool { | ||||||
dir, err := os.Open(d) | ||||||
if err != nil { | ||||||
return false | ||||||
} | ||||||
defer dir.Close() | ||||||
|
||||||
names, err := dir.Readdirnames(1) // read only one is enough | ||||||
if err != nil { | ||||||
return false | ||||||
} | ||||||
return len(names) != 0 | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems don't have config named
advertise-client-urls
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a but,
join
should be the address of clients (endpoints), but use peer address before, fixed in b0f5332.