From f4c79d3e3e7f280a2b225180bbbe6a36317ef734 Mon Sep 17 00:00:00 2001
From: noot <36753753+noot@users.noreply.github.com>
Date: Thu, 20 May 2021 17:51:16 -0400
Subject: [PATCH] fix(dot/network): fix discovery between gossamer nodes
(#1594)
---
dot/network/discovery.go | 204 ++++++++++++++++++++++++++++++
dot/network/discovery_test.go | 229 ++++++++++++++++++++++++++++++++++
dot/network/host.go | 55 ++++----
dot/network/host_test.go | 89 -------------
dot/network/mdns.go | 28 ++---
dot/network/service.go | 44 +------
dot/network/service_test.go | 135 ++++----------------
dot/network/sync.go | 15 +--
dot/state/block.go | 3 +-
go.mod | 2 +
go.sum | 4 +
11 files changed, 509 insertions(+), 299 deletions(-)
create mode 100644 dot/network/discovery.go
create mode 100644 dot/network/discovery_test.go
diff --git a/dot/network/discovery.go b/dot/network/discovery.go
new file mode 100644
index 0000000000..2226cfd2b7
--- /dev/null
+++ b/dot/network/discovery.go
@@ -0,0 +1,204 @@
+// Copyright 2019 ChainSafe Systems (ON) Corp.
+// This file is part of gossamer.
+//
+// The gossamer library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The gossamer library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the gossamer library. If not, see .
+
+package network
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ badger "github.com/ipfs/go-ds-badger2"
+ libp2phost "github.com/libp2p/go-libp2p-core/host"
+ "github.com/libp2p/go-libp2p-core/peer"
+ "github.com/libp2p/go-libp2p-core/peerstore"
+ "github.com/libp2p/go-libp2p-core/protocol"
+ libp2pdiscovery "github.com/libp2p/go-libp2p-discovery"
+ kaddht "github.com/libp2p/go-libp2p-kad-dht"
+ "github.com/libp2p/go-libp2p-kad-dht/dual"
+)
+
+var (
+ startDHTTimeout = time.Second * 10
+ initialAdvertisementTimeout = time.Millisecond
+ tryAdvertiseTimeout = time.Second * 30
+ connectToPeersTimeout = time.Minute
+)
+
+// discovery handles discovery of new peers via the kademlia DHT
+type discovery struct {
+ ctx context.Context
+ dht *dual.DHT
+ h libp2phost.Host
+ bootnodes []peer.AddrInfo
+ ds *badger.Datastore
+ pid protocol.ID
+ minPeers, maxPeers int
+}
+
+func newDiscovery(ctx context.Context, h libp2phost.Host, bootnodes []peer.AddrInfo, ds *badger.Datastore, pid protocol.ID, min, max int) *discovery {
+ return &discovery{
+ ctx: ctx,
+ h: h,
+ bootnodes: bootnodes,
+ ds: ds,
+ pid: pid,
+ minPeers: min,
+ maxPeers: max,
+ }
+}
+
+// start creates the DHT.
+func (d *discovery) start() error {
+ if len(d.bootnodes) == 0 {
+ // get all currently connected peers and use them to bootstrap the DHT
+ peers := d.h.Network().Peers()
+
+ for {
+ if len(peers) > 0 {
+ break
+ }
+
+ select {
+ case <-time.After(startDHTTimeout):
+ logger.Debug("no peers yet, waiting to start DHT...")
+ // wait for peers to connect before starting DHT, otherwise DHT bootstrap nodes
+ // will be empty and we will fail to fill the routing table
+ case <-d.ctx.Done():
+ return nil
+ }
+
+ peers = d.h.Network().Peers()
+ }
+
+ for _, p := range peers {
+ d.bootnodes = append(d.bootnodes, d.h.Peerstore().PeerInfo(p))
+ }
+ }
+
+ logger.Debug("starting DHT...", "bootnodes", d.bootnodes)
+
+ dhtOpts := []dual.Option{
+ dual.DHTOption(kaddht.Datastore(d.ds)),
+ dual.DHTOption(kaddht.BootstrapPeers(d.bootnodes...)),
+ dual.DHTOption(kaddht.V1ProtocolOverride(d.pid + "/kad")),
+ dual.DHTOption(kaddht.Mode(kaddht.ModeAutoServer)),
+ }
+
+ // create DHT service
+ dht, err := dual.New(d.ctx, d.h, dhtOpts...)
+ if err != nil {
+ return err
+ }
+
+ d.dht = dht
+ return d.discoverAndAdvertise()
+}
+
+func (d *discovery) stop() error {
+ if d.dht == nil {
+ return nil
+ }
+
+ return d.dht.Close()
+}
+
+func (d *discovery) discoverAndAdvertise() error {
+ rd := libp2pdiscovery.NewRoutingDiscovery(d.dht)
+
+ err := d.dht.Bootstrap(d.ctx)
+ if err != nil {
+ return fmt.Errorf("failed to bootstrap DHT: %w", err)
+ }
+
+ // wait to connect to bootstrap peers
+ time.Sleep(time.Second)
+
+ go func() {
+ ttl := initialAdvertisementTimeout
+
+ for {
+ select {
+ case <-time.After(ttl):
+ logger.Debug("advertising ourselves in the DHT...")
+ err := d.dht.Bootstrap(d.ctx)
+ if err != nil {
+ logger.Warn("failed to bootstrap DHT", "error", err)
+ continue
+ }
+
+ ttl, err = rd.Advertise(d.ctx, string(d.pid))
+ if err != nil {
+ logger.Debug("failed to advertise in the DHT", "error", err)
+ ttl = tryAdvertiseTimeout
+ }
+ case <-d.ctx.Done():
+ return
+ }
+ }
+ }()
+
+ go func() {
+ logger.Debug("attempting to find DHT peers...")
+ peerCh, err := rd.FindPeers(d.ctx, string(d.pid))
+ if err != nil {
+ logger.Warn("failed to begin finding peers via DHT", "err", err)
+ return
+ }
+
+ peersToTry := make(map[*peer.AddrInfo]struct{})
+
+ for {
+ select {
+ case <-d.ctx.Done():
+ return
+ case <-time.After(connectToPeersTimeout):
+ if len(d.h.Network().Peers()) > d.minPeers {
+ continue
+ }
+
+ // reconnect to peers if peer count is low
+ for p := range peersToTry {
+ err = d.h.Connect(d.ctx, *p)
+ if err != nil {
+ logger.Trace("failed to connect to discovered peer", "peer", p.ID, "err", err)
+ delete(peersToTry, p)
+ }
+ }
+ case peer := <-peerCh:
+ if peer.ID == d.h.ID() || peer.ID == "" {
+ continue
+ }
+
+ logger.Trace("found new peer via DHT", "peer", peer.ID)
+
+ // found a peer, try to connect if we need more peers
+ if len(d.h.Network().Peers()) < d.maxPeers {
+ err = d.h.Connect(d.ctx, peer)
+ if err != nil {
+ logger.Trace("failed to connect to discovered peer", "peer", peer.ID, "err", err)
+ }
+ } else {
+ d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
+ peersToTry[&peer] = struct{}{}
+ }
+ }
+ }
+ }()
+
+ logger.Debug("DHT discovery started!")
+ return nil
+}
diff --git a/dot/network/discovery_test.go b/dot/network/discovery_test.go
new file mode 100644
index 0000000000..fac6d57d91
--- /dev/null
+++ b/dot/network/discovery_test.go
@@ -0,0 +1,229 @@
+// Copyright 2019 ChainSafe Systems (ON) Corp.
+// This file is part of gossamer.
+//
+// The gossamer library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The gossamer library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the gossamer library. If not, see .
+
+package network
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/ChainSafe/gossamer/lib/utils"
+ "github.com/libp2p/go-libp2p-core/peer"
+ "github.com/libp2p/go-libp2p-kad-dht/dual"
+ kbucket "github.com/libp2p/go-libp2p-kbucket"
+
+ "github.com/stretchr/testify/require"
+)
+
+func newTestDiscovery(t *testing.T, num int) []*discovery {
+ t.Helper()
+ var discs []*discovery
+ for i := 0; i < num; i++ {
+ config := &Config{
+ BasePath: utils.NewTestBasePath(t, fmt.Sprintf("node%d", i)),
+ Port: uint32(7001 + i),
+ RandSeed: int64(1 + i),
+ NoBootstrap: true,
+ NoMDNS: true,
+ }
+
+ srvc := createTestService(t, config)
+ disc := &discovery{
+ ctx: srvc.ctx,
+ h: srvc.host.h,
+ }
+ go disc.start()
+ discs = append(discs, disc)
+ }
+ return discs
+}
+
+// nolint
+func connectNoSync(t *testing.T, ctx context.Context, a, b *discovery) {
+ t.Helper()
+
+ idB := b.h.ID()
+ addrB := b.h.Peerstore().Addrs(idB)
+ if len(addrB) == 0 {
+ t.Fatal("peers setup incorrectly: no local address")
+ }
+
+ a.h.Peerstore().AddAddrs(idB, addrB, time.Minute)
+ pi := peer.AddrInfo{ID: idB}
+
+ err := a.h.Connect(ctx, pi)
+ // retry connect if "failed to dial" error
+ if failedToDial(err) {
+ time.Sleep(TestBackoffTimeout)
+ err = a.h.Connect(ctx, pi)
+ }
+ require.NoError(t, err)
+}
+
+// nolint
+func wait(t *testing.T, ctx context.Context, a, b *dual.DHT) {
+ t.Helper()
+
+ // Loop until connection notification has been received.
+ // Under high load, this may not happen as immediately as we would like.
+ for a.LAN.RoutingTable().Find(b.LAN.PeerID()) == "" {
+ select {
+ case <-ctx.Done():
+ t.Fatal(ctx.Err())
+ case <-time.After(time.Millisecond * 5):
+ }
+ }
+}
+
+// Set `NoMDNS` to true and test routing via kademlia DHT service.
+func TestKadDHT(t *testing.T) {
+ if testing.Short() {
+ return
+ }
+
+ nodes := newTestDiscovery(t, 3)
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+ defer cancel()
+
+ _, err := nodes[2].dht.FindPeer(ctx, nodes[1].h.ID())
+ require.Equal(t, err, kbucket.ErrLookupFailure)
+
+ connectNoSync(t, ctx, nodes[1], nodes[0])
+ connectNoSync(t, ctx, nodes[2], nodes[0])
+
+ // Can't use `connect` because b and c are only clients.
+ wait(t, ctx, nodes[1].dht, nodes[0].dht)
+ wait(t, ctx, nodes[2].dht, nodes[0].dht)
+
+ _, err = nodes[2].dht.FindPeer(ctx, nodes[1].h.ID())
+ require.NoError(t, err)
+}
+
+func TestBeginDiscovery(t *testing.T) {
+ configA := &Config{
+ BasePath: utils.NewTestBasePath(t, "nodeA"),
+ Port: 7001,
+ RandSeed: 1,
+ NoBootstrap: true,
+ NoMDNS: true,
+ }
+
+ nodeA := createTestService(t, configA)
+ nodeA.noGossip = true
+
+ configB := &Config{
+ BasePath: utils.NewTestBasePath(t, "nodeB"),
+ Port: 7002,
+ RandSeed: 2,
+ NoBootstrap: true,
+ NoMDNS: true,
+ }
+
+ nodeB := createTestService(t, configB)
+ nodeB.noGossip = true
+
+ addrInfosB, err := nodeB.host.addrInfos()
+ require.NoError(t, err)
+
+ err = nodeA.host.connect(*addrInfosB[0])
+ if failedToDial(err) {
+ time.Sleep(TestBackoffTimeout)
+ err = nodeA.host.connect(*addrInfosB[0])
+ }
+ require.NoError(t, err)
+
+ err = nodeA.host.discovery.start()
+ require.NoError(t, err)
+
+ err = nodeB.host.discovery.start()
+ require.NoError(t, err)
+}
+
+func TestBeginDiscovery_ThreeNodes(t *testing.T) {
+ configA := &Config{
+ BasePath: utils.NewTestBasePath(t, "nodeA"),
+ Port: 7001,
+ RandSeed: 1,
+ NoBootstrap: true,
+ NoMDNS: true,
+ }
+
+ nodeA := createTestService(t, configA)
+ nodeA.noGossip = true
+
+ configB := &Config{
+ BasePath: utils.NewTestBasePath(t, "nodeB"),
+ Port: 7002,
+ RandSeed: 2,
+ NoBootstrap: true,
+ NoMDNS: true,
+ }
+
+ nodeB := createTestService(t, configB)
+ nodeB.noGossip = true
+
+ configC := &Config{
+ BasePath: utils.NewTestBasePath(t, "nodeC"),
+ Port: 7003,
+ RandSeed: 3,
+ NoBootstrap: true,
+ NoMDNS: true,
+ }
+
+ nodeC := createTestService(t, configC)
+ nodeC.noGossip = true
+
+ // connect A and B
+ addrInfosB, err := nodeB.host.addrInfos()
+ require.NoError(t, err)
+
+ err = nodeA.host.connect(*addrInfosB[0])
+ if failedToDial(err) {
+ time.Sleep(TestBackoffTimeout)
+ err = nodeA.host.connect(*addrInfosB[0])
+ }
+ require.NoError(t, err)
+
+ // connect A and C
+ addrInfosC, err := nodeC.host.addrInfos()
+ require.NoError(t, err)
+
+ err = nodeA.host.connect(*addrInfosC[0])
+ if failedToDial(err) {
+ time.Sleep(TestBackoffTimeout)
+ err = nodeA.host.connect(*addrInfosC[0])
+ }
+ require.NoError(t, err)
+
+ // begin advertising and discovery for all nodes
+ err = nodeA.host.discovery.start()
+ require.NoError(t, err)
+
+ err = nodeB.host.discovery.start()
+ require.NoError(t, err)
+
+ err = nodeC.host.discovery.start()
+ require.NoError(t, err)
+
+ time.Sleep(time.Millisecond * 500)
+
+ // assert B and C can discover each other
+ addrs := nodeB.host.h.Peerstore().Addrs(nodeC.host.id())
+ require.NotEqual(t, 0, len(addrs))
+}
diff --git a/dot/network/host.go b/dot/network/host.go
index 78445dc37c..125e7f4781 100644
--- a/dot/network/host.go
+++ b/dot/network/host.go
@@ -32,12 +32,11 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
- kaddht "github.com/libp2p/go-libp2p-kad-dht"
- "github.com/libp2p/go-libp2p-kad-dht/dual"
"github.com/libp2p/go-libp2p-peerstore/pstoreds"
secio "github.com/libp2p/go-libp2p-secio"
- rhost "github.com/libp2p/go-libp2p/p2p/host/routed"
ma "github.com/multiformats/go-multiaddr"
+
+ "github.com/chyeh/pubip"
)
var privateCIDRs = []string{
@@ -53,7 +52,7 @@ var privateCIDRs = []string{
type host struct {
ctx context.Context
h libp2phost.Host
- dht *dual.DHT
+ discovery *discovery
bootnodes []peer.AddrInfo
persistentPeers []peer.AddrInfo
protocolID protocol.ID
@@ -71,6 +70,18 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
return nil, err
}
+ var externalAddr ma.Multiaddr
+ ip, err := pubip.Get()
+ if err != nil {
+ logger.Error("failed to get public IP", "error", err)
+ } else {
+ logger.Debug("got public IP", "IP", ip)
+ externalAddr, err = ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ip, cfg.Port))
+ if err != nil {
+ return nil, err
+ }
+ }
+
// create connection manager
cm := newConnManager(cfg.MinPeers, cfg.MaxPeers)
@@ -98,13 +109,6 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
return nil, err
}
- dhtOpts := []dual.Option{
- dual.DHTOption(kaddht.Datastore(ds)),
- dual.DHTOption(kaddht.BootstrapPeers(bns...)),
- dual.DHTOption(kaddht.V1ProtocolOverride(pid + "/kad")),
- dual.DHTOption(kaddht.Mode(kaddht.ModeAutoServer)),
- }
-
privateIPs := ma.NewFilters()
for _, cidr := range privateCIDRs {
_, ipnet, err := net.ParseCIDR(cidr) //nolint
@@ -130,13 +134,17 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
libp2p.ConnectionManager(cm),
libp2p.ChainOptions(libp2p.DefaultSecurity, libp2p.Security(secio.ID, secio.New)), // TODO: deprecate secio?
libp2p.AddrsFactory(func(as []ma.Multiaddr) []ma.Multiaddr {
- ok := []ma.Multiaddr{}
+ addrs := []ma.Multiaddr{}
for _, addr := range as {
if !privateIPs.AddrBlocked(addr) {
- ok = append(ok, addr)
+ addrs = append(addrs, addr)
}
}
- return ok
+ if externalAddr == nil {
+ return addrs
+ }
+
+ return append(addrs, externalAddr)
}),
}
@@ -146,15 +154,6 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
return nil, err
}
- // create DHT service
- dht, err := dual.New(ctx, h, dhtOpts...)
- if err != nil {
- return nil, err
- }
-
- // wrap host and DHT service with routed host
- h = rhost.Wrap(h, dht)
-
cacheSize := 64 << 20 // 64 MB
config := ristretto.Config{
NumCounters: int64(float64(cacheSize) * 0.05 * 2),
@@ -170,11 +169,12 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
}
bwc := metrics.NewBandwidthCounter()
+ discovery := newDiscovery(ctx, h, bns, ds, pid, cfg.MinPeers, cfg.MaxPeers)
host := &host{
ctx: ctx,
h: h,
- dht: dht,
+ discovery: discovery,
bootnodes: bns,
protocolID: pid,
cm: cm,
@@ -191,7 +191,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
// close closes host services and the libp2p host (host services first)
func (h *host) close() error {
// close DHT service
- err := h.dht.Close()
+ err := h.discovery.stop()
if err != nil {
logger.Error("Failed to close DHT service", "error", err)
return err
@@ -247,15 +247,12 @@ func (h *host) connect(p peer.AddrInfo) (err error) {
return err
}
-func (h *host) addToPeerstore(p peer.AddrInfo) {
- h.h.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
-}
-
// bootstrap connects the host to the configured bootnodes
func (h *host) bootstrap() {
failed := 0
all := append(h.bootnodes, h.persistentPeers...)
for _, addrInfo := range all {
+ logger.Debug("bootstrapping to peer", "peer", addrInfo.ID)
err := h.connect(addrInfo)
if err != nil {
logger.Debug("failed to bootstrap to peer", "error", err)
diff --git a/dot/network/host_test.go b/dot/network/host_test.go
index 26090f8cb0..1d0b17e621 100644
--- a/dot/network/host_test.go
+++ b/dot/network/host_test.go
@@ -17,17 +17,12 @@
package network
import (
- "context"
- "fmt"
"net"
"testing"
"time"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/utils"
- "github.com/libp2p/go-libp2p-core/peer"
- "github.com/libp2p/go-libp2p-kad-dht/dual"
- kbucket "github.com/libp2p/go-libp2p-kbucket"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
@@ -371,87 +366,3 @@ func TestStreamCloseMetadataCleanup(t *testing.T) {
_, ok = info.getHandshakeData(nodeB.host.id(), true)
require.False(t, ok)
}
-
-func createServiceHelper(t *testing.T, num int) []*Service {
- t.Helper()
- var srvcs []*Service
- for i := 0; i < num; i++ {
- config := &Config{
- BasePath: utils.NewTestBasePath(t, fmt.Sprintf("node%d", i)),
- Port: uint32(7001 + i),
- RandSeed: int64(1 + i),
- NoBootstrap: true,
- NoMDNS: true,
- }
-
- srvc := createTestService(t, config)
- srvc.noGossip = true
- handler := newTestStreamHandler(testBlockAnnounceMessageDecoder)
- srvc.host.registerStreamHandler("", handler.handleStream)
-
- srvcs = append(srvcs, srvc)
- }
- return srvcs
-}
-
-// nolint
-func connectNoSync(t *testing.T, ctx context.Context, a, b *Service) {
- t.Helper()
-
- idB := b.host.h.ID()
- addrB := b.host.h.Peerstore().Addrs(idB)
- if len(addrB) == 0 {
- t.Fatal("peers setup incorrectly: no local address")
- }
-
- a.host.h.Peerstore().AddAddrs(idB, addrB, time.Minute)
- pi := peer.AddrInfo{ID: idB}
-
- err := a.host.h.Connect(ctx, pi)
- // retry connect if "failed to dial" error
- if failedToDial(err) {
- time.Sleep(TestBackoffTimeout)
- err = a.host.h.Connect(ctx, pi)
- }
- require.NoError(t, err)
-}
-
-// nolint
-func wait(t *testing.T, ctx context.Context, a, b *dual.DHT) {
- t.Helper()
-
- // Loop until connection notification has been received.
- // Under high load, this may not happen as immediately as we would like.
- for a.LAN.RoutingTable().Find(b.LAN.PeerID()) == "" {
- select {
- case <-ctx.Done():
- t.Fatal(ctx.Err())
- case <-time.After(time.Millisecond * 5):
- }
- }
-}
-
-// Set `NoMDNS` to true and test routing via kademlia DHT service.
-func TestKadDHT(t *testing.T) {
- if testing.Short() {
- return
- }
-
- nodes := createServiceHelper(t, 3)
-
- ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
- defer cancel()
-
- _, err := nodes[2].host.dht.FindPeer(ctx, nodes[1].host.id())
- require.Equal(t, err, kbucket.ErrLookupFailure)
-
- connectNoSync(t, ctx, nodes[1], nodes[0])
- connectNoSync(t, ctx, nodes[2], nodes[0])
-
- // Can't use `connect` because b and c are only clients.
- wait(t, ctx, nodes[1].host.dht, nodes[0].host.dht)
- wait(t, ctx, nodes[2].host.dht, nodes[0].host.dht)
-
- _, err = nodes[2].host.dht.FindPeer(ctx, nodes[1].host.id())
- require.NoError(t, err)
-}
diff --git a/dot/network/mdns.go b/dot/network/mdns.go
index 226a171d8a..6bd418bf6d 100644
--- a/dot/network/mdns.go
+++ b/dot/network/mdns.go
@@ -22,7 +22,7 @@ import (
log "github.com/ChainSafe/log15"
"github.com/libp2p/go-libp2p-core/peer"
- discovery "github.com/libp2p/go-libp2p/p2p/discovery"
+ libp2pdiscovery "github.com/libp2p/go-libp2p/p2p/discovery"
)
// MDNSPeriod is 1 minute
@@ -39,7 +39,7 @@ type Notifee struct {
type mdns struct {
logger log.Logger
host *host
- mdns discovery.Service
+ mdns libp2pdiscovery.Service
}
// newMDNS creates a new mDNS instance from the host
@@ -52,7 +52,7 @@ func newMDNS(host *host) *mdns {
// startMDNS starts a new mDNS discovery service
func (m *mdns) start() {
- m.logger.Trace(
+ m.logger.Debug(
"Starting mDNS discovery service...",
"host", m.host.id(),
"period", MDNSPeriod,
@@ -60,7 +60,7 @@ func (m *mdns) start() {
)
// create and start service
- mdns, err := discovery.NewMdnsService(
+ mdns, err := libp2pdiscovery.NewMdnsService(
m.host.ctx,
m.host.h,
MDNSPeriod,
@@ -83,16 +83,16 @@ func (m *mdns) start() {
// close shuts down the mDNS discovery service
func (m *mdns) close() error {
-
// check if service is running
- if m.mdns != nil {
-
- // close service
- err := m.mdns.Close()
- if err != nil {
- m.logger.Error("Failed to close mDNS discovery service", "error", err)
- return err
- }
+ if m.mdns == nil {
+ return nil
+ }
+
+ // close service
+ err := m.mdns.Close()
+ if err != nil {
+ m.logger.Warn("Failed to close mDNS discovery service", "error", err)
+ return err
}
return nil
@@ -100,7 +100,7 @@ func (m *mdns) close() error {
// HandlePeerFound is event handler called when a peer is found
func (n Notifee) HandlePeerFound(p peer.AddrInfo) {
- n.logger.Trace(
+ n.logger.Debug(
"Peer found using mDNS discovery",
"host", n.host.id(),
"peer", p.ID,
diff --git a/dot/network/service.go b/dot/network/service.go
index bed9b7203e..c529f22112 100644
--- a/dot/network/service.go
+++ b/dot/network/service.go
@@ -19,7 +19,6 @@ package network
import (
"context"
"errors"
- "fmt"
"io"
"os"
"sync"
@@ -34,7 +33,6 @@ import (
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
- discovery "github.com/libp2p/go-libp2p-discovery"
)
const (
@@ -234,7 +232,7 @@ func (s *Service) Start() error {
if !s.noDiscover {
go func() {
- err = s.beginDiscovery()
+ err = s.host.discovery.start()
if err != nil {
logger.Error("failed to begin DHT discovery", "error", err)
}
@@ -334,46 +332,6 @@ func (s *Service) handleConn(conn libp2pnetwork.Conn) {
s.syncQueue.updatePeerScore(conn.RemotePeer(), 1)
}
-func (s *Service) beginDiscovery() error {
- rd := discovery.NewRoutingDiscovery(s.host.dht)
-
- err := s.host.dht.Bootstrap(s.ctx)
- if err != nil {
- return fmt.Errorf("failed to bootstrap DHT: %w", err)
- }
-
- // wait to connect to bootstrap peers
- time.Sleep(time.Second)
-
- go func() {
- peerCh, err := rd.FindPeers(s.ctx, s.cfg.ProtocolID)
- if err != nil {
- logger.Error("failed to begin finding peers via DHT", "err", err)
- }
-
- for peer := range peerCh {
- if peer.ID == s.host.id() {
- return
- }
-
- logger.Debug("found new peer via DHT", "peer", peer.ID)
-
- // found a peer, try to connect if we need more peers
- if s.host.peerCount() < s.cfg.MaxPeers {
- err = s.host.connect(peer)
- if err != nil {
- logger.Debug("failed to connect to discovered peer", "peer", peer.ID, "err", err)
- }
- } else {
- s.host.addToPeerstore(peer)
- }
- }
- }()
-
- logger.Debug("DHT discovery started!")
- return nil
-}
-
// Stop closes running instances of the host and network services as well as
// the message channel from the network service to the core service (services that
// are dependent on the host instance should be closed first)
diff --git a/dot/network/service_test.go b/dot/network/service_test.go
index 20a23e1a2a..24be8fd46f 100644
--- a/dot/network/service_test.go
+++ b/dot/network/service_test.go
@@ -41,6 +41,28 @@ func failedToDial(err error) bool {
return err != nil && strings.Contains(err.Error(), "failed to dial")
}
+func createServiceHelper(t *testing.T, num int) []*Service {
+ t.Helper()
+ var srvcs []*Service
+ for i := 0; i < num; i++ {
+ config := &Config{
+ BasePath: utils.NewTestBasePath(t, fmt.Sprintf("node%d", i)),
+ Port: uint32(7001 + i),
+ RandSeed: int64(1 + i),
+ NoBootstrap: true,
+ NoMDNS: true,
+ }
+
+ srvc := createTestService(t, config)
+ srvc.noGossip = true
+ handler := newTestStreamHandler(testBlockAnnounceMessageDecoder)
+ srvc.host.registerStreamHandler("", handler.handleStream)
+
+ srvcs = append(srvcs, srvc)
+ }
+ return srvcs
+}
+
// helper method to create and start a new network service
func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
if cfg == nil {
@@ -265,119 +287,6 @@ func TestService_Health(t *testing.T) {
require.Equal(t, s.Health().IsSyncing, false)
}
-func TestBeginDiscovery(t *testing.T) {
- configA := &Config{
- BasePath: utils.NewTestBasePath(t, "nodeA"),
- Port: 7001,
- RandSeed: 1,
- NoBootstrap: true,
- NoMDNS: true,
- }
-
- nodeA := createTestService(t, configA)
- nodeA.noGossip = true
-
- configB := &Config{
- BasePath: utils.NewTestBasePath(t, "nodeB"),
- Port: 7002,
- RandSeed: 2,
- NoBootstrap: true,
- NoMDNS: true,
- }
-
- nodeB := createTestService(t, configB)
- nodeB.noGossip = true
-
- addrInfosB, err := nodeB.host.addrInfos()
- require.NoError(t, err)
-
- err = nodeA.host.connect(*addrInfosB[0])
- if failedToDial(err) {
- time.Sleep(TestBackoffTimeout)
- err = nodeA.host.connect(*addrInfosB[0])
- }
- require.NoError(t, err)
-
- err = nodeA.beginDiscovery()
- require.NoError(t, err)
-
- err = nodeB.beginDiscovery()
- require.NoError(t, err)
-}
-
-func TestBeginDiscovery_ThreeNodes(t *testing.T) {
- configA := &Config{
- BasePath: utils.NewTestBasePath(t, "nodeA"),
- Port: 7001,
- RandSeed: 1,
- NoBootstrap: true,
- NoMDNS: true,
- }
-
- nodeA := createTestService(t, configA)
- nodeA.noGossip = true
-
- configB := &Config{
- BasePath: utils.NewTestBasePath(t, "nodeB"),
- Port: 7002,
- RandSeed: 2,
- NoBootstrap: true,
- NoMDNS: true,
- }
-
- nodeB := createTestService(t, configB)
- nodeB.noGossip = true
-
- configC := &Config{
- BasePath: utils.NewTestBasePath(t, "nodeC"),
- Port: 7003,
- RandSeed: 3,
- NoBootstrap: true,
- NoMDNS: true,
- }
-
- nodeC := createTestService(t, configC)
- nodeC.noGossip = true
-
- // connect A and B
- addrInfosB, err := nodeB.host.addrInfos()
- require.NoError(t, err)
-
- err = nodeA.host.connect(*addrInfosB[0])
- if failedToDial(err) {
- time.Sleep(TestBackoffTimeout)
- err = nodeA.host.connect(*addrInfosB[0])
- }
- require.NoError(t, err)
-
- // connect A and C
- addrInfosC, err := nodeC.host.addrInfos()
- require.NoError(t, err)
-
- err = nodeA.host.connect(*addrInfosC[0])
- if failedToDial(err) {
- time.Sleep(TestBackoffTimeout)
- err = nodeA.host.connect(*addrInfosC[0])
- }
- require.NoError(t, err)
-
- // begin advertising and discovery for all nodes
- err = nodeA.beginDiscovery()
- require.NoError(t, err)
-
- err = nodeB.beginDiscovery()
- require.NoError(t, err)
-
- err = nodeC.beginDiscovery()
- require.NoError(t, err)
-
- time.Sleep(time.Millisecond * 500)
-
- // assert B and C can discover each other
- addrs := nodeB.host.h.Peerstore().Addrs(nodeC.host.id())
- require.NotEqual(t, 0, len(addrs))
-}
-
func TestPersistPeerStore(t *testing.T) {
nodes := createServiceHelper(t, 2)
nodeA := nodes[0]
diff --git a/dot/network/sync.go b/dot/network/sync.go
index e8fb87add2..546ee009ab 100644
--- a/dot/network/sync.go
+++ b/dot/network/sync.go
@@ -322,7 +322,7 @@ func (q *syncQueue) benchmark() {
continue
}
- logger.Info("💤 node waiting", "head", before.Number, "finalised", finalised.Number)
+ logger.Info("💤 node waiting", "peer count", len(q.s.host.peers()), "head", before.Number, "finalised", finalised.Number)
time.Sleep(time.Second * 5)
continue
}
@@ -337,19 +337,16 @@ func (q *syncQueue) benchmark() {
q.benchmarker.end(after.Number.Uint64())
- logger.Info("🔗 imported blocks", "from", before.Number, "to", after.Number,
- "hashes", fmt.Sprintf("[%s ... %s]", before.Hash(), after.Hash()),
- )
-
- if q.goal-before.Number.Int64() < int64(blockRequestSize) {
- continue
- }
-
logger.Info("🚣 currently syncing",
+ "peer count", len(q.s.host.peers()),
"goal", q.goal,
"average blocks/second", q.benchmarker.mostRecentAverage(),
"overall average", q.benchmarker.average(),
)
+
+ logger.Info("🔗 imported blocks", "from", before.Number, "to", after.Number,
+ "hashes", fmt.Sprintf("[%s ... %s]", before.Hash(), after.Hash()),
+ )
}
}
diff --git a/dot/state/block.go b/dot/state/block.go
index ea5d8c5267..2ac4110113 100644
--- a/dot/state/block.go
+++ b/dot/state/block.go
@@ -38,8 +38,7 @@ const pruneKeyBufferSize = 1000
// BlockState defines fields for manipulating the state of blocks, such as BlockTree, BlockDB and Header
type BlockState struct {
- bt *blocktree.BlockTree
- //baseDB chaindb.Database
+ bt *blocktree.BlockTree
baseState *BaseState
db chaindb.Database
sync.RWMutex
diff --git a/go.mod b/go.mod
index f9b8258164..727573dead 100644
--- a/go.mod
+++ b/go.mod
@@ -8,6 +8,7 @@ require (
github.com/btcsuite/btcutil v1.0.2
github.com/bytecodealliance/wasmtime-go v0.20.0
github.com/centrifuge/go-substrate-rpc-client/v2 v2.0.1
+ github.com/chyeh/pubip v0.0.0-20170203095919-b7e679cf541c
github.com/cosmos/go-bip39 v1.0.0
github.com/davidlazar/go-crypto v0.0.0-20190912175916-7055855a373f // indirect
github.com/dgraph-io/badger/v2 v2.2007.2
@@ -30,6 +31,7 @@ require (
github.com/huin/goupnp v1.0.1-0.20200620063722-49508fba0031 // indirect
github.com/ipfs/go-ds-badger2 v0.1.0
github.com/jcelliott/lumber v0.0.0-20160324203708-dd349441af25 // indirect
+ github.com/jpillora/backoff v1.0.0 // indirect
github.com/jpillora/ipfilter v1.2.2
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/libp2p/go-libp2p v0.12.0
diff --git a/go.sum b/go.sum
index 0d4ff24d2c..13ce535da6 100644
--- a/go.sum
+++ b/go.sum
@@ -42,6 +42,8 @@ github.com/centrifuge/go-substrate-rpc-client/v2 v2.0.1 h1:c9GeUnImFq66CnMAWhTpV
github.com/centrifuge/go-substrate-rpc-client/v2 v2.0.1/go.mod h1:0QCYd0jumsmjB7dZx4bovVhZtHd9VdF5E9q+0nu2xFY=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
+github.com/chyeh/pubip v0.0.0-20170203095919-b7e679cf541c h1:++BhWlmSX+n8m3O4gPfy3S4PTZ0TMzH6nelerBLPUng=
+github.com/chyeh/pubip v0.0.0-20170203095919-b7e679cf541c/go.mod h1:C7ma6h458jTWT65mXC58L1Q6hnEtr0unur8cMc0UEXM=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
@@ -256,6 +258,8 @@ github.com/jcelliott/lumber v0.0.0-20160324203708-dd349441af25 h1:EFT6MH3igZK/dI
github.com/jcelliott/lumber v0.0.0-20160324203708-dd349441af25/go.mod h1:sWkGw/wsaHtRsT9zGQ/WyJCotGWG/Anow/9hsAcBWRw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
+github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
+github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/jpillora/ipfilter v1.2.2 h1:lfENG7V1/T+ZutAtSbt6gssvzj3Ql0JmcFlqS/BES2E=
github.com/jpillora/ipfilter v1.2.2/go.mod h1:xvAYjA+48eM9E5+sg9yI55N5lE9sefckjsnDvSiEA+g=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=