Skip to content

Commit

Permalink
Add gossip timeout for dead nodes and make kRandomNodes pluggable
Browse files Browse the repository at this point in the history
  • Loading branch information
kyhavlov committed Dec 13, 2016
1 parent 56f5fd7 commit 59d7299
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 27 deletions.
15 changes: 11 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,12 @@ type Config struct {
// per GossipInterval. Increasing this number causes the gossip messages
// to propagate across the cluster more quickly at the expense of
// increased bandwidth.
GossipInterval time.Duration
GossipNodes int
//
// GossipToTheDeadTime is the interval after which a node has died that
// we will still try to gossip to it. This gives it a chance to refute.
GossipInterval time.Duration
GossipNodes int
GossipToTheDeadTime time.Duration

// EnableCompression is used to control message compression. This can
// be used to reduce bandwidth usage at the cost of slightly more CPU
Expand Down Expand Up @@ -212,8 +216,9 @@ func DefaultLANConfig() *Config {
DisableTcpPings: false, // TCP pings are safe, even with mixed versions
AwarenessMaxMultiplier: 8, // Probe interval backs off to 8 seconds

GossipNodes: 3, // Gossip to 3 nodes
GossipInterval: 200 * time.Millisecond, // Gossip more rapidly
GossipNodes: 3, // Gossip to 3 nodes
GossipInterval: 200 * time.Millisecond, // Gossip more rapidly
GossipToTheDeadTime: 30 * time.Second, // Same as push/pull

EnableCompression: true, // Enable compression by default

Expand All @@ -238,6 +243,7 @@ func DefaultWANConfig() *Config {
conf.ProbeInterval = 5 * time.Second
conf.GossipNodes = 4 // Gossip less frequently, but to an additional node
conf.GossipInterval = 500 * time.Millisecond
conf.GossipToTheDeadTime = 60 * time.Second
return conf
}

Expand All @@ -254,6 +260,7 @@ func DefaultLocalConfig() *Config {
conf.ProbeTimeout = 200 * time.Millisecond
conf.ProbeInterval = time.Second
conf.GossipInterval = 100 * time.Millisecond
conf.GossipToTheDeadTime = 15 * time.Second
return conf
}

Expand Down
33 changes: 26 additions & 7 deletions state.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,11 @@ func (m *Memberlist) probeNode(node *nodeState) {

// Get some random live nodes.
m.nodeLock.RLock()
excludes := []string{m.config.Name, node.Name}
kNodes := kRandomNodes(m.config.IndirectChecks, excludes, m.nodes)
kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
return n.Name == m.config.Name ||
n.Name == node.Name ||
n.State != stateAlive
})
m.nodeLock.RUnlock()

// Attempt an indirect ping.
Expand Down Expand Up @@ -460,10 +463,24 @@ func (m *Memberlist) resetNodes() {
func (m *Memberlist) gossip() {
defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now())

// Get some random live nodes
// Get some random live, suspect, or recently dead nodes
m.nodeLock.RLock()
excludes := []string{m.config.Name}
kNodes := kRandomNodes(m.config.GossipNodes, excludes, m.nodes)
kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool {
if n.Name == m.config.Name {
return true
}

switch n.State {
case stateAlive, stateSuspect:
return false

case stateDead:
return time.Since(n.StateChange) > m.config.GossipToTheDeadTime

default:
return true
}
})
m.nodeLock.RUnlock()

// Compute the bytes available
Expand Down Expand Up @@ -497,8 +514,10 @@ func (m *Memberlist) gossip() {
func (m *Memberlist) pushPull() {
// Get a random live node
m.nodeLock.RLock()
excludes := []string{m.config.Name}
nodes := kRandomNodes(1, excludes, m.nodes)
nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool {
return n.Name == m.config.Name ||
n.State != stateAlive
})
m.nodeLock.RUnlock()

// If no nodes, bail
Expand Down
50 changes: 50 additions & 0 deletions state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1680,6 +1680,56 @@ func TestMemberlist_Gossip(t *testing.T) {
}
}

func TestMemberlist_GossipToDead(t *testing.T) {
ch := make(chan NodeEvent, 2)

addr1 := getBindAddr()
addr2 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)

m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.GossipInterval = time.Millisecond
c.GossipToTheDeadTime = 100 * time.Millisecond
})
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.Events = &ChannelEventDelegate{ch}
})

defer m1.Shutdown()
defer m2.Shutdown()

a1 := alive{Node: addr1.String(), Addr: ip1, Port: 7946, Incarnation: 1}
m1.aliveNode(&a1, nil, true)
a2 := alive{Node: addr2.String(), Addr: ip2, Port: 7946, Incarnation: 1}
m1.aliveNode(&a2, nil, false)

// Shouldn't send anything to m2 here, node has been dead for 2x the GossipToTheDeadTime
m1.nodeMap[addr2.String()].State = stateDead
m1.nodeMap[addr2.String()].StateChange = time.Now().Add(-200 * time.Millisecond)
m1.gossip()

for i := 0; i < 2; i++ {
select {
case <-ch:
t.Fatalf("shouldn't get gossip")
case <-time.After(50 * time.Millisecond):
}
}

// Should gossip to m2 because its state has changed within GossipToTheDeadTime
m1.nodeMap[addr2.String()].StateChange = time.Now().Add(-20 * time.Millisecond)
m1.gossip()

for i := 0; i < 2; i++ {
select {
case <-ch:
case <-time.After(50 * time.Millisecond):
t.Fatalf("timeout")
}
}
}

func TestMemberlist_PushPull(t *testing.T) {
addr1 := getBindAddr()
addr2 := getBindAddr()
Expand Down
20 changes: 7 additions & 13 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,10 @@ func moveDeadNodes(nodes []*nodeState) int {
return n - numDead
}

// kRandomNodes is used to select up to k random nodes, excluding a given
// node and any non-alive nodes. It is possible that less than k nodes are returned.
func kRandomNodes(k int, excludes []string, nodes []*nodeState) []*nodeState {
// kRandomNodes is used to select up to k random nodes, excluding any nodes where
// the filter function returns true. It is possible that less than k nodes are
// returned.
func kRandomNodes(k int, nodes []*nodeState, filterFn func(*nodeState) bool) []*nodeState {
n := len(nodes)
kNodes := make([]*nodeState, 0, k)
OUTER:
Expand All @@ -221,16 +222,9 @@ OUTER:
idx := randomOffset(n)
node := nodes[idx]

// Exclude node if match
for _, exclude := range excludes {
if node.Name == exclude {
continue OUTER
}
}

// Exclude if not alive
if node.State != stateAlive {
continue
// Give the filter a shot at it.
if filterFn != nil && filterFn(node) {
continue OUTER
}

// Check if we have this node already
Expand Down
13 changes: 10 additions & 3 deletions util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,16 @@ func TestKRandomNodes(t *testing.T) {
})
}

s1 := kRandomNodes(3, []string{"test0"}, nodes)
s2 := kRandomNodes(3, []string{"test0"}, nodes)
s3 := kRandomNodes(3, []string{"test0"}, nodes)
filterFunc := func(n *nodeState) bool {
if n.Name == "test0" || n.State != stateAlive {
return true
}
return false
}

s1 := kRandomNodes(3, nodes, filterFunc)
s2 := kRandomNodes(3, nodes, filterFunc)
s3 := kRandomNodes(3, nodes, filterFunc)

if reflect.DeepEqual(s1, s2) {
t.Fatalf("unexpected equal")
Expand Down

0 comments on commit 59d7299

Please sign in to comment.