Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
master: address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc committed Nov 13, 2019
1 parent ad716a2 commit dcda7b2
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 22 deletions.
8 changes: 1 addition & 7 deletions dm/master/dm-master.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,7 @@ advertise-peer-urls = "http://127.0.0.1:8291"
# initial cluster configuration for bootstrapping, e.g. dm-master=http://127.0.0.1:8291
initial-cluster = "dm-master=http://127.0.0.1:8291"

# Initial cluster state ("new" or "existing").
# Set to "new" for all members present during initial static or DNS bootstrapping.
# If this option is set to "existing", DM-master will attempt to join the existing cluster.
# If the wrong value is set, DM-master will attempt to start but fail safely.
initial-cluster-state = "new"

# Join to an existing pd cluster, a string of existing cluster's endpoints.
# Join to an existing DM-master cluster, a string of existing cluster's endpoints.
join = ""

# rpc configuration
Expand Down
39 changes: 29 additions & 10 deletions dm/master/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@ func startEtcd(masterCfg *Config,

// prepareJoinEtcd prepares config needed to join an existing cluster.
// learn from https://github.com/pingcap/pd/blob/37efcb05f397f26c70cda8dd44acaa3061c92159/server/join/join.go#L44.
//
// when setting `initial-cluster` explicitly to bootstrap a new cluster:
// - if local persistent data exists, just restart the previous cluster (in fact, it's not bootstrapping).
// - if local persistent data does not exist, just bootstrap the cluster.
//
// when setting `join` to join an existing cluster (without `initial-cluster` set):
// - if local persistent data exists (in fact, it's not a join):
// - just restart if `member` already exists (already joined before)
// - read `initial-cluster` back from local persistent data to restart (just like bootstrapping)
// - if local persistent data does not exist:
// 1. fetch member list from the cluster to check if we can join now.
// 2. call `member add` to add the member info into the cluster.
// 3. generate config for join (`initial-cluster` and `initial-cluster-state`).
// 4. save `initial-cluster` in local persistent data for later restarting.
//
// NOTE: A member can't join another cluster after it has joined a previous one.
func prepareJoinEtcd(cfg *Config) error {
// no need to join
if cfg.Join == "" {
Expand All @@ -82,25 +98,25 @@ func prepareJoinEtcd(cfg *Config) error {
return terror.ErrMasterJoinEmbedEtcdFail.Generate(fmt.Sprintf("join self %s is forbidden", cfg.Join))
}

// restart with previous data, no need to set `InitialCluster`
if isDataExist(filepath.Join(cfg.DataDir, "member")) {
cfg.InitialCluster = ""
cfg.InitialClusterState = embed.ClusterStateFlagExisting
return nil
}

// join with persistent data
joinFP := filepath.Join(cfg.DataDir, "join")
if _, err := os.Stat(joinFP); !os.IsNotExist(err) {
s, err := ioutil.ReadFile(joinFP)
if err != nil {
if s, err := ioutil.ReadFile(joinFP); err != nil {
if !os.IsNotExist(err) {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "read persistent join data")
}
} else {
cfg.InitialCluster = strings.TrimSpace(string(s))
cfg.InitialClusterState = embed.ClusterStateFlagExisting
return nil
}

// restart with previous data, no need to set `InitialCluster`
if isDataExist(filepath.Join(cfg.DataDir, "member")) {
cfg.InitialCluster = ""
cfg.InitialClusterState = embed.ClusterStateFlagExisting
return nil
}

// without previous data, we need a client to contact the existing cluster.
client, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(cfg.Join, ","),
Expand Down Expand Up @@ -139,6 +155,9 @@ func prepareJoinEtcd(cfg *Config) error {
for _, m := range addResp.Members {
name := m.Name
if m.ID == addResp.Member.ID {
// a member that has only called `member add`,
// but has not yet started the process to complete the join, should have an empty name.
// so, we use the `name` in config instead.
name = cfg.Name
}
if name == "" {
Expand Down
11 changes: 6 additions & 5 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ func NewServer(cfg *Config) *Server {

// Start starts to serving
func (s *Server) Start(ctx context.Context) (err error) {
err = prepareJoinEtcd(s.cfg)
if err != nil {
return
}

// create clients to DM-workers
for _, workerAddr := range s.cfg.DeployMap {
s.workerClients[workerAddr], err = workerrpc.NewGRPCClient(workerAddr)
Expand Down Expand Up @@ -131,6 +126,12 @@ func (s *Server) Start(ctx context.Context) (err error) {
// gRPC API server
gRPCSvr := func(gs *grpc.Server) { pb.RegisterMasterServer(gs, s) }

// prepare config to join an existing cluster
err = prepareJoinEtcd(s.cfg)
if err != nil {
return
}

// start embed etcd server, gRPC API server and HTTP (API, status and debug) server.
s.etcd, err = startEtcd(s.cfg, gRPCSvr, userHandles)
if err != nil {
Expand Down

0 comments on commit dcda7b2

Please sign in to comment.