Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

maintainence: attempt to fix network tests on CI #1627

Merged
merged 8 commits into from
Jun 8, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dot/core/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ func NewTestService(t *testing.T, cfg *Config) *Service {
config := &network.Config{
BasePath: testDatadirPath,
Port: 7001,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
BlockState: stateSrvc.Block,
Expand Down
2 changes: 0 additions & 2 deletions dot/network/block_announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func TestHandleBlockAnnounceMessage(t *testing.T) {
config := &Config{
BasePath: basePath,
Port: 7001,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -110,7 +109,6 @@ func TestValidateBlockAnnounceHandshake(t *testing.T) {
configA := &Config{
BasePath: utils.NewTestBasePath(t, "nodeA"),
Port: 7001,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
}
Expand Down
1 change: 0 additions & 1 deletion dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ type Config struct {
// build checks the configuration, sets up the private key for the network service,
// and applies default values where appropriate
func (c *Config) build() error {

// check state configuration
err := c.checkState()
if err != nil {
Expand Down
26 changes: 18 additions & 8 deletions dot/network/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,25 @@ func (cm *ConnManager) Disconnected(n network.Network, c network.Conn) {
}

go func() {
for i := 0; i < maxRetries; i++ {
err := cm.host.connect(info)
if err != nil {
logger.Warn("failed to reconnect to persistent peer", "peer", c.RemotePeer(), "error", err)
time.Sleep(time.Minute)
continue
retry := 0
retryTimer := time.NewTicker(time.Minute)
defer retryTimer.Stop()
for {
select {
case <-cm.host.ctx.Done():
return
case <-retryTimer.C:
err := cm.host.connect(info)
if err != nil {
logger.Warn("failed to reconnect to persistent peer", "peer", c.RemotePeer(), "error", err)
continue
}

retry++
if retry > maxRetries {
return
}
}

return
}
}()

Expand Down
3 changes: 0 additions & 3 deletions dot/network/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func TestMaxPeers(t *testing.T) {
config := &Config{
BasePath: utils.NewTestBasePath(t, fmt.Sprintf("node%d", i)),
Port: 7000 + uint32(i),
RandSeed: 1 + int64(i),
NoBootstrap: true,
NoMDNS: true,
MaxPeers: max,
Expand Down Expand Up @@ -91,7 +90,6 @@ func TestPersistentPeers(t *testing.T) {
configA := &Config{
BasePath: utils.NewTestBasePath(t, "node-a"),
Port: 7000,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -101,7 +99,6 @@ func TestPersistentPeers(t *testing.T) {
configB := &Config{
BasePath: utils.NewTestBasePath(t, "node-b"),
Port: 7001,
RandSeed: 2,
NoMDNS: true,
PersistentPeers: []string{addrs[0].String()},
}
Expand Down
30 changes: 9 additions & 21 deletions dot/network/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func newTestDiscovery(t *testing.T, num int) []*discovery {
config := &Config{
BasePath: utils.NewTestBasePath(t, fmt.Sprintf("node%d", i)),
Port: uint32(7001 + i),
RandSeed: int64(1 + i),
NoBootstrap: true,
NoMDNS: true,
}
Expand Down Expand Up @@ -119,7 +118,6 @@ func TestBeginDiscovery(t *testing.T) {
configA := &Config{
BasePath: utils.NewTestBasePath(t, "nodeA"),
Port: 7001,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -130,21 +128,18 @@ func TestBeginDiscovery(t *testing.T) {
configB := &Config{
BasePath: utils.NewTestBasePath(t, "nodeB"),
Port: 7002,
RandSeed: 2,
NoBootstrap: true,
NoMDNS: true,
}

nodeB := createTestService(t, configB)
nodeB.noGossip = true

addrInfosB, err := nodeB.host.addrInfos()
require.NoError(t, err)

err = nodeA.host.connect(*addrInfosB[0])
addrInfoB := nodeB.host.addrInfo()
err := nodeA.host.connect(addrInfoB)
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = nodeA.host.connect(*addrInfosB[0])
err = nodeA.host.connect(addrInfoB)
}
require.NoError(t, err)

Expand All @@ -159,7 +154,6 @@ func TestBeginDiscovery_ThreeNodes(t *testing.T) {
configA := &Config{
BasePath: utils.NewTestBasePath(t, "nodeA"),
Port: 7001,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -170,7 +164,6 @@ func TestBeginDiscovery_ThreeNodes(t *testing.T) {
configB := &Config{
BasePath: utils.NewTestBasePath(t, "nodeB"),
Port: 7002,
RandSeed: 2,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -181,7 +174,6 @@ func TestBeginDiscovery_ThreeNodes(t *testing.T) {
configC := &Config{
BasePath: utils.NewTestBasePath(t, "nodeC"),
Port: 7003,
RandSeed: 3,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -190,24 +182,20 @@ func TestBeginDiscovery_ThreeNodes(t *testing.T) {
nodeC.noGossip = true

// connect A and B
addrInfosB, err := nodeB.host.addrInfos()
require.NoError(t, err)

err = nodeA.host.connect(*addrInfosB[0])
addrInfoB := nodeB.host.addrInfo()
err := nodeA.host.connect(addrInfoB)
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = nodeA.host.connect(*addrInfosB[0])
err = nodeA.host.connect(addrInfoB)
}
require.NoError(t, err)

// connect A and C
addrInfosC, err := nodeC.host.addrInfos()
require.NoError(t, err)

err = nodeA.host.connect(*addrInfosC[0])
addrInfoC := nodeC.host.addrInfo()
err = nodeA.host.connect(addrInfoC)
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = nodeA.host.connect(*addrInfosC[0])
err = nodeA.host.connect(addrInfoC)
}
require.NoError(t, err)

Expand Down
25 changes: 9 additions & 16 deletions dot/network/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func TestGossip(t *testing.T) {
configA := &Config{
BasePath: basePathA,
Port: 7001,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -49,7 +48,6 @@ func TestGossip(t *testing.T) {
configB := &Config{
BasePath: basePathB,
Port: 7002,
RandSeed: 2,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -58,22 +56,19 @@ func TestGossip(t *testing.T) {
handlerB := newTestStreamHandler(testBlockAnnounceMessageDecoder)
nodeB.host.registerStreamHandler("", handlerB.handleStream)

addrInfosA, err := nodeA.host.addrInfos()
require.NoError(t, err)

err = nodeB.host.connect(*addrInfosA[0])
addrInfoA := nodeA.host.addrInfo()
err := nodeB.host.connect(addrInfoA)
// retry connect if "failed to dial" error
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = nodeB.host.connect(*addrInfosA[0])
err = nodeB.host.connect(addrInfoA)
}
require.NoError(t, err)

basePathC := utils.NewTestBasePath(t, "nodeC")
configC := &Config{
BasePath: basePathC,
Port: 7003,
RandSeed: 3,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -82,26 +77,24 @@ func TestGossip(t *testing.T) {
handlerC := newTestStreamHandler(testBlockAnnounceMessageDecoder)
nodeC.host.registerStreamHandler("", handlerC.handleStream)

err = nodeC.host.connect(*addrInfosA[0])
err = nodeC.host.connect(addrInfoA)
// retry connect if "failed to dial" error
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = nodeC.host.connect(*addrInfosA[0])
err = nodeC.host.connect(addrInfoA)
}
require.NoError(t, err)

addrInfosB, err := nodeB.host.addrInfos()
require.NoError(t, err)

err = nodeC.host.connect(*addrInfosB[0])
addrInfoB := nodeB.host.addrInfo()
err = nodeC.host.connect(addrInfoB)
// retry connect if "failed to dial" error
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = nodeC.host.connect(*addrInfosB[0])
err = nodeC.host.connect(addrInfoB)
}
require.NoError(t, err)

_, err = nodeA.host.send(addrInfosB[0].ID, "", testBlockAnnounceMessage)
_, err = nodeA.host.send(addrInfoB.ID, "", testBlockAnnounceMessage)
require.NoError(t, err)

time.Sleep(TestMessageTimeout)
Expand Down
42 changes: 22 additions & 20 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net"
"path"
"sync"
"time"

"github.com/dgraph-io/ristretto"
Expand Down Expand Up @@ -48,6 +49,8 @@ var privateCIDRs = []string{
"169.254.0.0/16",
}

var connectTimeout = time.Second * 5

// host wraps libp2p host with network host configuration and services
type host struct {
ctx context.Context
Expand All @@ -60,6 +63,7 @@ type host struct {
ds *badger.Datastore
messageCache *messageCache
bwc *metrics.BandwidthCounter
closeSync sync.Once
}

// newHost creates a host wrapper with a new libp2p host instance
Expand Down Expand Up @@ -204,17 +208,19 @@ func (h *host) close() error {
return err
}

err = h.h.Peerstore().Close()
if err != nil {
logger.Error("Failed to close libp2p peerstore", "error", err)
return err
}
h.closeSync.Do(func() {
err = h.h.Peerstore().Close()
if err != nil {
logger.Error("Failed to close libp2p peerstore", "error", err)
return
}

err = h.ds.Close()
if err != nil {
logger.Error("Failed to close libp2p host datastore", "error", err)
return err
}
err = h.ds.Close()
if err != nil {
logger.Error("Failed to close libp2p host datastore", "error", err)
return
}
})
return nil
}

Expand All @@ -241,7 +247,7 @@ func (h *host) registerStreamHandlerWithOverwrite(pid protocol.ID, overwrite boo
// connect connects the host to a specific peer address
func (h *host) connect(p peer.AddrInfo) (err error) {
h.h.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
ctx, cancel := context.WithTimeout(h.ctx, time.Second*2)
ctx, cancel := context.WithTimeout(h.ctx, connectTimeout)
defer cancel()
err = h.h.Connect(ctx, p)
return err
Expand Down Expand Up @@ -379,16 +385,12 @@ func (h *host) peerCount() int {
return len(peers)
}

// addrInfos returns the libp2p AddrInfos of the host
func (h *host) addrInfos() (addrInfos []*peer.AddrInfo, err error) {
for _, multiaddr := range h.multiaddrs() {
addrInfo, err := peer.AddrInfoFromP2pAddr(multiaddr)
if err != nil {
return nil, err
}
addrInfos = append(addrInfos, addrInfo)
// addrInfo returns the libp2p peer.AddrInfo of the host
func (h *host) addrInfo() peer.AddrInfo {
return peer.AddrInfo{
ID: h.h.ID(),
Addrs: h.h.Addrs(),
}
return addrInfos, nil
}

// multiaddrs returns the multiaddresses of the host
Expand Down
Loading