Skip to content

Commit

Permalink
[FAB-2157] Retry connection to bootstrap peers
Browse files Browse the repository at this point in the history
In gossip, if a peer starts up and the bootstrap peers aren't available,
a reconnection attempt isn't being made.
This is risky especially in automatically-managed environments
like the cloud where machines can come up automatically and
there is a large number of them and no one analyzes their logs in real-time.

I didn't add a new test, but I re-arranged a test in the discovery layer
and in the gossip layer and made the bootstrap peers start after the rest
of the peers.

Change-Id: Ib49179cfd4d17e1ed9c6b33ee522769ec4efc082
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Feb 13, 2017
1 parent 776b629 commit e393677
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 24 deletions.
43 changes: 28 additions & 15 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
go d.periodicalReconnectToDead()
go d.handlePresumedDeadPeers()

d.connect2BootstrapPeers(bootstrapPeers)
go d.connect2BootstrapPeers(bootstrapPeers)

d.logger.Info("Started", self, "incTime is", d.incTime)

Expand Down Expand Up @@ -156,22 +156,35 @@ func (d *gossipDiscoveryImpl) Connect(member NetworkMember) {
func (d *gossipDiscoveryImpl) connect2BootstrapPeers(endpoints []string) {
d.logger.Info("Entering:", endpoints)
defer d.logger.Info("Exiting")
wg := sync.WaitGroup{}
req := d.createMembershipRequest()
for _, endpoint := range endpoints {
wg.Add(1)
go func(endpoint string) {
defer wg.Done()
peer := &NetworkMember{
Endpoint: endpoint,
InternalEndpoint: &proto.SignedEndpoint{

for !d.somePeerIsKnown() {
var wg sync.WaitGroup
req := d.createMembershipRequest()
wg.Add(len(endpoints))
for _, endpoint := range endpoints {
go func(endpoint string) {
defer wg.Done()
peer := &NetworkMember{
Endpoint: endpoint,
},
}
d.comm.SendToPeer(peer, req)
}(endpoint)
InternalEndpoint: &proto.SignedEndpoint{
Endpoint: endpoint,
},
}
if !d.comm.Ping(peer) {
return
}
d.comm.SendToPeer(peer, req)
}(endpoint)
}
wg.Wait()
time.Sleep(reconnectInterval)
}
wg.Wait()
}

func (d *gossipDiscoveryImpl) somePeerIsKnown() bool {
d.lock.RLock()
defer d.lock.RUnlock()
return len(d.aliveLastTS) != 0
}

func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) {
Expand Down
15 changes: 9 additions & 6 deletions gossip/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,19 +404,22 @@ func TestGetFullMembership(t *testing.T) {
nodeNum := 15
bootPeers := []string{bootPeer(5511), bootPeer(5512)}
instances := []*gossipInstance{}

inst := createDiscoveryInstance(5511, "d1", bootPeers)
instances = append(instances, inst)

inst = createDiscoveryInstance(5512, "d2", bootPeers)
instances = append(instances, inst)
var inst *gossipInstance

for i := 3; i <= nodeNum; i++ {
id := fmt.Sprintf("d%d", i)
inst = createDiscoveryInstance(5510+i, id, bootPeers)
instances = append(instances, inst)
}

time.Sleep(time.Second)

inst = createDiscoveryInstance(5511, "d1", bootPeers)
instances = append(instances, inst)

inst = createDiscoveryInstance(5512, "d2", bootPeers)
instances = append(instances, inst)

assertMembership(t, instances, nodeNum-1)

// Ensure that internal endpoint was propagated to everyone
Expand Down
10 changes: 7 additions & 3 deletions gossip/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,7 @@ func TestPull(t *testing.T) {

n := 5
msgsCount2Send := 10
boot := newGossipInstanceWithOnlyPull(portPrefix, 0, 100)
boot.JoinChan(&joinChanMsg{}, common.ChainID("A"))
boot.UpdateChannelMetadata([]byte("bla bla"), common.ChainID("A"))

peers := make([]Gossip, n)
wg := sync.WaitGroup{}
wg.Add(n)
Expand All @@ -247,6 +245,12 @@ func TestPull(t *testing.T) {
}
wg.Wait()

time.Sleep(time.Second)

boot := newGossipInstanceWithOnlyPull(portPrefix, 0, 100)
boot.JoinChan(&joinChanMsg{}, common.ChainID("A"))
boot.UpdateChannelMetadata([]byte("bla bla"), common.ChainID("A"))

knowAll := func() bool {
for i := 1; i <= n; i++ {
neighborCount := len(peers[i-1].Peers())
Expand Down

0 comments on commit e393677

Please sign in to comment.