Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #9415 #9500

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type raftNode struct {
lead uint64

mu sync.Mutex

tickMu sync.Mutex
// last lead elected time
lt time.Time

Expand Down Expand Up @@ -129,6 +131,13 @@ type raftNode struct {
done chan struct{}
}

// tick advances the wrapped raft node by exactly one tick while holding
// tickMu. Per the original note, raft.Node does no locking of its own in the
// raft package, so tickMu serializes Tick calls made through this method
// (the ticker loop in start and advanceTicks both go through it).
func (r *raftNode) tick() {
	r.tickMu.Lock()
	// Deferred so the mutex is released even if Tick panics.
	defer r.tickMu.Unlock()
	r.Tick()
}

// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
Expand All @@ -144,7 +153,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
for {
select {
case <-r.ticker:
r.Tick()
r.tick()
case rd := <-r.Ready():
if rd.SoftState != nil {
if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
Expand Down Expand Up @@ -321,13 +330,13 @@ func (r *raftNode) resumeSending() {
p.Resume()
}

// advanceTicksForElection advances ticks to the node for fast election.
// This reduces the time to wait for first leader election if bootstrapping the whole
// cluster, while leaving at least 1 heartbeat for possible existing leader
// to contact it.
func advanceTicksForElection(n raft.Node, electionTicks int) {
for i := 0; i < electionTicks-1; i++ {
n.Tick()
// advanceTicks applies the given number of ticks to the raft node, calling
// r.tick once per tick. It can be used to fast-forward election ticks in
// multi data-center deployments and so speed up the election process.
// A zero or negative count performs no ticks.
func (r *raftNode) advanceTicks(ticks int) {
	for remaining := ticks; remaining > 0; remaining-- {
		r.tick()
	}
}

Expand Down Expand Up @@ -368,8 +377,7 @@ func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (i
raftStatusMu.Lock()
raftStatus = n.Status
raftStatusMu.Unlock()
advanceTicksForElection(n, c.ElectionTick)
return
return id, n, s, w
}

func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
Expand Down Expand Up @@ -402,7 +410,6 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membe
raftStatusMu.Lock()
raftStatus = n.Status
raftStatusMu.Unlock()
advanceTicksForElection(n, c.ElectionTick)
return id, cl, n, s, w
}

Expand Down
39 changes: 39 additions & 0 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,11 +506,50 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
return srv, nil
}

// adjustTicks fast-forwards raft election ticks once.
//
// When the cluster has exactly one member it advances ElectionTicks-1 ticks
// immediately. Otherwise it polls the transport every 50ms, for at most
// rafthttp.ConnReadTimeout, and advances ElectionTicks-2 ticks as soon as
// more than one peer is reported active. It returns without advancing when
// s.stopping fires or the polling budget is used up.
func (s *EtcdServer) adjustTicks() {
	// single-node fresh start, or single-node recovering from a snapshot
	if len(s.cluster.Members()) == 1 {
		ff := s.Cfg.ElectionTicks - 1
		plog.Infof("%s as single-node; fast-forwarding %d ticks (election ticks %d)", s.ID(), ff, s.Cfg.ElectionTicks)
		s.r.advanceTicks(ff)
		return
	}

	// Poll for up to rafthttp.ConnReadTimeout (5 seconds according to the
	// original note) until peer connections are reported. If none are, then
	// either:
	//  1. all connections failed, or
	//  2. there are no active peers, or
	//  3. this is a restarted single-node with no snapshot
	// and advancing ticks would have no effect, so nothing is done.
	const pollInterval = 50 * time.Millisecond
	attempts := int64(rafthttp.ConnReadTimeout / pollInterval)
	for n := int64(0); n < attempts; n++ {
		select {
		case <-s.stopping:
			return
		case <-time.After(pollInterval):
		}

		active := s.r.transport.ActivePeers()
		if active <= 1 {
			continue
		}

		// Multi-node: peer connections have been reported. Leave two ticks
		// of slack in case the leader's message is slow to arrive.
		// NOTE(review): "initialzed" in the message below is misspelled; it
		// is kept verbatim here so the emitted log line does not change.
		ff := s.Cfg.ElectionTicks - 2
		plog.Infof("%s initialzed peer connection; fast-forwarding %d ticks (election ticks %d) with %d active peer(s)", s.ID(), ff, s.Cfg.ElectionTicks, active)
		s.r.advanceTicks(ff)
		return
	}
}

// Start prepares and starts server in a new goroutine. It is no longer safe to
// modify a server's fields after it has been sent to Start.
// It also starts a goroutine to publish its server information.
func (s *EtcdServer) Start() {
s.start()
s.goAttach(func() { s.adjustTicks() })
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
s.goAttach(s.purgeFile)
s.goAttach(func() { monitorFileDescriptor(s.stopping) })
Expand Down
1 change: 1 addition & 0 deletions etcdserver/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (s *nopTransporterWithActiveTime) RemovePeer(id types.ID) {}
// RemoveAllPeers is a no-op.
func (s *nopTransporterWithActiveTime) RemoveAllPeers() {}
// UpdatePeer is a no-op.
func (s *nopTransporterWithActiveTime) UpdatePeer(id types.ID, us []string) {}
// ActiveSince returns the activeMap entry stored for id.
func (s *nopTransporterWithActiveTime) ActiveSince(id types.ID) time.Time { return s.activeMap[id] }
// ActivePeers always reports zero active peers.
func (s *nopTransporterWithActiveTime) ActivePeers() int { return 0 }
// Stop is a no-op.
func (s *nopTransporterWithActiveTime) Stop() {}
// Pause is a no-op.
func (s *nopTransporterWithActiveTime) Pause() {}
// Resume is a no-op.
func (s *nopTransporterWithActiveTime) Resume() {}
Expand Down
17 changes: 17 additions & 0 deletions rafthttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type Transporter interface {
// If the connection is active since peer was added, it returns the adding time.
// If the connection is currently inactive, it returns zero time.
ActiveSince(id types.ID) time.Time
// ActivePeers returns the number of active peers.
ActivePeers() int
// Stop closes the connections and stops the transporter.
Stop()
}
Expand Down Expand Up @@ -362,6 +364,20 @@ func (t *Transport) Resume() {
}
}

// ActivePeers returns the number of peers in t.peers whose activeSince
// time is non-zero, i.e. the peers that are currently active. The peer map
// is read under t.mu's read lock.
func (t *Transport) ActivePeers() (cnt int) {
	t.mu.RLock()
	defer t.mu.RUnlock()

	for _, peer := range t.peers {
		// A zero activeSince time marks a peer as not active; skip it.
		if peer.activeSince().IsZero() {
			continue
		}
		cnt++
	}
	return cnt
}

type nopTransporter struct{}

func NewNopTransporter() Transporter {
Expand All @@ -378,6 +394,7 @@ func (s *nopTransporter) RemovePeer(id types.ID) {}
// RemoveAllPeers is a no-op.
func (s *nopTransporter) RemoveAllPeers() {}
// UpdatePeer is a no-op.
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
// ActiveSince always returns the zero time.
func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} }
// ActivePeers always reports zero active peers.
func (s *nopTransporter) ActivePeers() int { return 0 }
// Stop is a no-op.
func (s *nopTransporter) Stop() {}
// Pause is a no-op.
func (s *nopTransporter) Pause() {}
// Resume is a no-op.
func (s *nopTransporter) Resume() {}
Expand Down