Skip to content

Commit

Permalink
ha: follower forward request to leader (pingcap#477)
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored Feb 18, 2020
1 parent 2a8a594 commit 1478b5e
Show file tree
Hide file tree
Showing 38 changed files with 514 additions and 86 deletions.
2 changes: 2 additions & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ ErrMasterParseURLFail,[code=38039:class=dm-master:scope=internal:level=high],"fa
ErrMasterJoinEmbedEtcdFail,[code=38040:class=dm-master:scope=internal:level=high],"fail to join embed etcd: %s"
ErrMasterCoordinatorNotStart,[code=38041:class=dm-master:scope=internal:level=high],"coordinator does not start"
ErrMasterAcquireWorkerFailed,[code=38042:class=dm-master:scope=internal:level=medium],"acquire worker failed: %s"
ErrMasterAdvertiseAddrNotValid,[code=38043:class=dm-master:scope=internal:level=high],"advertise address %s not valid"
ErrMasterRequestIsNotForwardToLeader,[code=38044:class=dm-master:scope=internal:level=high],"master is not leader, and can't forward request to leader"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium],"parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium],"'%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium],"toml decode file"
Expand Down
12 changes: 11 additions & 1 deletion dm/master/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ type Config struct {
RPCRateLimit float64 `toml:"rpc-rate-limit" json:"rpc-rate-limit"`
RPCRateBurst int `toml:"rpc-rate-burst" json:"rpc-rate-burst"`

MasterAddr string `toml:"master-addr" json:"master-addr"`
MasterAddr string `toml:"master-addr" json:"master-addr"`
AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"`

Deploy []*DeployMapper `toml:"deploy" json:"-"`
// TODO: remove
Expand Down Expand Up @@ -206,6 +207,15 @@ func (c *Config) adjust() error {
return terror.ErrMasterHostPortNotValid.Delegate(err, c.MasterAddr)
}

// AdvertiseAddr's format must be "host:port"
host, port, err := net.SplitHostPort(c.AdvertiseAddr)
if err != nil {
return terror.ErrMasterAdvertiseAddrNotValid.Delegate(err, c.AdvertiseAddr)
}
if len(host) == 0 || len(port) == 0 {
return terror.ErrMasterAdvertiseAddrNotValid.Generate(c.AdvertiseAddr)
}

c.DeployMap = make(map[string]string)
for _, item := range c.Deploy {
if err = item.Verify(); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions dm/master/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (t *testConfigSuite) TestConfig(c *check.C) {
err error
cfg = &Config{}
masterAddr = ":8261"
advertiseAddr = "127.0.0.1:8261"
name = "dm-master"
dataDir = "default.dm-master"
peerURLs = "http://127.0.0.1:8291"
Expand Down Expand Up @@ -135,6 +136,7 @@ func (t *testConfigSuite) TestConfig(c *check.C) {
c.Assert(err, check.ErrorMatches, tc.errorReg)
} else {
c.Assert(cfg.MasterAddr, check.Equals, masterAddr)
c.Assert(cfg.AdvertiseAddr, check.Equals, advertiseAddr)
c.Assert(cfg.Name, check.Equals, name)
c.Assert(cfg.DataDir, check.Equals, dataDir)
c.Assert(cfg.PeerUrls, check.Equals, peerURLs)
Expand Down Expand Up @@ -189,6 +191,7 @@ func (t *testConfigSuite) TestInvalidConfig(c *check.C) {
// test config Verify failed
configContent := []byte(`
master-addr = ":8261"
advertise-addr = "127.0.0.1:8261"
[[deploy]]
dm-worker = "172.16.10.72:8262"`)
Expand All @@ -213,6 +216,7 @@ dm-worker = "172.16.10.72:8262"`)
// field still remain undecoded in config will cause verify failed
configContent2 := []byte(`
master-addr = ":8261"
advertise-addr = "127.0.0.1:8261"
aaa = "xxx"
[[deploy]]
Expand All @@ -237,6 +241,7 @@ func (t *testConfigSuite) TestGenEmbedEtcdConfig(c *check.C) {

cfg1 := NewConfig()
cfg1.MasterAddr = ":8261"
cfg1.AdvertiseAddr = "127.0.0.1:8261"
cfg1.InitialClusterState = embed.ClusterStateFlagExisting
c.Assert(cfg1.adjust(), check.IsNil)
etcdCfg, err := cfg1.genEmbedEtcdConfig()
Expand All @@ -252,10 +257,12 @@ func (t *testConfigSuite) TestGenEmbedEtcdConfig(c *check.C) {

cfg2 := *cfg1
cfg2.MasterAddr = "127.0.0.1\n:8261"
cfg2.AdvertiseAddr = "127.0.0.1:8261"
_, err = cfg2.genEmbedEtcdConfig()
c.Assert(terror.ErrMasterGenEmbedEtcdConfigFail.Equal(err), check.IsTrue)
c.Assert(err, check.ErrorMatches, "(?m).*invalid master-addr.*")
cfg2.MasterAddr = "172.100.8.8:8261"
cfg2.AdvertiseAddr = "172.100.8.8:8261"
etcdCfg, err = cfg2.genEmbedEtcdConfig()
c.Assert(err, check.IsNil)
c.Assert(etcdCfg.LCUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "172.100.8.8:8261"}})
Expand Down
3 changes: 2 additions & 1 deletion dm/master/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,10 @@ func (c *Coordinator) IsStarted() bool {
// Stop stops the coordinator.
//
// It invokes c.cancel and clears the started flag while holding c.mu, then
// releases the lock and blocks until c.wg reports that all tracked work has
// finished.
func (c *Coordinator) Stop() {
	c.mu.Lock()
	c.cancel()
	c.started = false
	// Unlock exactly once, and do it before waiting: combining a deferred
	// Unlock with this explicit one would unlock an already-unlocked mutex
	// when the function returns, which is a fatal runtime error.
	// NOTE(review): presumably the goroutines tracked by c.wg may need c.mu
	// to exit, which is why the lock is not held across Wait — confirm.
	c.mu.Unlock()

	c.wg.Wait()
	log.L().Info("coordinator is stoped")
}
Expand Down
3 changes: 3 additions & 0 deletions dm/master/dm-master.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ log-file = "dm-master.log"
# dm-master listen address
master-addr = ":8261"

# addr(i.e. 'host:port') to advertise to the public
advertise-addr = "127.0.0.1:8261"

# human-readable name for this DM-master member
name = "dm-master"

Expand Down
75 changes: 67 additions & 8 deletions dm/master/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,27 @@ package master
import (
"context"
"go.uber.org/zap"
"time"

"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/election"
"github.com/pingcap/dm/pkg/log"

"google.golang.org/grpc"
)

const (
// oneselfLeader is the value stored in Server.leader when the current
// member is itself the leader; isLeaderAndNeedForward compares
// Server.leader against it to decide whether this member is the leader.
oneselfLeader = "oneself"
)

func (s *Server) electionNotify(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case leader := <-s.election.LeaderNotify():
// output the leader info.
if leader {
case notify := <-s.election.LeaderNotify():
switch notify {
case election.IsLeader:
log.L().Info("current member become the leader", zap.String("current member", s.cfg.Name))
err := s.coordinator.Start(ctx, s.etcdClient)
if err != nil {
Expand All @@ -37,15 +46,35 @@ func (s *Server) electionNotify(ctx context.Context) {
log.L().Error("recover subtask infos from coordinator fail", zap.Error(err))
}

} else {
_, leaderID, err2 := s.election.LeaderInfo(ctx)
if err2 == nil {
s.Lock()
s.leader = oneselfLeader
s.closeLeaderClient()
s.Unlock()
case election.RetireFromLeader, election.IsNotLeader:
if notify == election.RetireFromLeader {
s.coordinator.Stop()
}

leader, leaderID, leaderAddr, err2 := s.election.LeaderInfo(ctx)
if err2 != nil {
log.L().Warn("get leader info", zap.Error(err2))
continue
}

if notify == election.RetireFromLeader {
log.L().Info("current member retire from the leader", zap.String("leader", leaderID), zap.String("current member", s.cfg.Name))
} else {
log.L().Warn("get leader info", zap.Error(err2))
log.L().Info("get new leader", zap.String("leader", leaderID), zap.String("current member", s.cfg.Name))
}
s.coordinator.Stop()

s.Lock()
s.leader = leader
s.createLeaderClient(leaderAddr)
s.Unlock()
default:
log.L().Error("unknown notify type", zap.String("notify", notify))
}

case err := <-s.election.ErrorNotify():
// handle errors here, we do no meaningful things now.
// but maybe:
Expand All @@ -55,3 +84,33 @@ func (s *Server) electionNotify(ctx context.Context) {
}
}
}

// createLeaderClient replaces the gRPC connection used to forward requests
// to the leader with a new one dialed to leaderAddr.
//
// Any existing connection is closed first. If dialing fails, the error is
// logged and s.leaderGrpcConn stays nil. The call sites visible in this file
// hold s's write lock while calling this method.
func (s *Server) createLeaderClient(leaderAddr string) {
	// Tear down the connection to the previous leader, if any.
	s.closeLeaderClient()

	leaderConn, dialErr := grpc.Dial(leaderAddr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second))
	if dialErr == nil {
		s.leaderGrpcConn = leaderConn
		s.leaderClient = pb.NewMasterClient(leaderConn)
		return
	}

	log.L().Error("can't create grpc connection with leader, can't forward request to leader", zap.String("leader", leaderAddr), zap.Error(dialErr))
}

// closeLeaderClient closes the gRPC connection to the leader, if one exists,
// and resets s.leaderGrpcConn to nil. It is a no-op when no connection is
// held. The call sites visible in this file hold s's write lock.
func (s *Server) closeLeaderClient() {
	if s.leaderGrpcConn == nil {
		return
	}

	s.leaderGrpcConn.Close()
	s.leaderGrpcConn = nil
}

// isLeaderAndNeedForward reports, under s's read lock, two facts about the
// current member:
//   - isLeader: whether s.leader equals oneselfLeader;
//   - needForward: whether a gRPC connection to the leader is currently held
//     (s.leaderGrpcConn is non-nil).
func (s *Server) isLeaderAndNeedForward() (isLeader bool, needForward bool) {
	s.RLock()
	isLeader, needForward = s.leader == oneselfLeader, s.leaderGrpcConn != nil
	s.RUnlock()

	return isLeader, needForward
}
1 change: 1 addition & 0 deletions dm/master/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) {
cfgCluster.Name = "dm-master-1"
cfgCluster.DataDir = c.MkDir()
cfgCluster.MasterAddr = tempurl.Alloc()[len("http://"):]
cfgCluster.AdvertiseAddr = tempurl.Alloc()[len("http://"):]
cfgCluster.PeerUrls = tempurl.Alloc()
c.Assert(cfgCluster.adjust(), check.IsNil)
cfgClusterEtcd, err := cfgCluster.genEmbedEtcdConfig()
Expand Down
Loading

0 comments on commit 1478b5e

Please sign in to comment.