This repository has been archived by the owner on Nov 24, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 188
dm-master: join a new member into an existing cluster #350
Merged
Merged
Changes from all commits
Commits
Show all changes
34 commits
Select commit
Hold shift + click to select a range
9893157
*: combine etcd server, gRPC API server and other HTTP servers
csuzhangxc 025a3a8
*: fix tests; refine config item
csuzhangxc 466550b
*: add test case for `parseURLs`
csuzhangxc f78762c
*: refine config; add more test cases
csuzhangxc 3ad9ff2
tests: fix CI
csuzhangxc 8c59390
Merge branch 'master' into dm-master-owner
csuzhangxc 675a1c2
master: add more test cases for embed etcd config
csuzhangxc 625a891
master: refine API tests
csuzhangxc cfb0096
master: refine code
csuzhangxc eb72b15
Merge branch 'master' into dm-master-owner
csuzhangxc 6dfbe57
master: update sample config items oder
csuzhangxc 3f5beac
Merge remote-tracking branch 'origin/dm-master-owner' into dm-master-…
csuzhangxc 3d664b4
master: refine code
csuzhangxc 139ef1a
*: prepare join embed etcd cluster
csuzhangxc 31436e8
master: address comments
csuzhangxc fad73c9
*: add some tests for prepareJoinEtcd
csuzhangxc aee2173
*: add more tests for prepareJoinEtcd; tiny fix `AdvertisePeerUrls` a…
csuzhangxc abec6e2
master: address comment
csuzhangxc a67bcf9
master: address comments
csuzhangxc 729a1ca
Merge remote-tracking branch 'remotes/origin/dm-master-owner' into dm…
csuzhangxc 017f8e7
Merge remote-tracking branch 'remotes/origin/master' into dm-master-join
csuzhangxc 6255adc
master: fix merge
csuzhangxc d30a8a7
*: go mod tidy
csuzhangxc aacb1ed
master: use random port for testing
csuzhangxc 2a042e3
terror: update error message
csuzhangxc a388e33
master: update sample config
csuzhangxc 6fa6bf2
master: update config test
csuzhangxc cdc9ef3
master: add more tests for prepareJoinEtcd
csuzhangxc 5fd202f
master: join member into existing cluster
csuzhangxc ad716a2
Merge branch 'master' into dm-master-join
IANTHEREAL dcda7b2
master: address comments
csuzhangxc c9a413f
master: address comments
csuzhangxc 0610cb6
Merge branch 'master' into dm-master-join
csuzhangxc b0f5332
master: address comments
csuzhangxc File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,18 +14,29 @@ | |
package master | ||
|
||
import ( | ||
"fmt" | ||
"io/ioutil" | ||
"net/http" | ||
"os" | ||
"path/filepath" | ||
"strings" | ||
"time" | ||
|
||
"go.etcd.io/etcd/clientv3" | ||
"go.etcd.io/etcd/embed" | ||
"go.uber.org/zap" | ||
"google.golang.org/grpc" | ||
|
||
"github.com/pingcap/dm/pkg/etcdutil" | ||
"github.com/pingcap/dm/pkg/log" | ||
"github.com/pingcap/dm/pkg/terror" | ||
) | ||
|
||
const ( | ||
// time waiting for etcd to be started | ||
etcdStartTimeout = time.Minute | ||
// privateDirMode grants owner to make/remove files inside the directory. | ||
privateDirMode os.FileMode = 0700 | ||
) | ||
|
||
// startEtcd starts an embedded etcd server. | ||
|
@@ -59,3 +70,140 @@ func startEtcd(masterCfg *Config, | |
} | ||
return e, nil | ||
} | ||
|
||
// prepareJoinEtcd prepares config needed to join an existing cluster. | ||
// learn from https://github.com/pingcap/pd/blob/37efcb05f397f26c70cda8dd44acaa3061c92159/server/join/join.go#L44. | ||
// | ||
// when setting `initial-cluster` explicitly to bootstrap a new cluster: | ||
// - if local persistent data exist, just restart the previous cluster (in fact, it's not bootstrapping). | ||
// - if local persistent data not exist, just bootstrap the cluster as a new cluster. | ||
// | ||
// when setting `join` to join an existing cluster (without `initial-cluster` set): | ||
// - if local persistent data exists (in fact, it's not join): | ||
// - just restart if `member` already exists (already joined before) | ||
// - read `initial-cluster` back from local persistent data to restart (just like bootstrapping) | ||
// - if local persistent data not exist: | ||
// 1. fetch member list from the cluster to check if we can join now. | ||
// 2. call `member add` to add the member info into the cluster. | ||
// 3. generate config for join (`initial-cluster` and `initial-cluster-state`). | ||
// 4. save `initial-cluster` in local persistent data for later restarting. | ||
// | ||
// NOTE: A member can't join to another cluster after it has joined a previous one. | ||
func prepareJoinEtcd(cfg *Config) error { | ||
// no need to join | ||
if cfg.Join == "" { | ||
return nil | ||
} | ||
|
||
// try to join self, invalid | ||
clientURLs := strings.Split(cfg.Join, ",") | ||
for _, clientURL := range clientURLs { | ||
if clientURL == cfg.MasterAddr { | ||
return terror.ErrMasterJoinEmbedEtcdFail.Generate(fmt.Sprintf("join self %s is forbidden", cfg.Join)) | ||
} | ||
} | ||
|
||
// restart with previous data, no `InitialCluster` need to set | ||
if isDataExist(filepath.Join(cfg.DataDir, "member")) { | ||
cfg.InitialCluster = "" | ||
cfg.InitialClusterState = embed.ClusterStateFlagExisting | ||
return nil | ||
} | ||
|
||
// join with persistent data | ||
joinFP := filepath.Join(cfg.DataDir, "join") | ||
if s, err := ioutil.ReadFile(joinFP); err != nil { | ||
if !os.IsNotExist(err) { | ||
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "read persistent join data") | ||
} | ||
} else { | ||
cfg.InitialCluster = strings.TrimSpace(string(s)) | ||
cfg.InitialClusterState = embed.ClusterStateFlagExisting | ||
log.L().Info("using persistent join data", zap.String("file", joinFP), zap.String("data", cfg.InitialCluster)) | ||
return nil | ||
} | ||
|
||
// if without previous data, we need a client to contact with the existing cluster. | ||
client, err := clientv3.New(clientv3.Config{ | ||
Endpoints: strings.Split(cfg.Join, ","), | ||
DialTimeout: etcdutil.DefaultDialTimeout, | ||
}) | ||
if err != nil { | ||
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("create etcd client for %s", cfg.Join)) | ||
} | ||
defer client.Close() | ||
|
||
// `member list` | ||
listResp, err := etcdutil.ListMembers(client) | ||
if err != nil { | ||
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("list member for %s", cfg.Join)) | ||
} | ||
|
||
// check members | ||
for _, m := range listResp.Members { | ||
if m.Name == "" { // the previous existing member without name (not complete the join operation) | ||
// we can't generate `initial-cluster` correctly with empty member name, | ||
// and if added a member but not started it to complete the join, | ||
// the later join operation may encounter `etcdserver: re-configuration failed due to not enough started members`. | ||
return terror.ErrMasterJoinEmbedEtcdFail.Generate("there is a member that has not joined successfully, continue the join or remove it") | ||
} | ||
if m.Name == cfg.Name { | ||
// a failed DM-master re-joins the previous cluster. | ||
return terror.ErrMasterJoinEmbedEtcdFail.Generate(fmt.Sprintf("missing data or joining a duplicate member %s", m.Name)) | ||
} | ||
} | ||
|
||
// `member add`, a new/deleted DM-master joins to an existing cluster. | ||
addResp, err := etcdutil.AddMember(client, strings.Split(cfg.AdvertisePeerUrls, ",")) | ||
if err != nil { | ||
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("add member %s", cfg.AdvertisePeerUrls)) | ||
} | ||
|
||
// generate `--initial-cluster` | ||
ms := make([]string, 0, len(addResp.Members)) | ||
for _, m := range addResp.Members { | ||
name := m.Name | ||
if m.ID == addResp.Member.ID { | ||
// the member only called `member add`, | ||
// but has not started the process to complete the join should have an empty name. | ||
// so, we use the `name` in config instead. | ||
name = cfg.Name | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It can be empty, I add some explanation in dcda7b2. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if name is empty, will it generate err at L142? |
||
} | ||
if name == "" { | ||
// this should be checked in the previous `member list` operation if having only one member is join. | ||
// if multi join operations exist, the behavior may be unexpected. | ||
// check again here only to decrease the unexpectedness. | ||
return terror.ErrMasterJoinEmbedEtcdFail.Generate("there is a member that has not joined successfully, continue the join or remove it") | ||
} | ||
for _, url := range m.PeerURLs { | ||
ms = append(ms, fmt.Sprintf("%s=%s", name, url)) | ||
} | ||
} | ||
cfg.InitialCluster = strings.Join(ms, ",") | ||
cfg.InitialClusterState = embed.ClusterStateFlagExisting | ||
|
||
// save `--initial-cluster` in persist data | ||
if err = os.MkdirAll(cfg.DataDir, privateDirMode); err != nil && !os.IsExist(err) { | ||
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "make directory") | ||
} | ||
if err = ioutil.WriteFile(joinFP, []byte(cfg.InitialCluster), privateDirMode); err != nil { | ||
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "write persistent join data") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// isDataExist returns whether the directory is empty (with data) | ||
func isDataExist(d string) bool { | ||
dir, err := os.Open(d) | ||
if err != nil { | ||
return false | ||
} | ||
defer dir.Close() | ||
|
||
names, err := dir.Readdirnames(1) // read only one is enough | ||
if err != nil { | ||
return false | ||
} | ||
return len(names) != 0 | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please adding a log here shows that we use data of
dir/join
ascfg. InitialCluster
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added in c9a413f.