Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

HA: add advertise-addr for DM-worker #450

Merged
merged 25 commits into from
Jan 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ad3e066
fix all_mode integration_test
lichunzhu Jan 2, 2020
b3b3761
fix dmctl_advance & dmctl_basic integration_tests
lichunzhu Jan 3, 2020
eb6ec4c
initital commit for dmctl_command
lichunzhu Jan 3, 2020
7230952
fix full_mode test
lichunzhu Jan 3, 2020
1a865e4
refine http_apis integration_test
lichunzhu Jan 3, 2020
9a47eb0
fix import goroutine leak test
lichunzhu Jan 3, 2020
80e0269
fix incremental_mode integration_test
lichunzhu Jan 3, 2020
2400863
fix ha-dev bug
lichunzhu Jan 3, 2020
f337799
refine initial_unit integration_test
lichunzhu Jan 3, 2020
634d67b
fix load_interrupt integration_tests
lichunzhu Jan 3, 2020
1a807a4
fix online ddl integration_tests
lichunzhu Jan 3, 2020
18b63d1
fix worker start for all integration_tests
lichunzhu Jan 3, 2020
6bd6452
fix tests
lichunzhu Jan 5, 2020
efd038b
add operator_worker test
lichunzhu Jan 6, 2020
b9c7819
fix bug
lichunzhu Jan 6, 2020
060bbd4
wait more time in retry_cancel test
lichunzhu Jan 6, 2020
ce7819f
wait more time in incremental_mode test
lichunzhu Jan 6, 2020
03fcf74
wait more time in load_interrupt test
lichunzhu Jan 6, 2020
565c9e9
address comments
lichunzhu Jan 6, 2020
a59eb51
wait for time in relay_interrupt
lichunzhu Jan 6, 2020
2211cdc
*: add `advertise-addr` for DM-worker
csuzhangxc Jan 11, 2020
21275fb
Merge remote-tracking branch 'remotes/origin/ha-dev' into worker-adve…
csuzhangxc Jan 11, 2020
2024c7c
*: address comments
csuzhangxc Jan 13, 2020
2b689a7
Merge remote-tracking branch 'remotes/origin/ha-dev' into worker-adve…
csuzhangxc Jan 13, 2020
d6f8084
worker: fix for merging
csuzhangxc Jan 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions dm/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"flag"
"fmt"
"net"
"strings"

"github.com/BurntSushi/toml"
Expand Down Expand Up @@ -46,11 +47,13 @@ func NewConfig() *Config {
fs.BoolVar(&cfg.printVersion, "V", false, "prints version and exit")
fs.BoolVar(&cfg.printSampleConfig, "print-sample-config", false, "print sample config file of dm-worker")
fs.StringVar(&cfg.ConfigFile, "config", "", "path to config file")
fs.StringVar(&cfg.WorkerAddr, "worker-addr", "", "worker API server and status addr")
fs.StringVar(&cfg.WorkerAddr, "worker-addr", "", "listen address for client traffic")
fs.StringVar(&cfg.AdvertiseAddr, "advertise-addr", "", `advertise address for client traffic (default "${worker-addr}")`)
fs.StringVar(&cfg.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal")
fs.StringVar(&cfg.LogFile, "log-file", "", "log file path")
//fs.StringVar(&cfg.LogRotate, "log-rotate", "day", "log file rotate type, hour/day")
fs.StringVar(&cfg.Join, "join", "", "join to an existing cluster (usage: cluster's '${advertise-client-urls}'")
// NOTE: add `advertise-addr` for dm-master if needed.
fs.StringVar(&cfg.Join, "join", "", `join to an existing cluster (usage: dm-master cluster's "${master-addr}")`)
fs.StringVar(&cfg.Name, "name", "", "human-readable name for DM-worker member")
return cfg
}
Expand All @@ -64,8 +67,9 @@ type Config struct {
LogFile string `toml:"log-file" json:"log-file"`
LogRotate string `toml:"log-rotate" json:"log-rotate"`

Join string `toml:"join" json:"join" `
WorkerAddr string `toml:"worker-addr" json:"worker-addr"`
Join string `toml:"join" json:"join" `
WorkerAddr string `toml:"worker-addr" json:"worker-addr"`
AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"`

ConfigFile string `json:"config-file"`

Expand Down Expand Up @@ -145,6 +149,28 @@ func (c *Config) Parse(arguments []string) error {
return terror.ErrWorkerInvalidFlag.Generate(c.flagSet.Arg(0))
}

return c.adjust()
}

// adjust adjusts the config.
func (c *Config) adjust() error {
host, _, err := net.SplitHostPort(c.WorkerAddr)
if err != nil {
return terror.ErrWorkerHostPortNotValid.Delegate(err, c.WorkerAddr)
}

if c.AdvertiseAddr == "" {
if host == "" || host == "0.0.0.0" {
return terror.ErrWorkerHostPortNotValid.Generatef("worker-addr (%s) must include the 'host' part (should not be '0.0.0.0') when advertise-addr is not set", c.WorkerAddr)
}
c.AdvertiseAddr = c.WorkerAddr
} else {
host, _, err = net.SplitHostPort(c.AdvertiseAddr)
if err != nil || host == "" || host == "0.0.0.0" {
return terror.ErrWorkerHostPortNotValid.AnnotateDelegate(err, "advertise-addr (%s) must include the 'host' part and should not be '0.0.0.0'", c.AdvertiseAddr)
}
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions dm/worker/dm-worker.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ log-file = "dm-worker.log"

#dm-worker listen address
worker-addr = ":8262"
advertise-addr = "127.0.0.1:8262"
join = "127.0.0.1:8291"


4 changes: 2 additions & 2 deletions dm/worker/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *Server) JoinMaster(endpoints []string) error {
defer cancel()
req := &pb.RegisterWorkerRequest{
Name: s.cfg.Name,
Address: s.cfg.WorkerAddr,
Address: s.cfg.AdvertiseAddr,
}
resp, err := client.RegisterWorker(ctx, req)
if err != nil {
Expand All @@ -78,7 +78,7 @@ func (s *Server) KeepAlive() (bool, error) {
if err != nil {
return false, err
}
k := common.WorkerKeepAliveKeyAdapter.Encode(s.cfg.WorkerAddr, s.cfg.Name)
k := common.WorkerKeepAliveKeyAdapter.Encode(s.cfg.AdvertiseAddr, s.cfg.Name)
_, err = s.etcdClient.Put(cliCtx, k, time.Now().String(), clientv3.WithLease(lease.ID))
if err != nil {
return false, err
Expand Down
19 changes: 2 additions & 17 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,6 @@ func NewServer(cfg *Config) *Server {
// Start starts to serving
func (s *Server) Start() error {
var err error

_, _, err = s.splitHostPort()
if err != nil {
return err
}

s.rootLis, err = net.Listen("tcp", s.cfg.WorkerAddr)
if err != nil {
return terror.ErrWorkerStartService.Delegate(err)
Expand Down Expand Up @@ -700,10 +694,10 @@ func (s *Server) OperateMysqlWorker(ctx context.Context, req *pb.MysqlWorkerRequ
}
if resp.Result {
op1 := clientv3.OpPut(common.UpstreamConfigKeyAdapter.Encode(cfg.SourceID), req.Config)
op2 := clientv3.OpPut(common.UpstreamBoundWorkerKeyAdapter.Encode(s.cfg.WorkerAddr), cfg.SourceID)
op2 := clientv3.OpPut(common.UpstreamBoundWorkerKeyAdapter.Encode(s.cfg.AdvertiseAddr), cfg.SourceID)
if req.Op == pb.WorkerOp_StopWorker {
op1 = clientv3.OpDelete(common.UpstreamConfigKeyAdapter.Encode(cfg.SourceID))
op2 = clientv3.OpDelete(common.UpstreamBoundWorkerKeyAdapter.Encode(s.cfg.WorkerAddr))
op2 = clientv3.OpDelete(common.UpstreamBoundWorkerKeyAdapter.Encode(s.cfg.AdvertiseAddr))
}
resp.Msg = s.retryWriteEctd(op1, op2)
// Because etcd was deployed with master in a single process, if we can not write data into etcd, most probably
Expand All @@ -722,12 +716,3 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse {
}
return resp
}

func (s *Server) splitHostPort() (host, port string, err error) {
// WorkerAddr's format may be "host:port" or ":port"
host, port, err = net.SplitHostPort(s.cfg.WorkerAddr)
if err != nil {
err = terror.ErrWorkerHostPortNotValid.Delegate(err, s.cfg.WorkerAddr)
}
return
}
11 changes: 2 additions & 9 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,8 @@ func (t *testServer) TestServer(c *C) {
}()

s := NewServer(cfg)

workerAddr := cfg.WorkerAddr
s.cfg.WorkerAddr = ""
err = s.Start()
c.Assert(terror.ErrWorkerHostPortNotValid.Equal(err), IsTrue)
s.Close()
s.cfg.WorkerAddr = workerAddr

go func() {
defer s.Close()
err1 := s.Start()
c.Assert(err1, IsNil)
}()
Expand All @@ -98,7 +91,7 @@ func (t *testServer) TestServer(c *C) {
t.testOperateWorker(c, s, dir, true)

// check infos have be written into ETCD success.
t.testInfosInEtcd(c, hostName, workerAddr, dir)
t.testInfosInEtcd(c, hostName, cfg.AdvertiseAddr, dir)

// check worker would retry connecting master rather than stop worker directly.
ETCD = t.testRetryConnectMaster(c, s, ETCD, etcdDir, hostName)
Expand Down
3 changes: 2 additions & 1 deletion tests/_utils/run_dm_worker
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ PWD=$(pwd)
echo "[$(date)] <<<<<< START DM-WORKER on port $port, config: $conf >>>>>>"
cd $workdir
$binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.worker.$port.$(date +"%s").out" DEVEL \
--worker-addr=:$port \
--worker-addr=0.0.0.0:$port \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to check the host? 0.0.0.0:$port will become the advertise-addr, but it can not be used by dm-master

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 addressed in 2024c7c.

  1. add the check for 0.0.0.0 as the advertise-addr
  2. add advertise-addr when running dm-worker in tests.

--advertise-addr=127.0.0.1:$port \
--log-file="$workdir/log/dm-worker.log" -L=info --config="$conf" \
>> $workdir/log/stdout.log 2>&1 &
cd $PWD