Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cap maximum grpc wait time when heartbeating to heartbeatTimeout/2 #494

Merged
merged 10 commits into from
May 9, 2022
59 changes: 59 additions & 0 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2639,6 +2639,65 @@ func TestRaft_VoteNotGranted_WhenNodeNotInCluster(t *testing.T) {
}
}

// TestRaft_FollowerRemovalNoElection checks that shutting down one follower
// and starting a replacement node on the same stores does not cause the
// cluster to elect a different leader.
func TestRaft_FollowerRemovalNoElection(t *testing.T) {
	// Three-node cluster with 100ms heartbeat and election timeouts.
	clusterConf := inmemConfig(t)
	clusterConf.HeartbeatTimeout = 100 * time.Millisecond
	clusterConf.ElectionTimeout = 100 * time.Millisecond
	c := MakeCluster(3, t, clusterConf)
	defer c.Close()

	waitForLeader(c)
	leader := c.Leader()

	// Poll until exactly two nodes report the Follower state, giving up once
	// the cluster's long-stop deadline has passed.
	var followers []*Raft
	deadline := time.Now().Add(c.longstopTimeout)
	for len(followers) != 2 && time.Now().Before(deadline) {
		c.WaitEvent(nil, c.conf.CommitTimeout)
		followers = c.GetInState(Follower)
	}
	if len(followers) != 2 {
		t.Fatalf("expected two followers: %v", followers)
	}

	// Choose the node to restart: slot 0, unless slot 0 is the current
	// leader, in which case slot 1.
	idx := 0
	target := c.rafts[idx]
	if target == c.Leader() {
		idx = 1
		target = c.rafts[idx]
	}

	// Keep a handle on the log store before stopping the node so the
	// replacement can be started on top of it.
	logStore := target.logs
	t.Logf("[INFO] restarting %v", target)

	// Stop the chosen follower and wait for the shutdown to finish.
	if err := target.Shutdown().Error(); err != nil {
		t.Fatalf("error shuting down follower: %v", err)
	}

	// Start a new node at the same address with a fresh transport and FSM,
	// reusing the old node's log, stable and snapshot stores.
	_, trans := NewInmemTransport(target.localAddr)
	conf := target.config()
	n, err := NewRaft(&conf, &MockFSM{}, logStore, target.stable, target.snapshots, trans)
	if err != nil {
		t.Fatalf("error restarting follower: %v", err)
	}

	// Register the replacement with the cluster harness and reconnect
	// every transport.
	c.rafts[idx] = n
	c.trans[idx] = n.trans.(*InmemTransport)
	c.fsms[idx] = n.fsm.(*MockFSM)
	c.FullyConnect()

	// No re-election is expected while we wait here; 250ms is longer than
	// two of the 100ms election timeouts configured above.
	time.Sleep(250 * time.Millisecond)

	// The original leader must still lead, and all nodes must agree on
	// state and peer set.
	c.EnsureLeader(t, leader.localAddr)
	c.EnsureSame(t)
	c.EnsureSamePeers(t)
	n.Shutdown()
}

func TestRaft_VoteWithNoIDNoAddr(t *testing.T) {
// Make a cluster
c := MakeCluster(3, t, nil)
Expand Down
7 changes: 5 additions & 2 deletions replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,12 +402,15 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {

start := time.Now()
if err := r.trans.AppendEntries(peer.ID, peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to heartbeat to", "peer", peer.Address, "error", err)
nextBackoffTime := cappedExponentialBackoff(failureWait, failures, maxFailureScale, r.config().HeartbeatTimeout/2)
r.logger.Error("failed to heartbeat to", "peer", peer.Address, "backoff time",
nextBackoffTime, "error", err)
r.observe(FailedHeartbeatObservation{PeerID: peer.ID, LastContact: s.LastContact()})
failures++
select {
case <-time.After(backoff(failureWait, failures, maxFailureScale)):
case <-time.After(nextBackoffTime):
case <-stopCh:
return
}
} else {
if failures > 0 {
Expand Down
17 changes: 17 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,23 @@ func backoff(base time.Duration, round, limit uint64) time.Duration {
return base
}

// cappedExponentialBackoff computes the exponential backoff with an adjustable
// cap on the max timeout.
func cappedExponentialBackoff(base time.Duration, round, limit uint64, cap time.Duration) time.Duration {
power := min(round, limit)
for power > 2 {
if base > cap {
return cap
}
Comment on lines +152 to +154
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if base > cap {
return cap
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So long as the limit is less than or equal to 50 (assuming 10ms base wait time) then removing this should be fine. If we use a limit higher than that (which wouldn't make any logical sense as the wait time would be astronomically high) then we could run into integer overflow issues with the next multiplication.

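This is practically safe, but removing the check leaves the code technically exposed to overflow: Go defines signed integer overflow as wrapping rather than as undefined behavior, so an overflowed multiplication would silently produce a wrong (possibly negative) backoff duration.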

base *= 2
power--
}
if base > cap {
return cap
}
return base
HridoyRoy marked this conversation as resolved.
Show resolved Hide resolved
}

// Needed for sorting []uint64, used to determine commitment
type uint64Slice []uint64

Expand Down