diff --git a/dm/worker/config.go b/dm/worker/config.go index 0c7d9ef049..4d300dceb7 100644 --- a/dm/worker/config.go +++ b/dm/worker/config.go @@ -19,6 +19,7 @@ import ( "encoding/json" "flag" "fmt" + "net" "strings" "github.com/BurntSushi/toml" @@ -46,11 +47,13 @@ func NewConfig() *Config { fs.BoolVar(&cfg.printVersion, "V", false, "prints version and exit") fs.BoolVar(&cfg.printSampleConfig, "print-sample-config", false, "print sample config file of dm-worker") fs.StringVar(&cfg.ConfigFile, "config", "", "path to config file") - fs.StringVar(&cfg.WorkerAddr, "worker-addr", "", "worker API server and status addr") + fs.StringVar(&cfg.WorkerAddr, "worker-addr", "", "listen address for client traffic") + fs.StringVar(&cfg.AdvertiseAddr, "advertise-addr", "", `advertise address for client traffic (default "${worker-addr}")`) fs.StringVar(&cfg.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal") fs.StringVar(&cfg.LogFile, "log-file", "", "log file path") //fs.StringVar(&cfg.LogRotate, "log-rotate", "day", "log file rotate type, hour/day") - fs.StringVar(&cfg.Join, "join", "", "join to an existing cluster (usage: cluster's '${advertise-client-urls}'") + // NOTE: add `advertise-addr` for dm-master if needed. + fs.StringVar(&cfg.Join, "join", "", `join to an existing cluster (usage: dm-master cluster's "${master-addr}")`) fs.StringVar(&cfg.Name, "name", "", "human-readable name for DM-worker member") return cfg } @@ -64,8 +67,9 @@ type Config struct { LogFile string `toml:"log-file" json:"log-file"` LogRotate string `toml:"log-rotate" json:"log-rotate"` - Join string `toml:"join" json:"join" ` - WorkerAddr string `toml:"worker-addr" json:"worker-addr"` + Join string `toml:"join" json:"join" ` + WorkerAddr string `toml:"worker-addr" json:"worker-addr"` + AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"` ConfigFile string `json:"config-file"` @@ -145,6 +149,28 @@ func (c *Config) Parse(arguments []string) error { return terror.ErrWorkerInvalidFlag.Generate(c.flagSet.Arg(0)) } + return c.adjust() +} + +// adjust adjusts the config. +func (c *Config) adjust() error { + host, _, err := net.SplitHostPort(c.WorkerAddr) + if err != nil { + return terror.ErrWorkerHostPortNotValid.Delegate(err, c.WorkerAddr) + } + + if c.AdvertiseAddr == "" { + if host == "" || host == "0.0.0.0" { + return terror.ErrWorkerHostPortNotValid.Generatef("worker-addr (%s) must include the 'host' part (should not be '0.0.0.0') when advertise-addr is not set", c.WorkerAddr) + } + c.AdvertiseAddr = c.WorkerAddr + } else { + host, _, err = net.SplitHostPort(c.AdvertiseAddr) + if err != nil || host == "" || host == "0.0.0.0" { + return terror.ErrWorkerHostPortNotValid.AnnotateDelegate(err, "advertise-addr (%s) must include the 'host' part and should not be '0.0.0.0'", c.AdvertiseAddr) + } + } + return nil } diff --git a/dm/worker/dm-worker.toml b/dm/worker/dm-worker.toml index 638ef7c59c..657efa4c98 100644 --- a/dm/worker/dm-worker.toml +++ b/dm/worker/dm-worker.toml @@ -6,6 +6,7 @@ log-file = "dm-worker.log" #dm-worker listen address worker-addr = ":8262" +advertise-addr = "127.0.0.1:8262" join = "127.0.0.1:8291" diff --git a/dm/worker/join.go b/dm/worker/join.go index b8b0e9327c..633a310391 100644 --- a/dm/worker/join.go +++ b/dm/worker/join.go @@ -52,7 +52,7 @@ func (s *Server) JoinMaster(endpoints []string) error { defer cancel() req := &pb.RegisterWorkerRequest{ Name: s.cfg.Name, - Address: s.cfg.WorkerAddr, + Address: s.cfg.AdvertiseAddr, } resp, err := client.RegisterWorker(ctx, req) if err != nil { @@ -78,7 +78,7 @@ func (s *Server) KeepAlive() (bool, error) { if err != nil { return false, err } - k := common.WorkerKeepAliveKeyAdapter.Encode(s.cfg.WorkerAddr, s.cfg.Name) + k := common.WorkerKeepAliveKeyAdapter.Encode(s.cfg.AdvertiseAddr, s.cfg.Name) _, err = s.etcdClient.Put(cliCtx, k, time.Now().String(), clientv3.WithLease(lease.ID)) if err != nil { return false, err diff --git a/dm/worker/server.go b/dm/worker/server.go index 0c789cc51a..029748a5ef 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -75,12 +75,6 @@ func NewServer(cfg *Config) *Server { // Start starts to serving func (s *Server) Start() error { var err error - - _, _, err = s.splitHostPort() - if err != nil { - return err - } - s.rootLis, err = net.Listen("tcp", s.cfg.WorkerAddr) if err != nil { return terror.ErrWorkerStartService.Delegate(err) @@ -700,10 +694,10 @@ func (s *Server) OperateMysqlWorker(ctx context.Context, req *pb.MysqlWorkerRequ } if resp.Result { op1 := clientv3.OpPut(common.UpstreamConfigKeyAdapter.Encode(cfg.SourceID), req.Config) - op2 := clientv3.OpPut(common.UpstreamBoundWorkerKeyAdapter.Encode(s.cfg.WorkerAddr), cfg.SourceID) + op2 := clientv3.OpPut(common.UpstreamBoundWorkerKeyAdapter.Encode(s.cfg.AdvertiseAddr), cfg.SourceID) if req.Op == pb.WorkerOp_StopWorker { op1 = clientv3.OpDelete(common.UpstreamConfigKeyAdapter.Encode(cfg.SourceID)) - op2 = clientv3.OpDelete(common.UpstreamBoundWorkerKeyAdapter.Encode(s.cfg.WorkerAddr)) + op2 = clientv3.OpDelete(common.UpstreamBoundWorkerKeyAdapter.Encode(s.cfg.AdvertiseAddr)) } resp.Msg = s.retryWriteEctd(op1, op2) // Because etcd was deployed with master in a single process, if we can not write data into etcd, most probably @@ -722,12 +716,3 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse { } return resp } - -func (s *Server) splitHostPort() (host, port string, err error) { - // WorkerAddr's format may be "host:port" or ":port" - host, port, err = net.SplitHostPort(s.cfg.WorkerAddr) - if err != nil { - err = terror.ErrWorkerHostPortNotValid.Delegate(err, s.cfg.WorkerAddr) - } - return -} diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index a0c844fa07..5d347ea341 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -78,15 +78,8 @@ func (t *testServer) TestServer(c *C) { }() s := NewServer(cfg) - - workerAddr := cfg.WorkerAddr - s.cfg.WorkerAddr = "" - err = s.Start() - c.Assert(terror.ErrWorkerHostPortNotValid.Equal(err), IsTrue) - s.Close() - s.cfg.WorkerAddr = workerAddr - go func() { + defer s.Close() err1 := s.Start() c.Assert(err1, IsNil) }() @@ -98,7 +91,7 @@ func (t *testServer) TestServer(c *C) { t.testOperateWorker(c, s, dir, true) // check infos have be written into ETCD success. - t.testInfosInEtcd(c, hostName, workerAddr, dir) + t.testInfosInEtcd(c, hostName, cfg.AdvertiseAddr, dir) // check worker would retry connecting master rather than stop worker directly. ETCD = t.testRetryConnectMaster(c, s, ETCD, etcdDir, hostName) diff --git a/tests/_utils/run_dm_worker b/tests/_utils/run_dm_worker index 387f863e63..9ddc32d2b7 100755 --- a/tests/_utils/run_dm_worker +++ b/tests/_utils/run_dm_worker @@ -25,7 +25,8 @@ PWD=$(pwd) echo "[$(date)] <<<<<< START DM-WORKER on port $port, config: $conf >>>>>>" cd $workdir $binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.worker.$port.$(date +"%s").out" DEVEL \ - --worker-addr=:$port \ + --worker-addr=0.0.0.0:$port \ + --advertise-addr=127.0.0.1:$port \ --log-file="$workdir/log/dm-worker.log" -L=info --config="$conf" \ >> $workdir/log/stdout.log 2>&1 & cd $PWD