Skip to content

Commit

Permalink
dm-master(dm): use advertise address for ETCD client (#4608)
Browse files Browse the repository at this point in the history
close #4511
  • Loading branch information
dsdashun authored Feb 21, 2022
1 parent f203f67 commit 8e9d88b
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 31 deletions.
2 changes: 2 additions & 0 deletions dm/dm/master/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (t *testElectionSuite) TestFailToStartLeader(c *check.C) {
cfg1.Name = "dm-master-1"
cfg1.DataDir = c.MkDir()
cfg1.MasterAddr = tempurl.Alloc()[len("http://"):]
cfg1.AdvertiseAddr = cfg1.MasterAddr
cfg1.PeerUrls = tempurl.Alloc()
cfg1.AdvertisePeerUrls = cfg1.PeerUrls
cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls)
Expand All @@ -68,6 +69,7 @@ func (t *testElectionSuite) TestFailToStartLeader(c *check.C) {
cfg2.Name = "dm-master-2"
cfg2.DataDir = c.MkDir()
cfg2.MasterAddr = tempurl.Alloc()[len("http://"):]
cfg2.AdvertiseAddr = cfg2.MasterAddr
cfg2.PeerUrls = tempurl.Alloc()
cfg2.AdvertisePeerUrls = cfg2.PeerUrls
cfg2.Join = cfg1.MasterAddr // join to an existing cluster
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/master/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func prepareJoinEtcd(cfg *Config) error {
}

// try to join self, invalid
if cfg.Join == cfg.MasterAddr {
if cfg.Join == cfg.AdvertiseAddr {
return terror.ErrMasterJoinEmbedEtcdFail.Generate(fmt.Sprintf("join self %s is forbidden", cfg.Join))
}

Expand Down
5 changes: 4 additions & 1 deletion dm/dm/master/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) {
cfgCluster.Name = "dm-master-1"
cfgCluster.DataDir = c.MkDir()
cfgCluster.MasterAddr = tempurl.Alloc()[len("http://"):]
cfgCluster.AdvertiseAddr = tempurl.Alloc()[len("http://"):]
cfgCluster.AdvertiseAddr = cfgCluster.MasterAddr
cfgCluster.PeerUrls = tempurl.Alloc()
c.Assert(cfgCluster.adjust(), check.IsNil)
cfgClusterEtcd := genEmbedEtcdConfigWithLogger("info")
Expand All @@ -79,6 +79,7 @@ func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) {
cfgBefore := t.cloneConfig(cfgCluster) // before `prepareJoinEtcd` applied
cfgBefore.DataDir = c.MkDir() // overwrite some config items
cfgBefore.MasterAddr = tempurl.Alloc()[len("http://"):]
cfgBefore.AdvertiseAddr = cfgBefore.MasterAddr
cfgBefore.PeerUrls = tempurl.Alloc()
cfgBefore.AdvertisePeerUrls = cfgBefore.PeerUrls
c.Assert(cfgBefore.adjust(), check.IsNil)
Expand Down Expand Up @@ -172,6 +173,7 @@ func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) {
cfgAfter2.Name = "dm-master-3" // overwrite some items
cfgAfter2.DataDir = c.MkDir()
cfgAfter2.MasterAddr = tempurl.Alloc()[len("http://"):]
cfgAfter2.AdvertiseAddr = cfgAfter2.MasterAddr
cfgAfter2.PeerUrls = tempurl.Alloc()
cfgAfter2.AdvertisePeerUrls = cfgAfter2.PeerUrls
err = prepareJoinEtcd(cfgAfter2)
Expand Down Expand Up @@ -229,6 +231,7 @@ func (t *testEtcdSuite) TestEtcdAutoCompaction(c *check.C) {

cfg.DataDir = c.MkDir()
cfg.MasterAddr = tempurl.Alloc()[len("http://"):]
cfg.AdvertiseAddr = cfg.MasterAddr
cfg.AutoCompactionRetention = "1s"

ctx, cancel := context.WithCancel(context.Background())
Expand Down
3 changes: 3 additions & 0 deletions dm/dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (t *openAPISuite) TestRedirectRequestToLeader(c *check.C) {
cfg1.Name = "dm-master-1"
cfg1.DataDir = c.MkDir()
cfg1.MasterAddr = tempurl.Alloc()[len("http://"):]
cfg1.AdvertiseAddr = cfg1.MasterAddr
cfg1.PeerUrls = tempurl.Alloc()
cfg1.AdvertisePeerUrls = cfg1.PeerUrls
cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls)
Expand All @@ -106,6 +107,7 @@ func (t *openAPISuite) TestRedirectRequestToLeader(c *check.C) {
cfg2.Name = "dm-master-2"
cfg2.DataDir = c.MkDir()
cfg2.MasterAddr = tempurl.Alloc()[len("http://"):]
cfg2.AdvertiseAddr = cfg2.MasterAddr
cfg2.PeerUrls = tempurl.Alloc()
cfg2.AdvertisePeerUrls = cfg2.PeerUrls
cfg2.Join = cfg1.MasterAddr // join to an existing cluster
Expand Down Expand Up @@ -144,6 +146,7 @@ func (t *openAPISuite) TestOpenAPIWillNotStartInDefaultConfig(c *check.C) {
cfg1.Name = "dm-master-1"
cfg1.DataDir = c.MkDir()
cfg1.MasterAddr = tempurl.Alloc()[len("http://"):]
cfg1.AdvertiseAddr = cfg1.MasterAddr
cfg1.PeerUrls = tempurl.Alloc()
cfg1.AdvertisePeerUrls = cfg1.PeerUrls
cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls)
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (s *Server) Start(ctx context.Context) (err error) {

// create an etcd client used in the whole server instance.
// NOTE: we only use the local member's address now, but we can use all endpoints of the cluster if needed.
s.etcdClient, err = etcdutil.CreateClient([]string{withHost(s.cfg.MasterAddr)}, tls.TLSConfig())
s.etcdClient, err = etcdutil.CreateClient([]string{withHost(s.cfg.AdvertiseAddr)}, tls.TLSConfig())
if err != nil {
return
}
Expand Down
93 changes: 65 additions & 28 deletions dm/dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1366,40 +1366,51 @@ func (t *testMaster) TestOperateWorkerRelayTask(c *check.C) {
}

func (t *testMaster) TestServer(c *check.C) {
var err error
cfg := NewConfig()
c.Assert(cfg.Parse([]string{"-config=./dm-master.toml"}), check.IsNil)
cfg.PeerUrls = "http://127.0.0.1:8294"
cfg.DataDir = c.MkDir()
cfg.MasterAddr = tempurl.Alloc()[len("http://"):]
cfg.AdvertiseAddr = cfg.MasterAddr

basicServiceCheck := func(c *check.C, cfg *Config) {
t.testHTTPInterface(c, fmt.Sprintf("http://%s/status", cfg.AdvertiseAddr), []byte(utils.GetRawInfo()))
t.testHTTPInterface(c, fmt.Sprintf("http://%s/debug/pprof/", cfg.AdvertiseAddr), []byte("Types of profiles available"))
// HTTP API in this unit test is unstable, but we test it in `http_apis` in integration test.
// t.testHTTPInterface(c, fmt.Sprintf("http://%s/apis/v1alpha1/status/test-task", cfg.AdvertiseAddr), []byte("task test-task has no source or not exist"))
}
t.testNormalServerLifecycle(c, cfg, func(c *check.C, cfg *Config) {
basicServiceCheck(c, cfg)

// try to start another server with the same address. Expect it to fail
dupServer := NewServer(cfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err1 := dupServer.Start(ctx)
c.Assert(terror.ErrMasterStartEmbedEtcdFail.Equal(err1), check.IsTrue)
c.Assert(err1.Error(), check.Matches, ".*bind: address already in use.*")
})

s := NewServer(cfg)

ctx, cancel := context.WithCancel(context.Background())
err1 := s.Start(ctx)
c.Assert(err1, check.IsNil)

t.testHTTPInterface(c, fmt.Sprintf("http://%s/status", cfg.MasterAddr), []byte(utils.GetRawInfo()))
t.testHTTPInterface(c, fmt.Sprintf("http://%s/debug/pprof/", cfg.MasterAddr), []byte("Types of profiles available"))
// HTTP API in this unit test is unstable, but we test it in `http_apis` in integration test.
// t.testHTTPInterface(c, fmt.Sprintf("http://%s/apis/v1alpha1/status/test-task", cfg.MasterAddr), []byte("task test-task has no source or not exist"))

dupServer := NewServer(cfg)
err := dupServer.Start(ctx)
c.Assert(terror.ErrMasterStartEmbedEtcdFail.Equal(err), check.IsTrue)
c.Assert(err.Error(), check.Matches, ".*bind: address already in use.*")

// close
cancel()
s.Close()

c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.closed.Load()
}), check.IsTrue)
// test the listen address is 0.0.0.0
masterAddrStr := tempurl.Alloc()[len("http://"):]
_, masterPort, err := net.SplitHostPort(masterAddrStr)
c.Assert(err, check.IsNil)
cfg2 := NewConfig()
*cfg2 = *cfg
cfg2.MasterAddr = fmt.Sprintf("0.0.0.0:%s", masterPort)
cfg2.AdvertiseAddr = masterAddrStr
t.testNormalServerLifecycle(c, cfg2, basicServiceCheck)
}

func (t *testMaster) TestMasterTLS(c *check.C) {
var err error
masterAddr := tempurl.Alloc()[len("http://"):]
peerAddr := tempurl.Alloc()[len("http://"):]
_, masterPort, err := net.SplitHostPort(masterAddr)
c.Assert(err, check.IsNil)
_, peerPort, err := net.SplitHostPort(peerAddr)
c.Assert(err, check.IsNil)

// all with `https://` prefix
cfg := NewConfig()
Expand Down Expand Up @@ -1547,17 +1558,40 @@ func (t *testMaster) TestMasterTLS(c *check.C) {
c.Assert(cfg.AdvertisePeerUrls, check.Equals, "https://"+peerAddr)
c.Assert(cfg.InitialCluster, check.Equals, "master-tls=https://"+peerAddr)
t.testTLSPrefix(c, cfg)

// listen address set to 0.0.0.0
cfg = NewConfig()
c.Assert(cfg.Parse([]string{
"--name=master-tls",
fmt.Sprintf("--data-dir=%s", c.MkDir()),
fmt.Sprintf("--master-addr=0.0.0.0:%s", masterPort),
fmt.Sprintf("--advertise-addr=https://%s", masterAddr),
fmt.Sprintf("--peer-urls=0.0.0.0:%s", peerPort),
fmt.Sprintf("--advertise-peer-urls=https://%s", peerAddr),
fmt.Sprintf("--initial-cluster=master-tls=https://%s", peerAddr),
"--ssl-ca=./tls_for_test/ca.pem",
"--ssl-cert=./tls_for_test/dm.pem",
"--ssl-key=./tls_for_test/dm.key",
}), check.IsNil)
t.testTLSPrefix(c, cfg)
}

func (t *testMaster) testTLSPrefix(c *check.C, cfg *Config) {
t.testNormalServerLifecycle(c, cfg, func(c *check.C, cfg *Config) {
t.testHTTPInterface(c, fmt.Sprintf("https://%s/status", cfg.AdvertiseAddr), []byte(utils.GetRawInfo()))
t.testHTTPInterface(c, fmt.Sprintf("https://%s/debug/pprof/", cfg.AdvertiseAddr), []byte("Types of profiles available"))
})
}

func (t *testMaster) testNormalServerLifecycle(c *check.C, cfg *Config, checkLogic func(*check.C, *Config)) {
var err error
s := NewServer(cfg)

ctx, cancel := context.WithCancel(context.Background())
err1 := s.Start(ctx)
c.Assert(err1, check.IsNil)
err = s.Start(ctx)
c.Assert(err, check.IsNil)

t.testHTTPInterface(c, fmt.Sprintf("https://%s/status", cfg.AdvertiseAddr), []byte(utils.GetRawInfo()))
t.testHTTPInterface(c, fmt.Sprintf("https://%s/debug/pprof/", cfg.AdvertiseAddr), []byte("Types of profiles available"))
checkLogic(c, cfg)

// close
cancel()
Expand Down Expand Up @@ -1594,6 +1628,7 @@ func (t *testMaster) TestJoinMember(c *check.C) {
cfg1.Name = "dm-master-1"
cfg1.DataDir = c.MkDir()
cfg1.MasterAddr = tempurl.Alloc()[len("http://"):]
cfg1.AdvertiseAddr = cfg1.MasterAddr
cfg1.PeerUrls = tempurl.Alloc()
cfg1.AdvertisePeerUrls = cfg1.PeerUrls
cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls)
Expand All @@ -1613,6 +1648,7 @@ func (t *testMaster) TestJoinMember(c *check.C) {
cfg2.Name = "dm-master-2"
cfg2.DataDir = c.MkDir()
cfg2.MasterAddr = tempurl.Alloc()[len("http://"):]
cfg2.AdvertiseAddr = cfg2.MasterAddr
cfg2.PeerUrls = tempurl.Alloc()
cfg2.AdvertisePeerUrls = cfg2.PeerUrls
cfg2.Join = cfg1.MasterAddr // join to an existing cluster
Expand Down Expand Up @@ -1647,6 +1683,7 @@ func (t *testMaster) TestJoinMember(c *check.C) {
cfg3.Name = "dm-master-3"
cfg3.DataDir = c.MkDir()
cfg3.MasterAddr = tempurl.Alloc()[len("http://"):]
cfg3.AdvertiseAddr = cfg3.MasterAddr
cfg3.PeerUrls = tempurl.Alloc()
cfg3.AdvertisePeerUrls = cfg3.PeerUrls
cfg3.Join = cfg1.MasterAddr // join to an existing cluster
Expand Down Expand Up @@ -1689,7 +1726,7 @@ func (t *testMaster) TestOperateSource(c *check.C) {
cfg1.Name = "dm-master-1"
cfg1.DataDir = c.MkDir()
cfg1.MasterAddr = tempurl.Alloc()[len("http://"):]
cfg1.AdvertiseAddr = tempurl.Alloc()[len("http://"):]
cfg1.AdvertiseAddr = cfg1.MasterAddr
cfg1.PeerUrls = tempurl.Alloc()
cfg1.AdvertisePeerUrls = cfg1.PeerUrls
cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls)
Expand Down

0 comments on commit 8e9d88b

Please sign in to comment.