Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix non deterministic panic during TestStableNetworkRPC integration test #3756

Merged
merged 9 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions dot/network/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
type ConnManager struct {
sync.Mutex
host *host
min, max int
maxPeers int
connectHandler func(peer.ID)
disconnectHandler func(peer.ID)

Expand All @@ -33,15 +33,16 @@ type ConnManager struct {
peerSetHandler PeerSetHandler
}

func newConnManager(min, max int, peerSetCfg *peerset.ConfigSet) (*ConnManager, error) {
func newConnManager(max int, peerSetCfg *peerset.ConfigSet) (*ConnManager, error) {
// TODO: peerSetHandler never used from within connection manager and also referred outside through cm,
// so this should be refactored
psh, err := peerset.NewPeerSetHandler(peerSetCfg)
if err != nil {
return nil, err
}

return &ConnManager{
min: min,
max: max,
maxPeers: max,
protectedPeers: new(sync.Map),
persistentPeers: new(sync.Map),
peerSetHandler: psh,
Expand Down
2 changes: 1 addition & 1 deletion dot/network/connmgr_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestProtectUnprotectPeer(t *testing.T) {
)

peerCfgSet := peerset.NewConfigSet(uint32(max-min), uint32(max), false, slotAllocationTime)
cm, err := newConnManager(min, max, peerCfgSet)
cm, err := newConnManager(max, peerCfgSet)
require.NoError(t, err)

p1 := peer.ID("a")
Expand Down
28 changes: 15 additions & 13 deletions dot/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,35 +35,35 @@ var (

// discovery handles discovery of new peers via the kademlia DHT
type discovery struct {
ctx context.Context
dht *dual.DHT
rd *routing.RoutingDiscovery
h libp2phost.Host
bootnodes []peer.AddrInfo
ds *badger.Datastore
pid protocol.ID
minPeers, maxPeers int
handler PeerSetHandler
ctx context.Context
dht *dual.DHT
rd *routing.RoutingDiscovery
h libp2phost.Host
bootnodes []peer.AddrInfo
ds *badger.Datastore
pid protocol.ID
maxPeers int
handler PeerSetHandler
}

func newDiscovery(ctx context.Context, h libp2phost.Host,
bootnodes []peer.AddrInfo, ds *badger.Datastore,
pid protocol.ID, min, max int, handler PeerSetHandler) *discovery {
pid protocol.ID, max int, handler PeerSetHandler) *discovery {
return &discovery{
ctx: ctx,
h: h,
bootnodes: bootnodes,
ds: ds,
pid: pid,
minPeers: min,
maxPeers: max,
handler: handler,
}
}

// waitForPeers periodically checks kadDHT peers store for new peers and returns them,
// this function used for local environments to prepopulate bootnodes from mDNS
func (d *discovery) waitForPeers() (peers []peer.AddrInfo, err error) {
// get all currently connected peers and use them to bootstrap the DHT

currentPeers := d.h.Network().Peers()

t := time.NewTicker(startDHTTimeout)
Expand Down Expand Up @@ -92,6 +92,9 @@ func (d *discovery) waitForPeers() (peers []peer.AddrInfo, err error) {

// start creates the DHT.
func (d *discovery) start() error {
// this basically only works with enabled mDNS which is used only for local test setups. Without bootnodes kademilia
// would not bee able to connect to any peers and mDNS is used to find peers in local network.
// TODO: should be refactored because this if is basically used for local integration test purpose
P1sar marked this conversation as resolved.
Show resolved Hide resolved
if len(d.bootnodes) == 0 {
peers, err := d.waitForPeers()
if err != nil {
Expand All @@ -100,7 +103,6 @@ func (d *discovery) start() error {

d.bootnodes = peers
}

logger.Debugf("starting DHT with bootnodes %v...", d.bootnodes)
logger.Debugf("V1ProtocolOverride %v...", d.pid+"/kad")

Expand Down
9 changes: 7 additions & 2 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,19 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
// connections remain between min peers and max peers
const reservedOnly = false
peerCfgSet := peerset.NewConfigSet(
//TODO: there is no any understanding of maxOutPeers and maxInPirs calculations.
// This needs to be explicitly mentioned

// maxInPeers is later used in peerstate only and defines available Incoming connection slots
uint32(cfg.MaxPeers-cfg.MinPeers),
// maxOutPeers is later used in peerstate only and defines available Outgoing connection slots
uint32(cfg.MaxPeers/2),
reservedOnly,
peerSetSlotAllocTime,
)

// create connection manager
cm, err := newConnManager(cfg.MinPeers, cfg.MaxPeers, peerCfgSet)
cm, err := newConnManager(cfg.MaxPeers, peerCfgSet)
if err != nil {
return nil, fmt.Errorf("failed to create connection manager: %w", err)
}
Expand Down Expand Up @@ -243,7 +248,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
}

bwc := metrics.NewBandwidthCounter()
discovery := newDiscovery(ctx, h, bns, ds, pid, cfg.MinPeers, cfg.MaxPeers, cm.peerSetHandler)
discovery := newDiscovery(ctx, h, bns, ds, pid, cfg.MaxPeers, cm.peerSetHandler)

host := &host{
ctx: ctx,
Expand Down
7 changes: 5 additions & 2 deletions dot/network/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

package network

import "encoding/json"
import (
"encoding/json"
"io"
)

// Telemetry is the telemetry client to send telemetry messages.
type Telemetry interface {
Expand All @@ -22,5 +25,5 @@ type Logger interface {
// MDNS is the mDNS service interface.
type MDNS interface {
Start() error
Stop() error
io.Closer
}
4 changes: 2 additions & 2 deletions internal/mdns/notifee.go → dot/network/notifee.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2022 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package mdns
package network

import (
"time"
Expand Down Expand Up @@ -35,7 +35,7 @@ type NotifeeTracker struct {
peerAdder PeerAdder
}

// HandlePeerFound tracks the address info from the peer found.
// HandlePeerFound is a libp2p.mdns.Notifee interface implementation for mDNS in libp2p.
func (n *NotifeeTracker) HandlePeerFound(p peer.AddrInfo) {
n.addressAdder.AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
n.peerAdder.AddPeer(0, p.ID)
Expand Down
2 changes: 1 addition & 1 deletion dot/network/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func Test_HandshakeTimeout(t *testing.T) {
// after the timeout
time.Sleep(handshakeTimeout)

// handshake data shouldn't exist still
// handshake data still shouldn't exist
data = info.peersData.getOutboundHandshakeData(nodeB.host.id())
require.Nil(t, data)

Expand Down
18 changes: 13 additions & 5 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/internal/log"
"github.com/ChainSafe/gossamer/internal/mdns"
"github.com/ChainSafe/gossamer/internal/metrics"
"github.com/ChainSafe/gossamer/lib/common"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
Expand Down Expand Up @@ -194,12 +194,12 @@ func NewService(cfg *Config) (*Service, error) {
}

serviceTag := string(host.protocolID)
notifee := mdns.NewNotifeeTracker(host.p2pHost.Peerstore(), host.cm.peerSetHandler)
notifee := NewNotifeeTracker(host.p2pHost.Peerstore(), host.cm.peerSetHandler)
mdnsLogger := log.NewFromGlobal(log.AddContext("module", "mdns"))
mdnsLogger.Debugf(
"Creating mDNS discovery service with host %s and protocol %s...",
host.id(), host.protocolID)
mdnsService := mdns.NewService(host.p2pHost, serviceTag, mdnsLogger, notifee)
mdnsService := mdns.NewMdnsService(host.p2pHost, serviceTag, notifee)

network := &Service{
ctx: ctx,
Expand Down Expand Up @@ -291,6 +291,8 @@ func (s *Service) Start() error {

// this handles all new connections (incoming and outgoing)
// it creates a per-protocol mutex for sending outbound handshakes to the peer
// connectHandler is a part of libp2p.Notifiee interface implementation and getting called in the very end
//after or Incoming or Outgoing node is connected
s.host.cm.connectHandler = func(peerID peer.ID) {
for _, prtl := range s.notificationsProtocols {
prtl.peersData.setMutex(peerID)
Expand Down Expand Up @@ -322,7 +324,8 @@ func (s *Service) Start() error {
return fmt.Errorf("starting mDNS service: %w", err)
}
}

// TODO: this is basically a hack that is used only in unit tests to disable kademilia dht.
// Should be replaced with a mock instead.
if !s.noDiscover {
go func() {
err = s.host.discovery.start()
Expand Down Expand Up @@ -467,7 +470,7 @@ func (s *Service) Stop() error {
s.cancel()

// close mDNS discovery service
err := s.mdns.Stop()
err := s.mdns.Close()
if err != nil {
logger.Errorf("Failed to close mDNS discovery service: %s", err)
}
Expand Down Expand Up @@ -694,6 +697,9 @@ func (s *Service) startPeerSetHandler() {
go s.startProcessingMsg()
}

// processMessage process messages from PeerSetHandler. Responsible for Connecting and Drop connection with peers.
// When Connect message received function looking for a PeerAddr in Peerstore.
// If address is not found in peerstore we are looking for a peer with DHT
func (s *Service) processMessage(msg peerset.Message) {
peerID := msg.PeerID
if peerID == "" {
Expand All @@ -714,6 +720,7 @@ func (s *Service) processMessage(msg peerset.Message) {

err := s.host.connect(addrInfo)
if err != nil {
// TODO: if error happens here outgoing (?) slot is occupied but no peer is really connected
logger.Warnf("failed to open connection for peer %s: %s", peerID, err)
return
}
Expand All @@ -728,6 +735,7 @@ func (s *Service) processMessage(msg peerset.Message) {
}
}

// startProcessingMsg function that listens to messages from the channel that belongs to PeerSet PeerSetHandler.
func (s *Service) startProcessingMsg() {
msgCh := s.host.cm.peerSetHandler.Messages()
for {
Expand Down
4 changes: 2 additions & 2 deletions dot/network/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestStringToAddrInfo(t *testing.T) {
for _, str := range TestPeers {
pi, err := stringToAddrInfo(str)
require.NoError(t, err)
require.Equal(t, pi.ID.Pretty(), str[len(str)-46:])
require.Equal(t, pi.ID.String(), str[len(str)-46:])
}
}

Expand All @@ -74,7 +74,7 @@ func TestStringsToAddrInfos(t *testing.T) {
require.NoError(t, err)

for k, pi := range pi {
require.Equal(t, pi.ID.Pretty(), TestPeers[k][len(TestPeers[k])-46:])
require.Equal(t, pi.ID.String(), TestPeers[k][len(TestPeers[k])-46:])
}
}

Expand Down
18 changes: 12 additions & 6 deletions dot/peerset/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ type PeerSet struct {
// TODO: this will be useful for reserved only mode
// this is for future purpose if reserved-only flag is enabled (#1888).
isReservedOnly bool
resultMsgCh chan Message

// resultMsgCh is read by network.Service.
resultMsgCh chan Message
// time when the PeerSet was created.
created time.Time
// last time when we updated the reputations of connected nodes.
Expand Down Expand Up @@ -369,6 +371,7 @@ func (ps *PeerSet) reportPeer(change ReputationChange, peers ...peer.ID) error {
}

// allocSlots tries to fill available outgoing slots of nodes for the given set.
// By default this getting called every X seconds according to nextPeriodicAllocSlots ticker
func (ps *PeerSet) allocSlots(setIdx int) error {
err := ps.updateTime()
if err != nil {
Expand All @@ -382,7 +385,7 @@ func (ps *PeerSet) allocSlots(setIdx int) error {
case connectedPeer:
continue
case unknownPeer:
peerState.discover(setIdx, reservePeer)
peerState.insertPeer(setIdx, reservePeer)
}

node, err := ps.peerState.getNode(reservePeer)
Expand Down Expand Up @@ -425,7 +428,7 @@ func (ps *PeerSet) allocSlots(setIdx int) error {
}

if err = peerState.tryOutgoing(setIdx, peerID); err != nil {
logger.Errorf("could not set peer %s as outgoing connection: %s", peerID.Pretty(), err)
logger.Errorf("could not set peer %s as outgoing connection: %s", peerID.String(), err)
break
}

Expand All @@ -450,7 +453,7 @@ func (ps *PeerSet) addReservedPeers(setID int, peers ...peer.ID) error {
return nil
}

ps.peerState.discover(setID, peerID)
ps.peerState.insertPeer(setID, peerID)

ps.reservedNode[peerID] = struct{}{}
if err := ps.peerState.addNoSlotNode(setID, peerID); err != nil {
Expand Down Expand Up @@ -537,13 +540,16 @@ func (ps *PeerSet) setReservedPeer(setID int, peers ...peer.ID) error {
return nil
}

// addPeer checks peer existence in peerSet and if it does not insert the peer in to peerstate with
// default reputation and notConnected status. Afterwards runs allocSlots that checks availability of outgoing slots
// and put notConnected peers in to them
func (ps *PeerSet) addPeer(setID int, peers peer.IDSlice) error {
for _, pid := range peers {
if ps.peerState.peerStatus(setID, pid) != unknownPeer {
return nil
}

ps.peerState.discover(setID, pid)
ps.peerState.insertPeer(setID, pid)
if err := ps.allocSlots(setID); err != nil {
return fmt.Errorf("could not allocate slots: %w", err)
}
Expand Down Expand Up @@ -612,7 +618,7 @@ func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error {
case notConnectedPeer:
ps.peerState.nodes[pid].lastConnected[setID] = time.Now()
case unknownPeer:
ps.peerState.discover(setID, pid)
ps.peerState.insertPeer(setID, pid)
}

state := ps.peerState
Expand Down
10 changes: 5 additions & 5 deletions dot/peerset/peerset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestBanRejectAcceptPeer(t *testing.T) {
peer1Status := ps.peerState.peerStatus(testSetID, peer1)
require.Equal(t, unknownPeer, peer1Status)

ps.peerState.discover(testSetID, peer1)
ps.peerState.insertPeer(testSetID, peer1)
// adding peer1 with incoming slot.
err := ps.peerState.tryAcceptIncoming(testSetID, peer1)
require.NoError(t, err)
Expand Down Expand Up @@ -136,19 +136,19 @@ func TestPeerSetIncoming(t *testing.T) {
pid: incomingPeer,
expectedStatus: Accept,
expectedNumIn: 1,
hasFreeIncomingSlot: false,
hasFreeIncomingSlot: true,
},
{
pid: incoming2,
expectedStatus: Accept,
expectedNumIn: 2,
hasFreeIncomingSlot: true,
hasFreeIncomingSlot: false, // since maxIn is 2, we will not have any free slots if 2 peers connected
},
{
pid: incoming3,
expectedStatus: Reject,
expectedNumIn: 2,
hasFreeIncomingSlot: true,
hasFreeIncomingSlot: false, // since maxIn is 2, we will not have any free slots if 2 peers connected
},
}

Expand Down Expand Up @@ -217,7 +217,7 @@ func TestReAllocAfterBanned(t *testing.T) {
peer1Status := ps.peerState.peerStatus(testSetID, peer1)
require.Equal(t, unknownPeer, peer1Status)

ps.peerState.discover(testSetID, peer1)
ps.peerState.insertPeer(testSetID, peer1)
err := ps.peerState.tryAcceptIncoming(testSetID, peer1)
require.NoError(t, err)

Expand Down
Loading
Loading