diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 94bb26d014..fae97c27fd 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -314,9 +314,10 @@ ErrMasterOperRequestTimeout,[code=38033:class=dm-master:scope=internal:level=hig ErrMasterHandleHTTPApis,[code=38034:class=dm-master:scope=internal:level=high],"serve http apis to grpc" ErrMasterHostPortNotValid,[code=38035:class=dm-master:scope=internal:level=high],"host:port '%s' not valid" ErrMasterGetHostnameFail,[code=38036:class=dm-master:scope=internal:level=high],"get hostname fail" -ErrMasterGenEmbedEtcdConfigFail,[code=38037:class=dm-master:scope=internal:level=high],"generate config item %s for embed etcd fail" -ErrMasterStartEmbedEtcdFail,[code=38038:class=dm-master:scope=internal:level=high],"start embed etcd fail" -ErrMasterParseURLFail,[code=38039:class=dm-master:scope=internal:level=high],"parse URL %s fail" +ErrMasterGenEmbedEtcdConfigFail,[code=38037:class=dm-master:scope=internal:level=high],"fail to generate config item %s for embed etcd" +ErrMasterStartEmbedEtcdFail,[code=38038:class=dm-master:scope=internal:level=high],"fail to start embed etcd" +ErrMasterParseURLFail,[code=38039:class=dm-master:scope=internal:level=high],"fail to parse URL %s" +ErrMasterJoinEmbedEtcdFail,[code=38040:class=dm-master:scope=internal:level=high],"fail to join embed etcd: %s" ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium],"parse dm-worker config flag set" ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium],"'%s' is an invalid flag" ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium],"toml decode file" diff --git a/dm/master/config.go b/dm/master/config.go index c895f540d2..4c1e510a54 100644 --- a/dm/master/config.go +++ b/dm/master/config.go @@ -35,10 +35,11 @@ import ( ) const ( - defaultRPCTimeout = "30s" - defaultNamePrefix = "dm-master" - defaultDataDirPrefix = "default" - defaultPeerUrls = "http://127.0.0.1:8291" + defaultRPCTimeout = "30s" + defaultNamePrefix = "dm-master" + defaultDataDirPrefix = "default" + defaultPeerUrls = "http://127.0.0.1:8291" + defaultInitialClusterState = embed.ClusterStateFlagNew ) // SampleConfigFile is sample config file of dm-master @@ -65,6 +66,7 @@ func NewConfig() *Config { fs.StringVar(&cfg.InitialCluster, "initial-cluster", "", fmt.Sprintf("initial cluster configuration for bootstrapping, e,g. dm-master=%s", defaultPeerUrls)) fs.StringVar(&cfg.PeerUrls, "peer-urls", defaultPeerUrls, "URLs for peer traffic") fs.StringVar(&cfg.AdvertisePeerUrls, "advertise-peer-urls", "", `advertise URLs for peer traffic (default "${peer-urls}")`) + fs.StringVar(&cfg.Join, "join", "", `join to an existing cluster (usage: cluster's "${master-addr}" list, e,g. "127.0.0.1:8261,127.0.0.1:18261"`) return cfg } @@ -108,11 +110,13 @@ type Config struct { // etcd relative config items // NOTE: we use `MasterAddr` to generate `ClientUrls` and `AdvertiseClientUrls` // NOTE: more items will be add when adding leader election - Name string `toml:"name" json:"name"` - DataDir string `toml:"data-dir" json:"data-dir"` - PeerUrls string `toml:"peer-urls" json:"peer-urls"` - AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"` - InitialCluster string `toml:"initial-cluster" json:"initial-cluster"` + Name string `toml:"name" json:"name"` + DataDir string `toml:"data-dir" json:"data-dir"` + PeerUrls string `toml:"peer-urls" json:"peer-urls"` + AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"` + InitialCluster string `toml:"initial-cluster" json:"initial-cluster"` + InitialClusterState string `toml:"initial-cluster-state" json:"initial-cluster-state"` + Join string `toml:"join" json:"join"` // cluster's client address (endpoints), not peer address printVersion bool printSampleConfig bool @@ -250,7 +254,7 @@ func (c *Config) adjust() error { } if c.AdvertisePeerUrls == "" { - c.AdvertisePeerUrls = defaultPeerUrls + c.AdvertisePeerUrls = c.PeerUrls } if c.InitialCluster == "" { @@ -261,6 +265,10 @@ func (c *Config) adjust() error { c.InitialCluster = strings.Join(items, ",") } + if c.InitialClusterState == "" { + c.InitialClusterState = defaultInitialClusterState + } + _, err = c.genEmbedEtcdConfig() // verify embed etcd config return err } @@ -314,6 +322,7 @@ func (c *Config) genEmbedEtcdConfig() (*embed.Config, error) { } cfg.InitialCluster = c.InitialCluster + cfg.ClusterState = c.InitialClusterState return cfg, nil } diff --git a/dm/master/config_test.go b/dm/master/config_test.go index 553c400fb6..c887f0e562 100644 --- a/dm/master/config_test.go +++ b/dm/master/config_test.go @@ -25,6 +25,7 @@ import ( capturer "github.com/kami-zh/go-capturer" "github.com/pingcap/check" + "go.etcd.io/etcd/embed" "github.com/pingcap/dm/pkg/terror" ) @@ -133,6 +134,8 @@ func (t *testConfigSuite) TestConfig(c *check.C) { c.Assert(cfg.PeerUrls, check.Equals, peerURLs) c.Assert(cfg.AdvertisePeerUrls, check.Equals, advertisePeerURLs) c.Assert(cfg.InitialCluster, check.Equals, initialCluster) + c.Assert(cfg.InitialClusterState, check.Equals, embed.ClusterStateFlagNew) + c.Assert(cfg.Join, check.Equals, "") c.Assert(cfg.DeployMap, check.DeepEquals, deployMap) c.Assert(cfg.String(), check.Matches, fmt.Sprintf("{.*master-addr\":\"%s\".*}", masterAddr)) } @@ -228,6 +231,7 @@ func (t *testConfigSuite) TestGenEmbedEtcdConfig(c *check.C) { cfg1 := NewConfig() cfg1.MasterAddr = ":8261" + cfg1.InitialClusterState = embed.ClusterStateFlagExisting c.Assert(cfg1.adjust(), check.IsNil) etcdCfg, err := cfg1.genEmbedEtcdConfig() c.Assert(err, check.IsNil) @@ -238,6 +242,7 @@ func (t *testConfigSuite) TestGenEmbedEtcdConfig(c *check.C) { c.Assert(etcdCfg.LPUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}}) c.Assert(etcdCfg.APUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}}) c.Assert(etcdCfg.InitialCluster, check.DeepEquals, fmt.Sprintf("dm-master-%s=http://127.0.0.1:8291", hostname)) + c.Assert(etcdCfg.ClusterState, check.Equals, embed.ClusterStateFlagExisting) cfg2 := *cfg1 cfg2.MasterAddr = "127.0.0.1\n:8261" diff --git a/dm/master/dm-master.toml b/dm/master/dm-master.toml index bcd08b20a3..2e51295207 100644 --- a/dm/master/dm-master.toml +++ b/dm/master/dm-master.toml @@ -22,6 +22,9 @@ advertise-peer-urls = "http://127.0.0.1:8291" # initial cluster configuration for bootstrapping, e,g. dm-master=http://127.0.0.1:8291 initial-cluster = "dm-master=http://127.0.0.1:8291" +# Join to an existing DM-master cluster, a string of existing cluster's endpoints. +join = "" + # rpc configuration # # rpc timeout is a positive number plus time unit. we use golang standard time diff --git a/dm/master/etcd.go b/dm/master/etcd.go index 398b3be1cc..671b6cdb57 100644 --- a/dm/master/etcd.go +++ b/dm/master/etcd.go @@ -14,18 +14,29 @@ package master import ( + "fmt" + "io/ioutil" "net/http" + "os" + "path/filepath" + "strings" "time" + "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" + "go.uber.org/zap" "google.golang.org/grpc" + "github.com/pingcap/dm/pkg/etcdutil" + "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" ) const ( // time waiting for etcd to be started etcdStartTimeout = time.Minute + // privateDirMode grants owner to make/remove files inside the directory. + privateDirMode os.FileMode = 0700 ) // startEtcd starts an embedded etcd server. @@ -59,3 +70,140 @@ func startEtcd(masterCfg *Config, } return e, nil } + +// prepareJoinEtcd prepares config needed to join an existing cluster. +// learn from https://github.com/pingcap/pd/blob/37efcb05f397f26c70cda8dd44acaa3061c92159/server/join/join.go#L44. +// +// when setting `initial-cluster` explicitly to bootstrap a new cluster: +// - if local persistent data exist, just restart the previous cluster (in fact, it's not bootstrapping). +// - if local persistent data not exist, just bootstrap the cluster as a new cluster. +// +// when setting `join` to join an existing cluster (without `initial-cluster` set): +// - if local persistent data exists (in fact, it's not join): +// - just restart if `member` already exists (already joined before) +// - read `initial-cluster` back from local persistent data to restart (just like bootstrapping) +// - if local persistent data not exist: +// 1. fetch member list from the cluster to check if we can join now. +// 2. call `member add` to add the member info into the cluster. +// 3. generate config for join (`initial-cluster` and `initial-cluster-state`). +// 4. save `initial-cluster` in local persistent data for later restarting. +// +// NOTE: A member can't join to another cluster after it has joined a previous one. +func prepareJoinEtcd(cfg *Config) error { + // no need to join + if cfg.Join == "" { + return nil + } + + // try to join self, invalid + clientURLs := strings.Split(cfg.Join, ",") + for _, clientURL := range clientURLs { + if clientURL == cfg.MasterAddr { + return terror.ErrMasterJoinEmbedEtcdFail.Generate(fmt.Sprintf("join self %s is forbidden", cfg.Join)) + } + } + + // restart with previous data, no `InitialCluster` need to set + if isDataExist(filepath.Join(cfg.DataDir, "member")) { + cfg.InitialCluster = "" + cfg.InitialClusterState = embed.ClusterStateFlagExisting + return nil + } + + // join with persistent data + joinFP := filepath.Join(cfg.DataDir, "join") + if s, err := ioutil.ReadFile(joinFP); err != nil { + if !os.IsNotExist(err) { + return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "read persistent join data") + } + } else { + cfg.InitialCluster = strings.TrimSpace(string(s)) + cfg.InitialClusterState = embed.ClusterStateFlagExisting + log.L().Info("using persistent join data", zap.String("file", joinFP), zap.String("data", cfg.InitialCluster)) + return nil + } + + // if without previous data, we need a client to contact with the existing cluster. + client, err := clientv3.New(clientv3.Config{ + Endpoints: strings.Split(cfg.Join, ","), + DialTimeout: etcdutil.DefaultDialTimeout, + }) + if err != nil { + return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("create etcd client for %s", cfg.Join)) + } + defer client.Close() + + // `member list` + listResp, err := etcdutil.ListMembers(client) + if err != nil { + return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("list member for %s", cfg.Join)) + } + + // check members + for _, m := range listResp.Members { + if m.Name == "" { // the previous existing member without name (not complete the join operation) + // we can't generate `initial-cluster` correctly with empty member name, + // and if added a member but not started it to complete the join, + // the later join operation may encounter `etcdserver: re-configuration failed due to not enough started members`. + return terror.ErrMasterJoinEmbedEtcdFail.Generate("there is a member that has not joined successfully, continue the join or remove it") + } + if m.Name == cfg.Name { + // a failed DM-master re-joins the previous cluster. + return terror.ErrMasterJoinEmbedEtcdFail.Generate(fmt.Sprintf("missing data or joining a duplicate member %s", m.Name)) + } + } + + // `member add`, a new/deleted DM-master joins to an existing cluster. + addResp, err := etcdutil.AddMember(client, strings.Split(cfg.AdvertisePeerUrls, ",")) + if err != nil { + return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("add member %s", cfg.AdvertisePeerUrls)) + } + + // generate `--initial-cluster` + ms := make([]string, 0, len(addResp.Members)) + for _, m := range addResp.Members { + name := m.Name + if m.ID == addResp.Member.ID { + // the member only called `member add`, + // but has not started the process to complete the join should have an empty name. + // so, we use the `name` in config instead. + name = cfg.Name + } + if name == "" { + // this should be checked in the previous `member list` operation if having only one member is join. + // if multi join operations exist, the behavior may be unexpected. + // check again here only to decrease the unexpectedness. + return terror.ErrMasterJoinEmbedEtcdFail.Generate("there is a member that has not joined successfully, continue the join or remove it") + } + for _, url := range m.PeerURLs { + ms = append(ms, fmt.Sprintf("%s=%s", name, url)) + } + } + cfg.InitialCluster = strings.Join(ms, ",") + cfg.InitialClusterState = embed.ClusterStateFlagExisting + + // save `--initial-cluster` in persist data + if err = os.MkdirAll(cfg.DataDir, privateDirMode); err != nil && !os.IsExist(err) { + return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "make directory") + } + if err = ioutil.WriteFile(joinFP, []byte(cfg.InitialCluster), privateDirMode); err != nil { + return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "write persistent join data") + } + + return nil +} + +// isDataExist returns whether the directory is empty (with data) +func isDataExist(d string) bool { + dir, err := os.Open(d) + if err != nil { + return false + } + defer dir.Close() + + names, err := dir.Readdirnames(1) // read only one is enough + if err != nil { + return false + } + return len(names) != 0 +} diff --git a/dm/master/etcd_test.go b/dm/master/etcd_test.go new file mode 100644 index 0000000000..dc5f962649 --- /dev/null +++ b/dm/master/etcd_test.go @@ -0,0 +1,187 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package master + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" + "strings" + "time" + + "github.com/pingcap/check" + "github.com/pingcap/pd/pkg/tempurl" + "go.etcd.io/etcd/embed" + + "github.com/pingcap/dm/pkg/terror" +) + +var _ = check.Suite(&testEtcdSuite{}) + +type testEtcdSuite struct { +} + +func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) { + cfgCluster := NewConfig() // used to start an etcd cluster + cfgCluster.Name = "dm-master-1" + cfgCluster.DataDir = c.MkDir() + cfgCluster.MasterAddr = tempurl.Alloc()[len("http://"):] + cfgCluster.PeerUrls = tempurl.Alloc() + c.Assert(cfgCluster.adjust(), check.IsNil) + + cfgBefore := t.cloneConfig(cfgCluster) // before `prepareJoinEtcd` applied + cfgBefore.DataDir = c.MkDir() // overwrite some config items + cfgBefore.MasterAddr = tempurl.Alloc()[len("http://"):] + cfgBefore.PeerUrls = tempurl.Alloc() + cfgBefore.AdvertisePeerUrls = cfgBefore.PeerUrls + c.Assert(cfgBefore.adjust(), check.IsNil) + + cfgAfter := t.cloneConfig(cfgBefore) // after `prepareJoinEtcd applied + + joinCluster := cfgCluster.MasterAddr + joinFP := filepath.Join(cfgBefore.DataDir, "join") + memberDP := filepath.Join(cfgBefore.DataDir, "member") + + // not set `join`, do nothing + c.Assert(prepareJoinEtcd(cfgAfter), check.IsNil) + c.Assert(cfgAfter, check.DeepEquals, cfgBefore) + + // try to join self + cfgAfter.Join = cfgAfter.MasterAddr + err := prepareJoinEtcd(cfgAfter) + c.Assert(terror.ErrMasterJoinEmbedEtcdFail.Equal(err), check.IsTrue) + c.Assert(err, check.ErrorMatches, ".*fail to join embed etcd: join self.*is forbidden.*") + + // update `join` to a valid item + cfgBefore.Join = joinCluster + + // join with persistent data + c.Assert(ioutil.WriteFile(joinFP, []byte(joinCluster), privateDirMode), check.IsNil) + cfgAfter = t.cloneConfig(cfgBefore) + c.Assert(prepareJoinEtcd(cfgAfter), check.IsNil) + c.Assert(cfgAfter.InitialCluster, check.Equals, joinCluster) + c.Assert(cfgAfter.InitialClusterState, check.Equals, embed.ClusterStateFlagExisting) + c.Assert(os.Remove(joinFP), check.IsNil) // remove the persistent data + + // join with invalid persistent data + c.Assert(os.Mkdir(joinFP, privateDirMode), check.IsNil) // use directory as invalid persistent data (file) + cfgAfter = t.cloneConfig(cfgBefore) + err = prepareJoinEtcd(cfgAfter) + c.Assert(terror.ErrMasterJoinEmbedEtcdFail.Equal(err), check.IsTrue) + c.Assert(err, check.ErrorMatches, ".*fail to join embed etcd: read persistent join data.*") + c.Assert(os.Remove(joinFP), check.IsNil) // remove the persistent data + c.Assert(cfgAfter, check.DeepEquals, cfgBefore) // not changed + + // restart with previous data + c.Assert(os.Mkdir(memberDP, privateDirMode), check.IsNil) + c.Assert(os.Mkdir(filepath.Join(memberDP, "wal"), privateDirMode), check.IsNil) + c.Assert(prepareJoinEtcd(cfgAfter), check.IsNil) + c.Assert(cfgAfter.InitialCluster, check.Equals, "") + c.Assert(cfgAfter.InitialClusterState, check.Equals, embed.ClusterStateFlagExisting) + c.Assert(os.RemoveAll(memberDP), check.IsNil) // remove previous data + + // start an etcd cluster + e1, err := startEtcd(cfgCluster, nil, nil) + c.Assert(err, check.IsNil) + defer e1.Close() + + // same `name`, duplicate + cfgAfter = t.cloneConfig(cfgBefore) + err = prepareJoinEtcd(cfgAfter) + c.Assert(terror.ErrMasterJoinEmbedEtcdFail.Equal(err), check.IsTrue) + c.Assert(err, check.ErrorMatches, ".*fail to join embed etcd: missing data or joining a duplicate member.*") + c.Assert(cfgAfter, check.DeepEquals, cfgBefore) // not changed + + // set a different name + cfgBefore.Name = "dm-master-2" + + // add member with invalid `advertise-peer-urls` + cfgAfter = t.cloneConfig(cfgBefore) + cfgAfter.AdvertisePeerUrls = "invalid-advertise-peer-urls" + err = prepareJoinEtcd(cfgAfter) + c.Assert(terror.ErrMasterJoinEmbedEtcdFail.Equal(err), check.IsTrue) + c.Assert(err, check.ErrorMatches, ".*fail to join embed etcd: add member.*") + + // join with existing cluster + cfgAfter = t.cloneConfig(cfgBefore) + c.Assert(prepareJoinEtcd(cfgAfter), check.IsNil) + c.Assert(cfgAfter.InitialClusterState, check.Equals, embed.ClusterStateFlagExisting) + obtainClusters := strings.Split(cfgAfter.InitialCluster, ",") + sort.Strings(obtainClusters) + expectedClusters := []string{ + cfgCluster.InitialCluster, + fmt.Sprintf("%s=%s", cfgAfter.Name, cfgAfter.PeerUrls), + } + sort.Strings(expectedClusters) + c.Assert(obtainClusters, check.DeepEquals, expectedClusters) + + // join data should exist now + joinData, err := ioutil.ReadFile(joinFP) + c.Assert(err, check.IsNil) + c.Assert(string(joinData), check.Equals, cfgAfter.InitialCluster) + + // prepare join done, but has not start the etcd to complete the join, can not join anymore. + cfgAfter2 := t.cloneConfig(cfgBefore) + cfgAfter2.Name = "dm-master-3" // overwrite some items + cfgAfter2.DataDir = c.MkDir() + cfgAfter2.MasterAddr = tempurl.Alloc()[len("http://"):] + cfgAfter2.PeerUrls = tempurl.Alloc() + cfgAfter2.AdvertisePeerUrls = cfgAfter2.PeerUrls + err = prepareJoinEtcd(cfgAfter2) + c.Assert(terror.ErrMasterJoinEmbedEtcdFail.Equal(err), check.IsTrue) + c.Assert(err, check.ErrorMatches, ".*fail to join embed etcd: there is a member that has not joined successfully, continue the join or remove it.*") + + // start the joining etcd + e2, err := startEtcd(cfgAfter, nil, nil) + c.Assert(err, check.IsNil) + defer e2.Close() + + // try join again + for i := 0; i < 20; i++ { + err = prepareJoinEtcd(cfgAfter2) + if err == nil { + break + } + // for `etcdserver: unhealthy cluster`, try again later + c.Assert(terror.ErrMasterJoinEmbedEtcdFail.Equal(err), check.IsTrue) + c.Assert(err, check.ErrorMatches, ".*fail to join embed etcd: add member.*: etcdserver: unhealthy cluster.*") + time.Sleep(500 * time.Millisecond) + } + c.Assert(err, check.IsNil) +} + +func (t *testEtcdSuite) cloneConfig(cfg *Config) *Config { + clone := NewConfig() + *clone = *cfg + return clone +} + +func (t *testEtcdSuite) TestIsDataExist(c *check.C) { + d := "./directory-not-exists" + c.Assert(isDataExist(d), check.IsFalse) + + // empty directory + d = c.MkDir() + c.Assert(isDataExist(d), check.IsFalse) + + // data exists in the directory + for i := 1; i <= 3; i++ { + fp := filepath.Join(d, fmt.Sprintf("file.%d", i)) + c.Assert(ioutil.WriteFile(fp, nil, privateDirMode), check.IsNil) + c.Assert(isDataExist(d), check.IsTrue) + c.Assert(isDataExist(fp), check.IsFalse) // not a directory + } +} diff --git a/dm/master/server.go b/dm/master/server.go index 53795f360e..3cc367a9df 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -126,6 +126,12 @@ func (s *Server) Start(ctx context.Context) (err error) { // gRPC API server gRPCSvr := func(gs *grpc.Server) { pb.RegisterMasterServer(gs, s) } + // prepare config to join an existing cluster + err = prepareJoinEtcd(s.cfg) + if err != nil { + return + } + // start embed etcd server, gRPC API server and HTTP (API, status and debug) server. s.etcd, err = startEtcd(s.cfg, gRPCSvr, userHandles) if err != nil { diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 7e4cedcd69..72a40b82af 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -18,6 +18,7 @@ import ( "context" "io/ioutil" "net/http" + "strings" "fmt" "io" @@ -28,12 +29,15 @@ import ( "github.com/golang/mock/gomock" "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/pd/pkg/tempurl" + "go.etcd.io/etcd/clientv3" "github.com/pingcap/dm/checker" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/master/workerrpc" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/pbmock" + "github.com/pingcap/dm/pkg/etcdutil" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" ) @@ -1489,7 +1493,7 @@ func (t *testMaster) TestServer(c *check.C) { cfg := NewConfig() c.Assert(cfg.Parse([]string{"-config=./dm-master.toml"}), check.IsNil) cfg.DataDir = c.MkDir() - cfg.MasterAddr = "127.0.0.1:18261" // use a different port + cfg.MasterAddr = tempurl.Alloc()[len("http://"):] s := NewServer(cfg) @@ -1525,3 +1529,57 @@ func (t *testMaster) testHTTPInterface(c *check.C, url string, contain []byte) { c.Assert(err, check.IsNil) c.Assert(bytes.Contains(body, contain), check.IsTrue) } + +func (t *testMaster) TestJoinMember(c *check.C) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + + // create a new cluster + cfg1 := NewConfig() + c.Assert(cfg1.Parse([]string{"-config=./dm-master.toml"}), check.IsNil) + cfg1.Name = "dm-master-1" + cfg1.DataDir = c.MkDir() + cfg1.MasterAddr = tempurl.Alloc()[len("http://"):] + cfg1.PeerUrls = tempurl.Alloc() + cfg1.AdvertisePeerUrls = cfg1.PeerUrls + cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls) + + s1 := NewServer(cfg1) + c.Assert(s1.Start(ctx), check.IsNil) + defer s1.Close() + + // join to an existing cluster + cfg2 := NewConfig() + c.Assert(cfg2.Parse([]string{"-config=./dm-master.toml"}), check.IsNil) + cfg2.Name = "dm-master-2" + cfg2.DataDir = c.MkDir() + cfg2.MasterAddr = tempurl.Alloc()[len("http://"):] + cfg2.PeerUrls = tempurl.Alloc() + cfg2.AdvertisePeerUrls = cfg2.PeerUrls + cfg2.Join = cfg1.MasterAddr // join to an existing cluster + + s2 := NewServer(cfg2) + c.Assert(s2.Start(ctx), check.IsNil) + defer s2.Close() + + client, err := clientv3.New(clientv3.Config{ + Endpoints: strings.Split(cfg1.AdvertisePeerUrls, ","), + DialTimeout: etcdutil.DefaultDialTimeout, + }) + c.Assert(err, check.IsNil) + defer client.Close() + + // verify membersm + listResp, err := etcdutil.ListMembers(client) + c.Assert(err, check.IsNil) + c.Assert(listResp.Members, check.HasLen, 2) + names := make(map[string]struct{}, len(listResp.Members)) + for _, m := range listResp.Members { + names[m.Name] = struct{}{} + } + _, ok := names[cfg1.Name] + c.Assert(ok, check.IsTrue) + _, ok = names[cfg2.Name] + c.Assert(ok, check.IsTrue) + + cancel() +} diff --git a/go.mod b/go.mod index d734b235fa..d9defea3f5 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ require ( github.com/DATA-DOG/go-sqlmock v1.3.3 github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e - github.com/coreos/etcd v3.3.15+incompatible // indirect + github.com/coreos/etcd v3.3.15+incompatible github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect github.com/cznic/sortutil v0.0.0-20181122101858-f5f958428db8 // indirect @@ -36,6 +36,7 @@ require ( github.com/pingcap/kvproto v0.0.0-20190827032240-9696cd0c6acb // indirect github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd github.com/pingcap/parser v0.0.0-20191008032157-51a2e3b2e34b + github.com/pingcap/pd v0.0.0-20190712044914-75a1f9f3062b github.com/pingcap/tidb v0.0.0-20190827060935-cc07b110825e github.com/pingcap/tidb-tools v3.0.0-beta.1.0.20190821032033-e6ccf3994944+incompatible github.com/pingcap/tipb v0.0.0-20191008064422-018b2fadf414 // indirect @@ -72,6 +73,7 @@ require ( google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 google.golang.org/grpc v1.23.0 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect + gopkg.in/stretchr/testify.v1 v1.2.2 // indirect gopkg.in/yaml.v2 v2.2.4 sigs.k8s.io/yaml v1.1.0 // indirect sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 // indirect diff --git a/go.sum b/go.sum index 7dbe5cb5e8..55bd2239d9 100644 --- a/go.sum +++ b/go.sum @@ -533,6 +533,8 @@ gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKW gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= +gopkg.in/stretchr/testify.v1 v1.2.2 h1:yhQC6Uy5CqibAIlk1wlusa/MJ3iAN49/BsR/dCCKz3M= +gopkg.in/stretchr/testify.v1 v1.2.2/go.mod h1:QI5V/q6UbPmuhtm10CaFZxED9NreB8PnFYN9JcR6TxU= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= diff --git a/pkg/etcdutil/etcdutil.go b/pkg/etcdutil/etcdutil.go new file mode 100644 index 0000000000..f58a8a0801 --- /dev/null +++ b/pkg/etcdutil/etcdutil.go @@ -0,0 +1,46 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// learn from https://github.com/pingcap/pd/blob/v3.0.5/pkg/etcdutil/etcdutil.go. + +package etcdutil + +import ( + "context" + "time" + + "go.etcd.io/etcd/clientv3" +) + +const ( + // DefaultDialTimeout is the maximum amount of time a dial will wait for a + // connection to setup. 30s is long enough for most of the network conditions. + DefaultDialTimeout = 30 * time.Second + + // DefaultRequestTimeout 10s is long enough for most of etcd clusters. + DefaultRequestTimeout = 10 * time.Second +) + +// ListMembers returns a list of internal etcd members. +func ListMembers(client *clientv3.Client) (*clientv3.MemberListResponse, error) { + ctx, cancel := context.WithTimeout(client.Ctx(), DefaultRequestTimeout) + defer cancel() + return client.MemberList(ctx) +} + +// AddMember adds an etcd member. +func AddMember(client *clientv3.Client, peerAddrs []string) (*clientv3.MemberAddResponse, error) { + ctx, cancel := context.WithTimeout(client.Ctx(), DefaultRequestTimeout) + defer cancel() + return client.MemberAdd(ctx, peerAddrs) +} diff --git a/pkg/etcdutil/etcdutil_test.go b/pkg/etcdutil/etcdutil_test.go new file mode 100644 index 0000000000..315a3ca379 --- /dev/null +++ b/pkg/etcdutil/etcdutil_test.go @@ -0,0 +1,153 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdutil + +import ( + "fmt" + "net/url" + "strings" + "testing" + "time" + + "github.com/coreos/etcd/etcdserver/etcdserverpb" + . "github.com/pingcap/check" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/embed" +) + +var _ = Suite(&testEtcdUtilSuite{}) + +type testEtcdUtilSuite struct { +} + +func TestSuite(t *testing.T) { + TestingT(t) +} + +func (t *testEtcdUtilSuite) newConfig(c *C, name string, basePort uint16, portCount int) (*embed.Config, uint16) { + cfg := embed.NewConfig() + cfg.Name = name + cfg.Dir = c.MkDir() + + cfg.LCUrls = []url.URL{} + for i := 0; i < portCount; i++ { + cu, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", basePort)) + c.Assert(err, IsNil) + cfg.LCUrls = append(cfg.LCUrls, *cu) + basePort++ + } + cfg.ACUrls = cfg.LCUrls + + cfg.LPUrls = []url.URL{} + ic := make([]string, 0, portCount) + for i := 0; i < portCount; i++ { + pu, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", basePort)) + c.Assert(err, IsNil) + cfg.LPUrls = append(cfg.LPUrls, *pu) + ic = append(ic, fmt.Sprintf("%s=%s", cfg.Name, pu)) + basePort++ + } + cfg.APUrls = cfg.LPUrls + + cfg.InitialCluster = strings.Join(ic, ",") + cfg.ClusterState = embed.ClusterStateFlagNew + return cfg, basePort +} + +func (t *testEtcdUtilSuite) urlsToStrings(URLs []url.URL) []string { + ret := make([]string, 0, len(URLs)) + for _, u := range URLs { + ret = append(ret, u.String()) + } + return ret +} + +func (t *testEtcdUtilSuite) startEtcd(c *C, cfg *embed.Config) *embed.Etcd { + e, err := embed.StartEtcd(cfg) + c.Assert(err, IsNil) + + timeout := time.Second + select { + case <-e.Server.ReadyNotify(): + case <-time.After(timeout): + c.Fatalf("start embed etcd timeout %v", timeout) + } + + return e +} + +func (t *testEtcdUtilSuite) createEtcdClient(c *C, cfg *embed.Config) *clientv3.Client { + cli, err := clientv3.New(clientv3.Config{Endpoints: t.urlsToStrings(cfg.LCUrls)}) + c.Assert(err, IsNil) + return cli +} + +func (t *testEtcdUtilSuite) checkMember(c *C, mid uint64, m *etcdserverpb.Member, cfg *embed.Config) { + if m.Name != "" { // no name exists after `member add` + c.Assert(m.Name, Equals, cfg.Name) + } + c.Assert(m.ID, Equals, mid) + c.Assert(m.ClientURLs, DeepEquals, t.urlsToStrings(cfg.ACUrls)) + c.Assert(m.PeerURLs, DeepEquals, t.urlsToStrings(cfg.APUrls)) +} + +func (t *testEtcdUtilSuite) TestMemberUtil(c *C) { + for i := 1; i <= 3; i++ { + t.testMemberUtilInternal(c, i) + } +} + +func (t *testEtcdUtilSuite) testMemberUtilInternal(c *C, portCount int) { + var basePort uint16 = 8361 + + // start a etcd + cfg1, basePort := t.newConfig(c, "etcd1", basePort, portCount) + etcd1 := t.startEtcd(c, cfg1) + defer etcd1.Close() + + // list member + cli := t.createEtcdClient(c, cfg1) + listResp1, err := ListMembers(cli) + c.Assert(err, IsNil) + c.Assert(listResp1.Members, HasLen, 1) + t.checkMember(c, uint64(etcd1.Server.ID()), listResp1.Members[0], cfg1) + + // add member + cfg2, basePort := t.newConfig(c, "etcd2", basePort, portCount) + cfg2.InitialCluster = cfg1.InitialCluster + "," + cfg2.InitialCluster + cfg2.ClusterState = embed.ClusterStateFlagExisting + addResp, err := AddMember(cli, t.urlsToStrings(cfg2.APUrls)) + c.Assert(err, IsNil) + c.Assert(addResp.Members, HasLen, 2) + + // start the added member + etcd2 := t.startEtcd(c, cfg2) + defer etcd2.Close() + c.Assert(addResp.Member.ID, Equals, uint64(etcd2.Server.ID())) + + // list member again + listResp2, err := ListMembers(cli) + c.Assert(err, IsNil) + c.Assert(listResp2.Members, HasLen, 2) + for _, m := range listResp2.Members { + switch m.ID { + case uint64(etcd1.Server.ID()): + t.checkMember(c, uint64(etcd1.Server.ID()), m, cfg1) + case uint64(etcd2.Server.ID()): + t.checkMember(c, uint64(etcd2.Server.ID()), m, cfg2) + default: + c.Fatalf("unknown member %v", m) + } + } +} diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index c6933a59b3..231e1b4aa1 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -387,6 +387,7 @@ const ( codeMasterGenEmbedEtcdConfigFail codeMasterStartEmbedEtcdFail codeMasterParseURLFail + codeMasterJoinEmbedEtcdFail ) // DM-worker error code @@ -827,9 +828,10 @@ var ( ErrMasterHandleHTTPApis = New(codeMasterHandleHTTPApis, ClassDMMaster, ScopeInternal, LevelHigh, "serve http apis to grpc") ErrMasterHostPortNotValid = New(codeMasterHostPortNotValid, ClassDMMaster, ScopeInternal, LevelHigh, "host:port '%s' not valid") ErrMasterGetHostnameFail = New(codeMasterGetHostnameFail, ClassDMMaster, ScopeInternal, LevelHigh, "get hostname fail") - ErrMasterGenEmbedEtcdConfigFail = New(codeMasterGenEmbedEtcdConfigFail, ClassDMMaster, ScopeInternal, LevelHigh, "generate config item %s for embed etcd fail") - ErrMasterStartEmbedEtcdFail = New(codeMasterStartEmbedEtcdFail, ClassDMMaster, ScopeInternal, LevelHigh, "start embed etcd fail") - ErrMasterParseURLFail = New(codeMasterParseURLFail, ClassDMMaster, ScopeInternal, LevelHigh, "parse URL %s fail") + ErrMasterGenEmbedEtcdConfigFail = New(codeMasterGenEmbedEtcdConfigFail, ClassDMMaster, ScopeInternal, LevelHigh, "fail to generate config item %s for embed etcd") + ErrMasterStartEmbedEtcdFail = New(codeMasterStartEmbedEtcdFail, ClassDMMaster, ScopeInternal, LevelHigh, "fail to start embed etcd") + ErrMasterParseURLFail = New(codeMasterParseURLFail, ClassDMMaster, ScopeInternal, LevelHigh, "fail to parse URL %s") + ErrMasterJoinEmbedEtcdFail = New(codeMasterJoinEmbedEtcdFail, ClassDMMaster, ScopeInternal, LevelHigh, "fail to join embed etcd: %s") // DM-worker error ErrWorkerParseFlagSet = New(codeWorkerParseFlagSet, ClassDMWorker, ScopeInternal, LevelMedium, "parse dm-worker config flag set")