Skip to content

Commit

Permalink
Merge pull request #836 from philips/reduce-heartbeat-logs
Browse files Browse the repository at this point in the history
fix(server): reduce the screaming heartbeat logs
  • Loading branch information
philips committed Jun 8, 2014
2 parents dfeecd2 + 1c958f8 commit 868b7f7
Showing 1 changed file with 47 additions and 4 deletions.
51 changes: 47 additions & 4 deletions server/peer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
)

const (
// MaxHeartbeatTimeoutBackoff is the longest interval we wait before warning
// the user again about a peer not accepting heartbeats.
MaxHeartbeatTimeoutBackoff = 15 * time.Second

// ThresholdMonitorTimeout is the time between log notifications that the
// Raft heartbeat is too close to the election timeout.
ThresholdMonitorTimeout = 5 * time.Second
Expand Down Expand Up @@ -70,10 +74,18 @@ type PeerServer struct {
routineGroup sync.WaitGroup
timeoutThresholdChan chan interface{}

logBackoffs map[string]*logBackoff

metrics *metrics.Bucket
sync.Mutex
}

// logBackoff holds the per-peer state used to rate-limit heartbeat timeout
// warnings. PeerServer keeps one entry per peer name in its logBackoffs map.
type logBackoff struct {
// next is the time before which further warnings for the peer are
// suppressed; the zero value means no suppression window is active.
next time.Time
// backoff is the length of the current suppression window. It is doubled
// on every emitted warning, up to MaxHeartbeatTimeoutBackoff.
backoff time.Duration
// count is the tally of heartbeat timeouts for the peer; it is reported as
// "missed" in the warning log line.
count int
}

// TODO: find a good policy to do snapshot
type snapshotConf struct {
// Etcd will check whether a snapshot is needed every checkingInterval
Expand All @@ -97,6 +109,7 @@ func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry
serverStats: serverStats,

timeoutThresholdChan: make(chan interface{}, 1),
logBackoffs: make(map[string]*logBackoff),

metrics: mb,
}
Expand Down Expand Up @@ -687,11 +700,12 @@ func (s *PeerServer) raftEventLogger(event raft.Event) {
case raft.RemovePeerEventType:
log.Infof("%s: peer removed: '%v'", s.Config.Name, value)
case raft.HeartbeatIntervalEventType:
var name = "<unknown>"
if peer, ok := value.(*raft.Peer); ok {
name = peer.Name
peer, ok := value.(*raft.Peer);
if !ok {
log.Warnf("%s: heatbeat timeout from unknown peer", s.Config.Name)
return
}
log.Infof("%s: warning: heartbeat timed out: '%v'", s.Config.Name, name)
s.logHeartbeatTimeout(peer)
case raft.ElectionTimeoutThresholdEventType:
select {
case s.timeoutThresholdChan <- value:
Expand All @@ -701,6 +715,35 @@ func (s *PeerServer) raftEventLogger(event raft.Event) {
}
}

// logHeartbeatTimeout records a heartbeat timeout for peer and logs a warning
// about it, unless a warning for the same peer has already been emitted within
// the current backoff window.
//
// Every call is tallied, so the "missed" value in the log line covers both the
// timeouts whose warnings were suppressed and the ones that were logged. The
// window is doubled before it is applied (so the first window is two seconds)
// and is capped at MaxHeartbeatTimeoutBackoff. The state is reset once the peer
// reports activity later than the end of the last window.
func (s *PeerServer) logHeartbeatTimeout(peer *raft.Peer) {
	b, ok := s.logBackoffs[peer.Name]
	if !ok {
		b = &logBackoff{backoff: time.Second}
		s.logBackoffs[peer.Name] = b
	}

	// The peer has been heard from since the last suppression window ended:
	// treat this timeout as the start of a new outage.
	if peer.LastActivity().After(b.next) {
		b.next = time.Time{}
		b.backoff = time.Second
		b.count = 0
	}

	// Count every timeout, including the ones that end up being logged.
	// Counting only the suppressed ones would under-report "missed" by one
	// for each warning emitted after the first.
	b.count++

	now := time.Now()
	if b.next.After(now) {
		// Still inside the suppression window; stay quiet.
		return
	}

	b.backoff = 2 * b.backoff
	if b.backoff > MaxHeartbeatTimeoutBackoff {
		b.backoff = MaxHeartbeatTimeoutBackoff
	}
	b.next = now.Add(b.backoff)

	log.Infof("%s: warning: heartbeat time out peer=%q missed=%d backoff=%q", s.Config.Name, peer.Name, b.count, b.backoff)
}

func (s *PeerServer) recordMetricEvent(event raft.Event) {
name := fmt.Sprintf("raft.event.%s", event.Type())
value := event.Value().(time.Duration)
Expand Down

0 comments on commit 868b7f7

Please sign in to comment.