Skip to content

Commit

Permalink
HA: add advertise-addr for DM-worker (pingcap#450)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Jan 15, 2020
1 parent 4f0b875 commit 255846f
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 33 deletions.
34 changes: 30 additions & 4 deletions dm/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"flag"
"fmt"
"net"
"strings"

"github.com/BurntSushi/toml"
Expand Down Expand Up @@ -46,11 +47,13 @@ func NewConfig() *Config {
fs.BoolVar(&cfg.printVersion, "V", false, "prints version and exit")
fs.BoolVar(&cfg.printSampleConfig, "print-sample-config", false, "print sample config file of dm-worker")
fs.StringVar(&cfg.ConfigFile, "config", "", "path to config file")
fs.StringVar(&cfg.WorkerAddr, "worker-addr", "", "worker API server and status addr")
fs.StringVar(&cfg.WorkerAddr, "worker-addr", "", "listen address for client traffic")
fs.StringVar(&cfg.AdvertiseAddr, "advertise-addr", "", `advertise address for client traffic (default "${worker-addr}")`)
fs.StringVar(&cfg.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal")
fs.StringVar(&cfg.LogFile, "log-file", "", "log file path")
//fs.StringVar(&cfg.LogRotate, "log-rotate", "day", "log file rotate type, hour/day")
fs.StringVar(&cfg.Join, "join", "", "join to an existing cluster (usage: cluster's '${advertise-client-urls}'")
// NOTE: add `advertise-addr` for dm-master if needed.
fs.StringVar(&cfg.Join, "join", "", `join to an existing cluster (usage: dm-master cluster's "${master-addr}")`)
fs.StringVar(&cfg.Name, "name", "", "human-readable name for DM-worker member")
return cfg
}
Expand All @@ -64,8 +67,9 @@ type Config struct {
LogFile string `toml:"log-file" json:"log-file"`
LogRotate string `toml:"log-rotate" json:"log-rotate"`

Join string `toml:"join" json:"join" `
WorkerAddr string `toml:"worker-addr" json:"worker-addr"`
Join string `toml:"join" json:"join" `
WorkerAddr string `toml:"worker-addr" json:"worker-addr"`
AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"`

ConfigFile string `json:"config-file"`

Expand Down Expand Up @@ -145,6 +149,28 @@ func (c *Config) Parse(arguments []string) error {
return terror.ErrWorkerInvalidFlag.Generate(c.flagSet.Arg(0))
}

return c.adjust()
}

// adjust adjusts the config.
func (c *Config) adjust() error {
host, _, err := net.SplitHostPort(c.WorkerAddr)
if err != nil {
return terror.ErrWorkerHostPortNotValid.Delegate(err, c.WorkerAddr)
}

if c.AdvertiseAddr == "" {
if host == "" || host == "0.0.0.0" {
return terror.ErrWorkerHostPortNotValid.Generatef("worker-addr (%s) must include the 'host' part (should not be '0.0.0.0') when advertise-addr is not set", c.WorkerAddr)
}
c.AdvertiseAddr = c.WorkerAddr
} else {
host, _, err = net.SplitHostPort(c.AdvertiseAddr)
if err != nil || host == "" || host == "0.0.0.0" {
return terror.ErrWorkerHostPortNotValid.AnnotateDelegate(err, "advertise-addr (%s) must include the 'host' part and should not be '0.0.0.0'", c.AdvertiseAddr)
}
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions dm/worker/dm-worker.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ log-file = "dm-worker.log"

#dm-worker listen address
worker-addr = ":8262"
advertise-addr = "127.0.0.1:8262"
join = "127.0.0.1:8291"


4 changes: 2 additions & 2 deletions dm/worker/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *Server) JoinMaster(endpoints []string) error {
defer cancel()
req := &pb.RegisterWorkerRequest{
Name: s.cfg.Name,
Address: s.cfg.WorkerAddr,
Address: s.cfg.AdvertiseAddr,
}
resp, err := client.RegisterWorker(ctx, req)
if err != nil {
Expand All @@ -78,7 +78,7 @@ func (s *Server) KeepAlive() (bool, error) {
if err != nil {
return false, err
}
k := common.WorkerKeepAliveKeyAdapter.Encode(s.cfg.WorkerAddr, s.cfg.Name)
k := common.WorkerKeepAliveKeyAdapter.Encode(s.cfg.AdvertiseAddr, s.cfg.Name)
_, err = s.etcdClient.Put(cliCtx, k, time.Now().String(), clientv3.WithLease(lease.ID))
if err != nil {
return false, err
Expand Down
19 changes: 2 additions & 17 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,6 @@ func NewServer(cfg *Config) *Server {
// Start starts to serving
func (s *Server) Start() error {
var err error

_, _, err = s.splitHostPort()
if err != nil {
return err
}

s.rootLis, err = net.Listen("tcp", s.cfg.WorkerAddr)
if err != nil {
return terror.ErrWorkerStartService.Delegate(err)
Expand Down Expand Up @@ -700,10 +694,10 @@ func (s *Server) OperateMysqlWorker(ctx context.Context, req *pb.MysqlWorkerRequ
}
if resp.Result {
op1 := clientv3.OpPut(common.UpstreamConfigKeyAdapter.Encode(cfg.SourceID), req.Config)
op2 := clientv3.OpPut(common.UpstreamBoundWorkerKeyAdapter.Encode(s.cfg.WorkerAddr), cfg.SourceID)
op2 := clientv3.OpPut(common.UpstreamBoundWorkerKeyAdapter.Encode(s.cfg.AdvertiseAddr), cfg.SourceID)
if req.Op == pb.WorkerOp_StopWorker {
op1 = clientv3.OpDelete(common.UpstreamConfigKeyAdapter.Encode(cfg.SourceID))
op2 = clientv3.OpDelete(common.UpstreamBoundWorkerKeyAdapter.Encode(s.cfg.WorkerAddr))
op2 = clientv3.OpDelete(common.UpstreamBoundWorkerKeyAdapter.Encode(s.cfg.AdvertiseAddr))
}
resp.Msg = s.retryWriteEctd(op1, op2)
// Because etcd was deployed with master in a single process, if we can not write data into etcd, most probably
Expand All @@ -722,12 +716,3 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse {
}
return resp
}

func (s *Server) splitHostPort() (host, port string, err error) {
// WorkerAddr's format may be "host:port" or ":port"
host, port, err = net.SplitHostPort(s.cfg.WorkerAddr)
if err != nil {
err = terror.ErrWorkerHostPortNotValid.Delegate(err, s.cfg.WorkerAddr)
}
return
}
11 changes: 2 additions & 9 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,8 @@ func (t *testServer) TestServer(c *C) {
}()

s := NewServer(cfg)

workerAddr := cfg.WorkerAddr
s.cfg.WorkerAddr = ""
err = s.Start()
c.Assert(terror.ErrWorkerHostPortNotValid.Equal(err), IsTrue)
s.Close()
s.cfg.WorkerAddr = workerAddr

go func() {
defer s.Close()
err1 := s.Start()
c.Assert(err1, IsNil)
}()
Expand All @@ -98,7 +91,7 @@ func (t *testServer) TestServer(c *C) {
t.testOperateWorker(c, s, dir, true)

// check infos have be written into ETCD success.
t.testInfosInEtcd(c, hostName, workerAddr, dir)
t.testInfosInEtcd(c, hostName, cfg.AdvertiseAddr, dir)

// check worker would retry connecting master rather than stop worker directly.
ETCD = t.testRetryConnectMaster(c, s, ETCD, etcdDir, hostName)
Expand Down
3 changes: 2 additions & 1 deletion tests/_utils/run_dm_worker
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ PWD=$(pwd)
echo "[$(date)] <<<<<< START DM-WORKER on port $port, config: $conf >>>>>>"
cd $workdir
$binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.worker.$port.$(date +"%s").out" DEVEL \
--worker-addr=:$port \
--worker-addr=0.0.0.0:$port \
--advertise-addr=127.0.0.1:$port \
--log-file="$workdir/log/dm-worker.log" -L=info --config="$conf" \
>> $workdir/log/stdout.log 2>&1 &
cd $PWD

0 comments on commit 255846f

Please sign in to comment.