Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gossip timeout for dead nodes and make kRandomNodes pluggable #99

Merged
merged 1 commit into from
Dec 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,12 @@ type Config struct {
// per GossipInterval. Increasing this number causes the gossip messages
// to propagate across the cluster more quickly at the expense of
// increased bandwidth.
GossipInterval time.Duration
GossipNodes int
//
// GossipToTheDeadTime is the interval after which a node has died that
// we will still try to gossip to it. This gives it a chance to refute.
GossipInterval time.Duration
GossipNodes int
GossipToTheDeadTime time.Duration

// EnableCompression is used to control message compression. This can
// be used to reduce bandwidth usage at the cost of slightly more CPU
Expand Down Expand Up @@ -212,8 +216,9 @@ func DefaultLANConfig() *Config {
DisableTcpPings: false, // TCP pings are safe, even with mixed versions
AwarenessMaxMultiplier: 8, // Probe interval backs off to 8 seconds

GossipNodes: 3, // Gossip to 3 nodes
GossipInterval: 200 * time.Millisecond, // Gossip more rapidly
GossipNodes: 3, // Gossip to 3 nodes
GossipInterval: 200 * time.Millisecond, // Gossip more rapidly
GossipToTheDeadTime: 30 * time.Second, // Same as push/pull

EnableCompression: true, // Enable compression by default

Expand All @@ -238,6 +243,7 @@ func DefaultWANConfig() *Config {
conf.ProbeInterval = 5 * time.Second
conf.GossipNodes = 4 // Gossip less frequently, but to an additional node
conf.GossipInterval = 500 * time.Millisecond
conf.GossipToTheDeadTime = 60 * time.Second
return conf
}

Expand All @@ -254,6 +260,7 @@ func DefaultLocalConfig() *Config {
conf.ProbeTimeout = 200 * time.Millisecond
conf.ProbeInterval = time.Second
conf.GossipInterval = 100 * time.Millisecond
conf.GossipToTheDeadTime = 15 * time.Second
return conf
}

Expand Down
33 changes: 26 additions & 7 deletions state.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,11 @@ func (m *Memberlist) probeNode(node *nodeState) {

// Get some random live nodes.
m.nodeLock.RLock()
excludes := []string{m.config.Name, node.Name}
kNodes := kRandomNodes(m.config.IndirectChecks, excludes, m.nodes)
kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
return n.Name == m.config.Name ||
n.Name == node.Name ||
n.State != stateAlive
})
m.nodeLock.RUnlock()

// Attempt an indirect ping.
Expand Down Expand Up @@ -460,10 +463,24 @@ func (m *Memberlist) resetNodes() {
func (m *Memberlist) gossip() {
defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now())

// Get some random live nodes
// Get some random live, suspect, or recently dead nodes
m.nodeLock.RLock()
excludes := []string{m.config.Name}
kNodes := kRandomNodes(m.config.GossipNodes, excludes, m.nodes)
kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool {
if n.Name == m.config.Name {
return true
}

switch n.State {
case stateAlive, stateSuspect:
return false

case stateDead:
return time.Since(n.StateChange) > m.config.GossipToTheDeadTime

default:
return true
}
})
m.nodeLock.RUnlock()

// Compute the bytes available
Expand Down Expand Up @@ -497,8 +514,10 @@ func (m *Memberlist) gossip() {
func (m *Memberlist) pushPull() {
// Get a random live node
m.nodeLock.RLock()
excludes := []string{m.config.Name}
nodes := kRandomNodes(1, excludes, m.nodes)
nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool {
return n.Name == m.config.Name ||
n.State != stateAlive
})
m.nodeLock.RUnlock()

// If no nodes, bail
Expand Down
48 changes: 48 additions & 0 deletions state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1680,6 +1680,54 @@ func TestMemberlist_Gossip(t *testing.T) {
}
}

func TestMemberlist_GossipToDead(t *testing.T) {
ch := make(chan NodeEvent, 2)

addr1 := getBindAddr()
addr2 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)

m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.GossipInterval = time.Millisecond
c.GossipToTheDeadTime = 100 * time.Millisecond
})
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.Events = &ChannelEventDelegate{ch}
})

defer m1.Shutdown()
defer m2.Shutdown()

a1 := alive{Node: addr1.String(), Addr: ip1, Port: 7946, Incarnation: 1}
m1.aliveNode(&a1, nil, true)
a2 := alive{Node: addr2.String(), Addr: ip2, Port: 7946, Incarnation: 1}
m1.aliveNode(&a2, nil, false)

// Shouldn't send anything to m2 here, node has been dead for 2x the GossipToTheDeadTime
m1.nodeMap[addr2.String()].State = stateDead
m1.nodeMap[addr2.String()].StateChange = time.Now().Add(-200 * time.Millisecond)
m1.gossip()

select {
case <-ch:
t.Fatalf("shouldn't get gossip")
case <-time.After(50 * time.Millisecond):
}

// Should gossip to m2 because its state has changed within GossipToTheDeadTime
m1.nodeMap[addr2.String()].StateChange = time.Now().Add(-20 * time.Millisecond)
m1.gossip()

for i := 0; i < 2; i++ {
select {
case <-ch:
case <-time.After(50 * time.Millisecond):
t.Fatalf("timeout")
}
}
}

func TestMemberlist_PushPull(t *testing.T) {
addr1 := getBindAddr()
addr2 := getBindAddr()
Expand Down
20 changes: 7 additions & 13 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,10 @@ func moveDeadNodes(nodes []*nodeState) int {
return n - numDead
}

// kRandomNodes is used to select up to k random nodes, excluding a given
// node and any non-alive nodes. It is possible that less than k nodes are returned.
func kRandomNodes(k int, excludes []string, nodes []*nodeState) []*nodeState {
// kRandomNodes is used to select up to k random nodes, excluding any nodes where
// the filter function returns true. It is possible that less than k nodes are
// returned.
func kRandomNodes(k int, nodes []*nodeState, filterFn func(*nodeState) bool) []*nodeState {
n := len(nodes)
kNodes := make([]*nodeState, 0, k)
OUTER:
Expand All @@ -221,16 +222,9 @@ OUTER:
idx := randomOffset(n)
node := nodes[idx]

// Exclude node if match
for _, exclude := range excludes {
if node.Name == exclude {
continue OUTER
}
}

// Exclude if not alive
if node.State != stateAlive {
continue
// Give the filter a shot at it.
if filterFn != nil && filterFn(node) {
continue OUTER
}

// Check if we have this node already
Expand Down
13 changes: 10 additions & 3 deletions util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,16 @@ func TestKRandomNodes(t *testing.T) {
})
}

s1 := kRandomNodes(3, []string{"test0"}, nodes)
s2 := kRandomNodes(3, []string{"test0"}, nodes)
s3 := kRandomNodes(3, []string{"test0"}, nodes)
filterFunc := func(n *nodeState) bool {
if n.Name == "test0" || n.State != stateAlive {
return true
}
return false
}

s1 := kRandomNodes(3, nodes, filterFunc)
s2 := kRandomNodes(3, nodes, filterFunc)
s3 := kRandomNodes(3, nodes, filterFunc)

if reflect.DeepEqual(s1, s2) {
t.Fatalf("unexpected equal")
Expand Down