diff --git a/dot/network/connmgr_test.go b/dot/network/connmgr_test.go index ac2c8ff5c9..1ed6001816 100644 --- a/dot/network/connmgr_test.go +++ b/dot/network/connmgr_test.go @@ -82,12 +82,12 @@ func TestMaxPeers(t *testing.T) { continue } - n.host.h.Peerstore().AddAddrs(ainfo.ID, ainfo.Addrs, peerstore.PermanentAddrTTL) + n.host.p2pHost.Peerstore().AddAddrs(ainfo.ID, ainfo.Addrs, peerstore.PermanentAddrTTL) n.host.cm.peerSetHandler.AddPeer(0, ainfo.ID) } time.Sleep(200 * time.Millisecond) - p := nodes[0].host.h.Peerstore().Peers() + p := nodes[0].host.p2pHost.Peerstore().Peers() require.LessOrEqual(t, max, len(p)) } @@ -152,7 +152,7 @@ func TestPersistentPeers(t *testing.T) { time.Sleep(time.Millisecond * 600) // B should have connected to A during bootstrap - conns := nodeB.host.h.Network().ConnsToPeer(nodeA.host.id()) + conns := nodeB.host.p2pHost.Network().ConnsToPeer(nodeA.host.id()) require.NotEqual(t, 0, len(conns)) // if A disconnects from B, B should reconnect @@ -160,7 +160,7 @@ func TestPersistentPeers(t *testing.T) { time.Sleep(time.Millisecond * 500) - conns = nodeB.host.h.Network().ConnsToPeer(nodeA.host.id()) + conns = nodeB.host.p2pHost.Network().ConnsToPeer(nodeA.host.id()) require.NotEqual(t, 0, len(conns)) } @@ -239,7 +239,7 @@ func TestSetReservedPeer(t *testing.T) { require.Equal(t, 2, node3.host.peerCount()) - node3.host.h.Peerstore().AddAddrs(addrC.ID, addrC.Addrs, peerstore.PermanentAddrTTL) + node3.host.p2pHost.Peerstore().AddAddrs(addrC.ID, addrC.Addrs, peerstore.PermanentAddrTTL) node3.host.cm.peerSetHandler.SetReservedPeer(0, addrC.ID) time.Sleep(200 * time.Millisecond) diff --git a/dot/network/discovery_test.go b/dot/network/discovery_test.go index 3d8b2278ff..9858d9e428 100644 --- a/dot/network/discovery_test.go +++ b/dot/network/discovery_test.go @@ -35,7 +35,7 @@ func newTestDiscovery(t *testing.T, num int) []*discovery { require.NoError(t, err) disc := &discovery{ ctx: srvc.ctx, - h: srvc.host.h, + h: srvc.host.p2pHost, ds: ds, } @@ -200,7 +200,7 @@ func TestBeginDiscovery_ThreeNodes(t *testing.T) { time.Sleep(time.Millisecond * 500) // assert B and C can discover each other - addrs := nodeB.host.h.Peerstore().Addrs(nodeC.host.id()) + addrs := nodeB.host.p2pHost.Peerstore().Addrs(nodeC.host.id()) require.NotEqual(t, 0, len(addrs)) } diff --git a/dot/network/host.go b/dot/network/host.go index 1dbdda5d9c..9f626438bb 100644 --- a/dot/network/host.go +++ b/dot/network/host.go @@ -68,7 +68,7 @@ const ( // host wraps libp2p host with network host configuration and services type host struct { ctx context.Context - h libp2phost.Host + p2pHost libp2phost.Host discovery *discovery bootnodes []peer.AddrInfo persistentPeers []peer.AddrInfo @@ -211,7 +211,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) { host := &host{ ctx: ctx, - h: h, + p2pHost: h, discovery: discovery, bootnodes: bns, protocolID: pid, @@ -236,14 +236,14 @@ func (h *host) close() error { } // close libp2p host - err = h.h.Close() + err = h.p2pHost.Close() if err != nil { logger.Errorf("Failed to close libp2p host: %s", err) return err } h.closeSync.Do(func() { - err = h.h.Peerstore().Close() + err = h.p2pHost.Peerstore().Close() if err != nil { logger.Errorf("Failed to close libp2p peerstore: %s", err) return @@ -260,28 +260,28 @@ func (h *host) close() error { // registerStreamHandler registers the stream handler for the given protocol id. 
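Most hunks in this diff are the same mechanical rename: the libp2p host moves from a one-letter field `h`, on a struct whose methods also use `h` as the receiver, to `p2pHost`, so call sites no longer read as `h.h`. A minimal, self-contained sketch of the pattern (the `Host` interface below is a stand-in for `libp2phost.Host`, not the real API):

```go
package main

import "fmt"

// Host is a stand-in for libp2phost.Host, reduced to one method for the sketch.
type Host interface{ ID() string }

type stubHost struct{}

func (stubHost) ID() string { return "12D3KooWStub" }

// host mirrors dot/network's wrapper: after the rename, the libp2p host
// field no longer collides with the conventional receiver name h.
type host struct {
	p2pHost Host // previously named h, which made call sites read h.h.ID()
}

func (h *host) id() string { return h.p2pHost.ID() }

func main() {
	h := &host{p2pHost: stubHost{}}
	fmt.Println(h.id()) // 12D3KooWStub
}
```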
func (h *host) registerStreamHandler(pid protocol.ID, handler func(libp2pnetwork.Stream)) { - h.h.SetStreamHandler(pid, handler) + h.p2pHost.SetStreamHandler(pid, handler) } // connect connects the host to a specific peer address func (h *host) connect(p peer.AddrInfo) (err error) { - h.h.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL) + h.p2pHost.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL) ctx, cancel := context.WithTimeout(h.ctx, connectTimeout) defer cancel() - err = h.h.Connect(ctx, p) + err = h.p2pHost.Connect(ctx, p) return err } // bootstrap connects the host to the configured bootnodes func (h *host) bootstrap() { for _, info := range h.persistentPeers { - h.h.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) + h.p2pHost.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) h.cm.peerSetHandler.AddReservedPeer(0, info.ID) } for _, addrInfo := range h.bootnodes { logger.Debugf("bootstrapping to peer %s", addrInfo.ID) - h.h.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, peerstore.PermanentAddrTTL) + h.p2pHost.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, peerstore.PermanentAddrTTL) h.cm.peerSetHandler.AddPeer(0, addrInfo.ID) } } @@ -290,7 +290,7 @@ func (h *host) bootstrap() { // the newly created stream. func (h *host) send(p peer.ID, pid protocol.ID, msg Message) (libp2pnetwork.Stream, error) { // open outbound stream with host protocol id - stream, err := h.h.NewStream(h.ctx, p, pid) + stream, err := h.p2pHost.NewStream(h.ctx, p, pid) if err != nil { logger.Tracef("failed to open new stream with peer %s using protocol %s: %s", p, pid, err) return nil, err @@ -334,12 +334,12 @@ func (h *host) writeToStream(s libp2pnetwork.Stream, msg Message) error { // id returns the host id func (h *host) id() peer.ID { - return h.h.ID() + return h.p2pHost.ID() } // Peers returns connected peers func (h *host) peers() []peer.ID { - return h.h.Network().Peers() + return h.p2pHost.Network().Peers() } // addReservedPeers adds the peers `addrs` to the protected peers list and connects to them @@ -354,7 +354,7 @@ func (h *host) addReservedPeers(addrs ...string) error { if err != nil { return err } - h.h.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, peerstore.PermanentAddrTTL) + h.p2pHost.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, peerstore.PermanentAddrTTL) h.cm.peerSetHandler.AddReservedPeer(0, addrInfo.ID) } @@ -369,7 +369,7 @@ func (h *host) removeReservedPeers(ids ...string) error { return err } h.cm.peerSetHandler.RemoveReservedPeer(0, peerID) - h.h.ConnManager().Unprotect(peerID, "") + h.p2pHost.ConnManager().Unprotect(peerID, "") } return nil @@ -378,7 +378,7 @@ func (h *host) removeReservedPeers(ids ...string) error { // supportsProtocol checks if the protocol is supported by peerID // returns an error if could not get peer protocols func (h *host) supportsProtocol(peerID peer.ID, protocol protocol.ID) (bool, error) { - peerProtocols, err := h.h.Peerstore().SupportsProtocols(peerID, string(protocol)) + peerProtocols, err := h.p2pHost.Peerstore().SupportsProtocols(peerID, string(protocol)) if err != nil { return false, err } @@ -388,21 +388,21 @@ func (h *host) supportsProtocol(peerID peer.ID, protocol protocol.ID) (bool, err // peerCount returns the number of connected peers func (h *host) peerCount() int { - peers := h.h.Network().Peers() + peers := h.p2pHost.Network().Peers() return len(peers) } // addrInfo returns the libp2p peer.AddrInfo of the host func (h *host) addrInfo() peer.AddrInfo { return 
peer.AddrInfo{ - ID: h.h.ID(), - Addrs: h.h.Addrs(), + ID: h.p2pHost.ID(), + Addrs: h.p2pHost.Addrs(), } } // multiaddrs returns the multiaddresses of the host func (h *host) multiaddrs() (multiaddrs []ma.Multiaddr) { - addrs := h.h.Addrs() + addrs := h.p2pHost.Addrs() for _, addr := range addrs { multiaddr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", addr, h.id())) if err != nil { @@ -415,16 +415,16 @@ func (h *host) multiaddrs() (multiaddrs []ma.Multiaddr) { // protocols returns all protocols currently supported by the node func (h *host) protocols() []string { - return h.h.Mux().Protocols() + return h.p2pHost.Mux().Protocols() } // closePeer closes connection with peer. func (h *host) closePeer(peer peer.ID) error { - return h.h.Network().ClosePeer(peer) + return h.p2pHost.Network().ClosePeer(peer) } func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) { - connToPeer := h.h.Network().ConnsToPeer(p) + connToPeer := h.p2pHost.Network().ConnsToPeer(p) for _, c := range connToPeer { for _, st := range c.GetStreams() { if st.Protocol() != pID { diff --git a/dot/network/host_test.go b/dot/network/host_test.go index 1250b15181..baa812a9c4 100644 --- a/dot/network/host_test.go +++ b/dot/network/host_test.go @@ -170,13 +170,13 @@ func TestBootstrap(t *testing.T) { peerCountA := nodeA.host.peerCount() if peerCountA == 0 { - peerCountA := len(nodeA.host.h.Peerstore().Peers()) + peerCountA := len(nodeA.host.p2pHost.Peerstore().Peers()) require.NotZero(t, peerCountA) } peerCountB := nodeB.host.peerCount() if peerCountB == 0 { - peerCountB := len(nodeB.host.h.Peerstore().Peers()) + peerCountB := len(nodeB.host.p2pHost.Peerstore().Peers()) require.NotZero(t, peerCountB) } } @@ -498,7 +498,7 @@ func Test_RemoveReservedPeers(t *testing.T) { time.Sleep(100 * time.Millisecond) require.Equal(t, 1, nodeA.host.peerCount()) - isProtected := nodeA.host.h.ConnManager().IsProtected(nodeB.host.addrInfo().ID, "") + isProtected := nodeA.host.p2pHost.ConnManager().IsProtected(nodeB.host.addrInfo().ID, "") require.False(t, isProtected) err = nodeA.host.removeReservedPeers("unknown_perr_id") @@ -583,7 +583,7 @@ func TestPeerConnect(t *testing.T) { nodeB.noGossip = true addrInfoB := nodeB.host.addrInfo() - nodeA.host.h.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL) + nodeA.host.p2pHost.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL) nodeA.host.cm.peerSetHandler.AddPeer(0, addrInfoB.ID) time.Sleep(100 * time.Millisecond) @@ -621,7 +621,7 @@ func TestBannedPeer(t *testing.T) { nodeB.noGossip = true addrInfoB := nodeB.host.addrInfo() - nodeA.host.h.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL) + nodeA.host.p2pHost.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL) nodeA.host.cm.peerSetHandler.AddPeer(0, addrInfoB.ID) time.Sleep(100 * time.Millisecond) @@ -674,7 +674,7 @@ func TestPeerReputation(t *testing.T) { nodeB.noGossip = true addrInfoB := nodeB.host.addrInfo() - nodeA.host.h.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL) + nodeA.host.p2pHost.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL) nodeA.host.cm.peerSetHandler.AddPeer(0, addrInfoB.ID) time.Sleep(100 * time.Millisecond) diff --git a/dot/network/light_test.go b/dot/network/light_test.go index 85f26ffd40..7cbc037921 100644 --- a/dot/network/light_test.go +++ b/dot/network/light_test.go @@ -113,7 +113,7 @@ func TestHandleLightMessage_Response(t *testing.T) { } 
require.NoError(t, err) - stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+lightID) + stream, err := s.host.p2pHost.NewStream(s.ctx, b.host.id(), s.host.protocolID+lightID) require.NoError(t, err) // Testing empty request diff --git a/dot/network/mdns.go b/dot/network/mdns.go index fc329218f1..04635087fe 100644 --- a/dot/network/mdns.go +++ b/dot/network/mdns.go @@ -47,7 +47,7 @@ func (m *mdns) start() { // create and start service mdns, err := libp2pdiscovery.NewMdnsService( m.host.ctx, - m.host.h, + m.host.p2pHost, MDNSPeriod, string(m.host.protocolID), ) @@ -89,7 +89,7 @@ func (n Notifee) HandlePeerFound(p peer.AddrInfo) { "Peer %s found using mDNS discovery, with host %s", p.ID, n.host.id()) - n.host.h.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL) + n.host.p2pHost.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL) // connect to found peer n.host.cm.peerSetHandler.AddPeer(0, p.ID) } diff --git a/dot/network/mdns_test.go b/dot/network/mdns_test.go index 901a57622f..82ce9dd87e 100644 --- a/dot/network/mdns_test.go +++ b/dot/network/mdns_test.go @@ -42,13 +42,13 @@ func TestMDNS(t *testing.T) { if peerCountA == 0 { // check peerstore for disconnected peers - peerCountA := len(nodeA.host.h.Peerstore().Peers()) + peerCountA := len(nodeA.host.p2pHost.Peerstore().Peers()) require.NotZero(t, peerCountA) } if peerCountB == 0 { // check peerstore for disconnected peers - peerCountB := len(nodeB.host.h.Peerstore().Peers()) + peerCountB := len(nodeB.host.p2pHost.Peerstore().Peers()) require.NotZero(t, peerCountB) } } diff --git a/dot/network/notifications_test.go b/dot/network/notifications_test.go index 5670114262..172f212c21 100644 --- a/dot/network/notifications_test.go +++ b/dot/network/notifications_test.go @@ -112,7 +112,7 @@ func TestCreateNotificationsMessageHandler_BlockAnnounce(t *testing.T) { } require.NoError(t, err) - stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+blockAnnounceID) + stream, err := s.host.p2pHost.NewStream(s.ctx, b.host.id(), s.host.protocolID+blockAnnounceID) require.NoError(t, err) // create info and handler @@ -181,7 +181,7 @@ func TestCreateNotificationsMessageHandler_BlockAnnounceHandshake(t *testing.T) } require.NoError(t, err) - stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+blockAnnounceID) + stream, err := s.host.p2pHost.NewStream(s.ctx, b.host.id(), s.host.protocolID+blockAnnounceID) require.NoError(t, err) // try invalid handshake @@ -250,7 +250,7 @@ func Test_HandshakeTimeout(t *testing.T) { info := newNotificationsProtocol(nodeA.host.protocolID+blockAnnounceID, nodeA.getBlockAnnounceHandshake, testHandshakeDecoder, nodeA.validateBlockAnnounceHandshake) - nodeB.host.h.SetStreamHandler(info.protocolID, func(stream libp2pnetwork.Stream) { + nodeB.host.p2pHost.SetStreamHandler(info.protocolID, func(stream libp2pnetwork.Stream) { // should not respond to a handshake message }) @@ -267,7 +267,7 @@ func Test_HandshakeTimeout(t *testing.T) { // clear handshake data from connection handler time.Sleep(time.Millisecond * 100) info.peersData.deleteOutboundHandshakeData(nodeB.host.id()) - connAToB := nodeA.host.h.Network().ConnsToPeer(nodeB.host.id()) + connAToB := nodeA.host.p2pHost.Network().ConnsToPeer(nodeB.host.id()) for _, stream := range connAToB[0].GetStreams() { _ = stream.Close() } @@ -289,7 +289,7 @@ func Test_HandshakeTimeout(t *testing.T) { require.Nil(t, data) // a stream should be open until timeout - connAToB = 
nodeA.host.h.Network().ConnsToPeer(nodeB.host.id()) + connAToB = nodeA.host.p2pHost.Network().ConnsToPeer(nodeB.host.id()) require.Len(t, connAToB, 1) require.Len(t, connAToB[0].GetStreams(), 1) @@ -301,7 +301,7 @@ func Test_HandshakeTimeout(t *testing.T) { require.Nil(t, data) // stream should be closed - connAToB = nodeA.host.h.Network().ConnsToPeer(nodeB.host.id()) + connAToB = nodeA.host.p2pHost.Network().ConnsToPeer(nodeB.host.id()) require.Len(t, connAToB, 1) require.Len(t, connAToB[0].GetStreams(), 0) } @@ -343,7 +343,7 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) { require.NoError(t, err) txnProtocolID := srvc1.host.protocolID + transactionsID - stream, err := srvc1.host.h.NewStream(srvc1.ctx, srvc2.host.id(), txnProtocolID) + stream, err := srvc1.host.p2pHost.NewStream(srvc1.ctx, srvc2.host.id(), txnProtocolID) require.NoError(t, err) // create info and handler diff --git a/dot/network/service.go b/dot/network/service.go index 2348090fce..6374bdb9ea 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -135,8 +135,7 @@ type Service struct { blockResponseBuf []byte blockResponseBufMu sync.Mutex - - telemetry telemetry.Client + telemetry telemetry.Client } // NewService creates a new network service from the configuration and message channels @@ -174,6 +173,7 @@ func NewService(cfg *Config) (*Service, error) { if cfg.batchSize == 0 { cfg.batchSize = defaultTxnBatchSize } + // create a new host instance host, err := newHost(ctx, cfg) if err != nil { @@ -277,7 +277,7 @@ func (s *Service) Start() error { // since this opens block announce streams, it should happen after the protocol is registered // NOTE: this only handles *incoming* connections - s.host.h.Network().SetConnHandler(s.handleConn) + s.host.p2pHost.Network().SetConnHandler(s.handleConn) // this handles all new connections (incoming and outgoing) // it creates a per-protocol mutex for sending outbound handshakes to the peer @@ -341,9 +341,9 @@ func (s *Service) updateMetrics() { return case <-ticker.C: peerCountGauge.Set(float64(s.host.peerCount())) - connectionsGauge.Set(float64(len(s.host.h.Network().Conns()))) + connectionsGauge.Set(float64(len(s.host.p2pHost.Network().Conns()))) nodeLatencyGauge.Set(float64( - s.host.h.Peerstore().LatencyEWMA(s.host.id()).Milliseconds())) + s.host.p2pHost.Peerstore().LatencyEWMA(s.host.id()).Milliseconds())) inboundBlockAnnounceStreamsGauge.Set(float64( s.getNumStreams(BlockAnnounceMsgType, true))) outboundBlockAnnounceStreamsGauge.Set(float64( @@ -357,7 +357,7 @@ func (s *Service) updateMetrics() { } func (s *Service) getTotalStreams(inbound bool) (count int64) { - for _, conn := range s.host.h.Network().Conns() { + for _, conn := range s.host.p2pHost.Network().Conns() { for _, stream := range conn.GetStreams() { streamIsInbound := isInbound(stream) if (streamIsInbound && inbound) || (!streamIsInbound && !inbound) { @@ -487,8 +487,6 @@ func (s *Service) Stop() error { logger.Errorf("Failed to close host: %s", err) } - s.host.cm.peerSetHandler.Stop() - // check if closeCh is closed, if not, close it. 
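Dropping `peerSetHandler.Stop()` here (and `Stop()` from the `PeerSetHandler` interface in `dot/network/state.go` below) relies on the peerset goroutines exiting when the context passed to `Start` is cancelled. A reduced sketch of that shutdown shape, with the `run` helper invented for illustration:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// run stands in for listenAction/periodicallyAllocateSlots: both loops
// select on ctx.Done(), so cancelling the context is the only stop signal.
func run(ctx context.Context, done chan<- struct{}) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer func() {
		ticker.Stop()
		close(done)
	}()

	for {
		select {
		case <-ctx.Done():
			fmt.Println("exiting:", ctx.Err())
			return
		case <-ticker.C:
			// periodic work (slot allocation) would happen here
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go run(ctx, done)

	time.Sleep(30 * time.Millisecond)
	cancel() // replaces the explicit Stop() call
	<-done
}
```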
mainloop: for { @@ -692,7 +690,7 @@ func (s *Service) processMessage(msg peerset.Message) { } switch msg.Status { case peerset.Connect: - addrInfo := s.host.h.Peerstore().PeerInfo(peerID) + addrInfo := s.host.p2pHost.Peerstore().PeerInfo(peerID) if len(addrInfo.Addrs) == 0 { var err error addrInfo, err = s.host.discovery.findPeer(peerID) diff --git a/dot/network/service_test.go b/dot/network/service_test.go index ffb57489c5..741cf3b134 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -269,7 +269,8 @@ func TestBroadcastDuplicateMessage(t *testing.T) { } require.NoError(t, err) - stream, err := nodeA.host.h.NewStream(context.Background(), nodeB.host.id(), nodeB.host.protocolID+blockAnnounceID) + stream, err := nodeA.host.p2pHost.NewStream(context.Background(), + nodeB.host.id(), nodeB.host.protocolID+blockAnnounceID) require.NoError(t, err) require.NotNil(t, stream) @@ -361,7 +362,7 @@ func TestPersistPeerStore(t *testing.T) { } require.NoError(t, err) - require.NotEmpty(t, nodeA.host.h.Peerstore().PeerInfo(nodeB.host.id()).Addrs) + require.NotEmpty(t, nodeA.host.p2pHost.Peerstore().PeerInfo(nodeB.host.id()).Addrs) // Stop a node and reinitialise a new node with same base path. err = nodeA.Stop() @@ -369,7 +370,7 @@ func TestPersistPeerStore(t *testing.T) { // Since nodeAA uses the persistent peerstore of nodeA, it should be have nodeB in it's peerstore. nodeAA := createTestService(t, nodeA.cfg) - require.NotEmpty(t, nodeAA.host.h.Peerstore().PeerInfo(nodeB.host.id()).Addrs) + require.NotEmpty(t, nodeAA.host.p2pHost.Peerstore().PeerInfo(nodeB.host.id()).Addrs) } func TestHandleConn(t *testing.T) { diff --git a/dot/network/state.go b/dot/network/state.go index 508cfc7f00..75c66c8810 100644 --- a/dot/network/state.go +++ b/dot/network/state.go @@ -47,7 +47,6 @@ type TransactionHandler interface { // PeerSetHandler is the interface used by the connection manager to handle peerset. type PeerSetHandler interface { Start(context.Context) - Stop() ReportPeer(peerset.ReputationChange, ...peer.ID) PeerAdd PeerRemove diff --git a/dot/network/sync.go b/dot/network/sync.go index d21e7a189a..856447254e 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -24,13 +24,13 @@ var ( func (s *Service) DoBlockRequest(to peer.ID, req *BlockRequestMessage) (*BlockResponseMessage, error) { fullSyncID := s.host.protocolID + syncID - s.host.h.ConnManager().Protect(to, "") - defer s.host.h.ConnManager().Unprotect(to, "") + s.host.p2pHost.ConnManager().Protect(to, "") + defer s.host.p2pHost.ConnManager().Unprotect(to, "") ctx, cancel := context.WithTimeout(s.ctx, blockRequestTimeout) defer cancel() - stream, err := s.host.h.NewStream(ctx, to, fullSyncID) + stream, err := s.host.p2pHost.NewStream(ctx, to, fullSyncID) if err != nil { return nil, err } diff --git a/dot/peerset/handler.go b/dot/peerset/handler.go index 5c90d2be1b..4e0fd35f7a 100644 --- a/dot/peerset/handler.go +++ b/dot/peerset/handler.go @@ -117,7 +117,7 @@ func (h *Handler) PeerReputation(peerID peer.ID) (Reputation, error) { if err != nil { return 0, err } - return n.getReputation(), nil + return n.reputation, nil } // Start starts peerSet processing diff --git a/dot/peerset/mock_message_processor_test.go b/dot/peerset/mock_message_processor_test.go new file mode 100644 index 0000000000..91799591fa --- /dev/null +++ b/dot/peerset/mock_message_processor_test.go @@ -0,0 +1,46 @@ +// Code generated by MockGen. DO NOT EDIT. 
+// Source: github.com/ChainSafe/gossamer/dot/peerset (interfaces: MessageProcessor) + +// Package peerset is a generated GoMock package. +package peerset + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockMessageProcessor is a mock of MessageProcessor interface. +type MockMessageProcessor struct { + ctrl *gomock.Controller + recorder *MockMessageProcessorMockRecorder +} + +// MockMessageProcessorMockRecorder is the mock recorder for MockMessageProcessor. +type MockMessageProcessorMockRecorder struct { + mock *MockMessageProcessor +} + +// NewMockMessageProcessor creates a new mock instance. +func NewMockMessageProcessor(ctrl *gomock.Controller) *MockMessageProcessor { + mock := &MockMessageProcessor{ctrl: ctrl} + mock.recorder = &MockMessageProcessorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMessageProcessor) EXPECT() *MockMessageProcessorMockRecorder { + return m.recorder +} + +// Process mocks base method. +func (m *MockMessageProcessor) Process(arg0 Message) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Process", arg0) +} + +// Process indicates an expected call of Process. +func (mr *MockMessageProcessorMockRecorder) Process(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Process", reflect.TypeOf((*MockMessageProcessor)(nil).Process), arg0) +} diff --git a/dot/peerset/peerset.go b/dot/peerset/peerset.go index accdcd4a7e..df19ec2d51 100644 --- a/dot/peerset/peerset.go +++ b/dot/peerset/peerset.go @@ -8,6 +8,7 @@ import ( "fmt" "math" "strings" + "sync" "time" "github.com/ChainSafe/gossamer/internal/log" @@ -24,6 +25,7 @@ const ( // forgetAfterTime amount of time between the moment we disconnect // from a node and the moment we remove it from the list. forgetAfterTime = time.Second * 3600 // one hour + // default channel size for peerSet. msgChanSize = 100 ) @@ -157,14 +159,26 @@ type ReputationChange struct { Reason string } +func (r ReputationChange) String() string { + return fmt.Sprintf("value: %d, reason: %s", r.Value, r.Reason) +} + func newReputationChange(value Reputation, reason string) ReputationChange { return ReputationChange{value, reason} } +// MessageProcessor interface allows the network layer to receive and +// process messages from the peerstate layer +type MessageProcessor interface { + Process(Message) +} + // PeerSet is a container for all the components of a peerSet. type PeerSet struct { + sync.Mutex peerState *PeersState + reservedLock sync.RWMutex reservedNode map[peer.ID]struct{} // TODO: this will be useful for reserved only mode // this is for future purpose if reserved-only flag is enabled (#1888). @@ -261,6 +275,9 @@ func reputationTick(reput Reputation) Reputation { // updateTime updates the value of latestTimeUpdate and performs all the updates that // happen over time, such as Reputation increases for staying connected. func (ps *PeerSet) updateTime() error { + ps.Lock() + defer ps.Unlock() + currTime := time.Now() // identify the time difference between current time and last update time for peer reputation in seconds. // update the latestTimeUpdate to currTime. @@ -273,16 +290,11 @@ func (ps *PeerSet) updateTime() error { // For each elapsed second, move the node reputation towards zero. 
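The hunk below replaces the unlocked get/tick/set sequence on a node with a single call into the peer state. A sketch of why folding the load, decay and store into one critical section matters; `decayTowardZero` is a placeholder for `reputationTick`, whose real step size is defined elsewhere:

```go
package main

import (
	"fmt"
	"sync"
)

type Reputation int32

// decayTowardZero is a placeholder for reputationTick; only the
// direction of the movement matters for this sketch.
func decayTowardZero(r Reputation) Reputation {
	switch {
	case r > 0:
		return r - 1
	case r < 0:
		return r + 1
	}
	return 0
}

type peersState struct {
	sync.RWMutex
	nodes map[string]Reputation
}

// updateReputationByTick mirrors the new helper in peerstate.go: the load,
// the decay and the store share one critical section, so a concurrent
// reportPeer cannot interleave between them and lose an update.
func (ps *peersState) updateReputationByTick(id string) (Reputation, error) {
	ps.Lock()
	defer ps.Unlock()

	rep, ok := ps.nodes[id]
	if !ok {
		return 0, fmt.Errorf("peer does not exist: for peer id %s", id)
	}

	rep = decayTowardZero(rep)
	ps.nodes[id] = rep
	return rep, nil
}

func main() {
	ps := &peersState{nodes: map[string]Reputation{"peer1": 2}}
	for i := 0; i < 3; i++ {
		rep, _ := ps.updateReputationByTick("peer1")
		fmt.Println(rep) // 1, 0, 0
	}
}
```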
for i := int64(0); i < secDiff; i++ { for _, peerID := range ps.peerState.peers() { - n, err := ps.peerState.getNode(peerID) + after, err := ps.peerState.updateReputationByTick(peerID) if err != nil { - return err + return fmt.Errorf("cannot update reputation by tick: %w", err) } - before := n.getReputation() - after := reputationTick(before) - n.setReputation(after) - ps.peerState.nodes[peerID] = n - if after != 0 { continue } @@ -294,7 +306,11 @@ func (ps *PeerSet) updateTime() error { continue } - lastDiscoveredTime := ps.peerState.lastConnectedAndDiscovered(set, peerID) + lastDiscoveredTime, err := ps.peerState.lastConnectedAndDiscovered(set, peerID) + if err != nil { + return fmt.Errorf("cannot get last connected peer: %w", err) + } + if lastDiscoveredTime.Add(forgetAfterTime).Second() >= currTime.Second() { continue } @@ -302,7 +318,7 @@ func (ps *PeerSet) updateTime() error { // forget peer removes the peer from the list of members of the set. err = ps.peerState.forgetPeer(set, peerID) if err != nil { - return err + return fmt.Errorf("cannot forget peer: %w", err) } } } @@ -316,14 +332,15 @@ func (ps *PeerSet) updateTime() error { // be disconnected and a drop message for the peer is sent in order to disconnect. func (ps *PeerSet) reportPeer(change ReputationChange, peers ...peer.ID) error { // we want reputations to be up-to-date before adjusting them. - if err := ps.updateTime(); err != nil { - return err + err := ps.updateTime() + if err != nil { + return fmt.Errorf("cannot update time: %w", err) } for _, pid := range peers { rep, err := ps.peerState.addReputation(pid, change) if err != nil { - return err + return fmt.Errorf("cannot add reputation: %w", err) } if rep >= BannedThresholdValue { @@ -332,21 +349,24 @@ func (ps *PeerSet) reportPeer(change ReputationChange, peers ...peer.ID) error { setLen := ps.peerState.getSetLength() for i := 0; i < setLen; i++ { - if ps.peerState.peerStatus(i, pid) == connectedPeer { - // disconnect peer - err = ps.peerState.disconnect(i, pid) - if err != nil { - return err - } + if ps.peerState.peerStatus(i, pid) != connectedPeer { + continue + } - ps.resultMsgCh <- Message{ - Status: Drop, - setID: uint64(i), - PeerID: pid, - } - if err = ps.allocSlots(i); err != nil { - return err - } + // disconnect peer + err = ps.peerState.disconnect(i, pid) + if err != nil { + return fmt.Errorf("cannot disconnect: %w", err) + } + + ps.resultMsgCh <- Message{ + Status: Drop, + setID: uint64(i), + PeerID: pid, + } + + if err = ps.allocSlots(i); err != nil { + return fmt.Errorf("could not allocate slots: %w", err) } } } @@ -357,7 +377,7 @@ func (ps *PeerSet) reportPeer(change ReputationChange, peers ...peer.ID) error { func (ps *PeerSet) allocSlots(setIdx int) error { err := ps.updateTime() if err != nil { - return err + return fmt.Errorf("cannot update time: %w", err) } peerState := ps.peerState @@ -370,20 +390,19 @@ func (ps *PeerSet) allocSlots(setIdx int) error { peerState.discover(setIdx, reservePeer) } - var n *node - n, err = ps.peerState.getNode(reservePeer) + node, err := ps.peerState.getNode(reservePeer) if err != nil { - return err + return fmt.Errorf("cannot get node: %w", err) } - if n.getReputation() < BannedThresholdValue { + if node.reputation < BannedThresholdValue { logger.Warnf("reputation is lower than banned threshold value, reputation: %d, banned threshold value: %d", - n.getReputation(), BannedThresholdValue) + node.reputation, BannedThresholdValue) break } if err = peerState.tryOutgoing(setIdx, reservePeer); err != nil { - return err + 
return fmt.Errorf("cannot set as outgoing: %w", err) } ps.resultMsgCh <- Message{ @@ -405,7 +424,7 @@ func (ps *PeerSet) allocSlots(setIdx int) error { } n := peerState.nodes[peerID] - if n.getReputation() < BannedThresholdValue { + if n.reputation < BannedThresholdValue { logger.Critical("highest rated peer is below bannedThresholdValue") break } @@ -427,6 +446,9 @@ func (ps *PeerSet) allocSlots(setIdx int) error { } func (ps *PeerSet) addReservedPeers(setID int, peers ...peer.ID) error { + ps.reservedLock.Lock() + defer ps.reservedLock.Unlock() + for _, peerID := range peers { if _, ok := ps.reservedNode[peerID]; ok { logger.Debugf("peer %s already exists in peerSet", peerID) @@ -447,6 +469,9 @@ func (ps *PeerSet) addReservedPeers(setID int, peers ...peer.ID) error { } func (ps *PeerSet) removeReservedPeers(setID int, peers ...peer.ID) error { + ps.reservedLock.Lock() + defer ps.reservedLock.Unlock() + for _, peerID := range peers { if _, ok := ps.reservedNode[peerID]; !ok { logger.Debugf("peer %s doesn't exist in the peerSet", peerID) @@ -469,7 +494,7 @@ func (ps *PeerSet) removeReservedPeers(setID int, peers ...peer.ID) error { if ps.peerState.peerStatus(setID, peerID) == connectedPeer { err := ps.peerState.disconnect(setID, peerID) if err != nil { - return err + return fmt.Errorf("cannot disconnect: %w", err) } ps.resultMsgCh <- Message{ @@ -484,8 +509,11 @@ func (ps *PeerSet) removeReservedPeers(setID int, peers ...peer.ID) error { } func (ps *PeerSet) setReservedPeer(setID int, peers ...peer.ID) error { - toInsert, toRemove := make([]peer.ID, 0, len(peers)), make([]peer.ID, 0, len(peers)) + toInsert := make([]peer.ID, 0, len(peers)) + toRemove := make([]peer.ID, 0, len(peers)) + peerIDMap := make(map[peer.ID]struct{}, len(peers)) + for _, pid := range peers { peerIDMap[pid] = struct{}{} if _, ok := ps.reservedNode[pid]; ok { @@ -501,11 +529,17 @@ func (ps *PeerSet) setReservedPeer(setID int, peers ...peer.ID) error { toRemove = append(toRemove, pid) } - if err := ps.addReservedPeers(setID, toInsert...); err != nil { - return err + err := ps.addReservedPeers(setID, toInsert...) + if err != nil { + return fmt.Errorf("cannot add reserved peers: %w", err) } - return ps.removeReservedPeers(setID, toRemove...) + err = ps.removeReservedPeers(setID, toRemove...) + if err != nil { + return fmt.Errorf("cannot remove reserved peers: %w", err) + } + + return nil } func (ps *PeerSet) addPeer(setID int, peers peer.IDSlice) error { @@ -516,7 +550,7 @@ func (ps *PeerSet) addPeer(setID int, peers peer.IDSlice) error { ps.peerState.discover(setID, pid) if err := ps.allocSlots(setID); err != nil { - return err + return fmt.Errorf("could not allocate slots: %w", err) } } return nil @@ -539,15 +573,15 @@ func (ps *PeerSet) removePeer(setID int, peers ...peer.ID) error { // disconnect and forget err := ps.peerState.disconnect(setID, pid) if err != nil { - return err + return fmt.Errorf("cannot disconnect: %w", err) } if err = ps.peerState.forgetPeer(setID, pid); err != nil { - return err + return fmt.Errorf("cannot forget peer: %w", err) } } else if status == notConnectedPeer { if err := ps.peerState.forgetPeer(setID, pid); err != nil { - return err + return fmt.Errorf("cannot forget peer: %w", err) } } } @@ -558,14 +592,15 @@ func (ps *PeerSet) removePeer(setID int, peers ...peer.ID) error { // either with a corresponding `Accept` or `Reject`, except if we were already // connected to this peer. 
func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error { - if err := ps.updateTime(); err != nil { - return err + err := ps.updateTime() + if err != nil { + return fmt.Errorf("cannot update time: %w", err) } - // This is for reserved only mode. for _, pid := range peers { if ps.isReservedOnly { - if _, ok := ps.reservedNode[pid]; !ok { + _, has := ps.reservedNode[pid] + if !has { ps.resultMsgCh <- Message{ Status: Reject, setID: uint64(setID), PeerID: pid, } @@ -586,28 +621,35 @@ } state := ps.peerState - p := state.nodes[pid] - switch { - case p.getReputation() < BannedThresholdValue: - ps.resultMsgCh <- Message{ - Status: Reject, - setID: uint64(setID), - PeerID: pid, - } - case state.tryAcceptIncoming(setID, pid) != nil: - ps.resultMsgCh <- Message{ - Status: Reject, - setID: uint64(setID), - PeerID: pid, - } - default: - logger.Debugf("incoming connection accepted from peer %s", pid) - ps.resultMsgCh <- Message{ - Status: Accept, - setID: uint64(setID), - PeerID: pid, + + var nodeReputation Reputation + + state.RLock() + node, has := state.nodes[pid] + if has { + nodeReputation = node.reputation + } + state.RUnlock() + + message := Message{ + setID: uint64(setID), + PeerID: pid, + } + + if nodeReputation < BannedThresholdValue { + message.Status = Reject + } else { + err := state.tryAcceptIncoming(setID, pid) + if err != nil { + logger.Errorf("cannot accept incoming peer %s: %s", pid, err) + message.Status = Reject + } else { + logger.Debugf("incoming connection accepted from peer %s", pid) + message.Status = Accept } } + + ps.resultMsgCh <- message } return nil @@ -630,7 +672,7 @@ const ( func (ps *PeerSet) disconnect(setIdx int, reason DropReason, peers ...peer.ID) error { err := ps.updateTime() if err != nil { - return err + return fmt.Errorf("cannot update time: %w", err) } state := ps.peerState @@ -645,8 +687,9 @@ state.nodes[pid] = n if err = state.disconnect(setIdx, pid); err != nil { - return err + return fmt.Errorf("cannot disconnect: %w", err) } + ps.resultMsgCh <- Message{ Status: Drop, setID: uint64(setIdx), @@ -656,7 +699,7 @@ // TODO: figure out the condition of connection refuse. if reason == RefusedDrop { if err = ps.removePeer(setIdx, pid); err != nil { - return err + return fmt.Errorf("cannot remove peer: %w", err) } } } @@ -667,31 +710,17 @@ // start handles all the action for the peerSet. func (ps *PeerSet) start(ctx context.Context, actionQueue chan action) { ps.actionQueue = actionQueue - ps.resultMsgCh = make(chan Message, msgChanSize) - go ps.doWork(ctx) -} -func (ps *PeerSet) doWork(ctx context.Context) { - ticker := time.NewTicker(ps.nextPeriodicAllocSlots) - - defer func() { - ticker.Stop() - close(ps.resultMsgCh) - }() + go ps.listenAction(ctx) + go ps.periodicallyAllocateSlots(ctx) +} +func (ps *PeerSet) listenAction(ctx context.Context) { for { select { case <-ctx.Done(): - // TODO: log context error?
return - case <-ticker.C: - l := ps.peerState.getSetLength() - for i := 0; i < l; i++ { - if err := ps.allocSlots(i); err != nil { - logger.Warnf("failed to do action on peerSet: %s", err) - } - } case act, ok := <-ps.actionQueue: if !ok { return @@ -729,3 +758,26 @@ } } } + +func (ps *PeerSet) periodicallyAllocateSlots(ctx context.Context) { + ticker := time.NewTicker(ps.nextPeriodicAllocSlots) + + defer func() { + ticker.Stop() + close(ps.resultMsgCh) + }() + + for { + select { + case <-ctx.Done(): + logger.Debugf("peerset slot allocation exiting: %s", ctx.Err()) + return + case <-ticker.C: + for setID := 0; setID < ps.peerState.getSetLength(); setID++ { + if err := ps.allocSlots(setID); err != nil { + logger.Warnf("failed to allocate slots: %s", err) + } + } + } + } +} diff --git a/dot/peerset/peerset_test.go b/dot/peerset/peerset_test.go index 3c094734c8..d0bb8a52dd 100644 --- a/dot/peerset/peerset_test.go +++ b/dot/peerset/peerset_test.go @@ -11,49 +11,88 @@ import ( "github.com/stretchr/testify/require" ) -func TestPeerSetBanned(t *testing.T) { +func TestBanRejectAcceptPeer(t *testing.T) { + const testSetID = 0 + t.Parallel() handler := newTestPeerSet(t, 25, 25, nil, nil, false) ps := handler.peerSet - require.Equal(t, unknownPeer, ps.peerState.peerStatus(0, peer1)) - ps.peerState.discover(0, peer1) + + checkPeerStateSetNumIn(t, ps.peerState, testSetID, 0) + peer1Status := ps.peerState.peerStatus(testSetID, peer1) + require.Equal(t, unknownPeer, peer1Status) + + ps.peerState.discover(testSetID, peer1) // adding peer1 with incoming slot. - err := ps.peerState.tryAcceptIncoming(0, peer1) + err := ps.peerState.tryAcceptIncoming(testSetID, peer1) require.NoError(t, err) + checkPeerStateSetNumIn(t, ps.peerState, testSetID, 1) + peer1Status = ps.peerState.peerStatus(testSetID, peer1) + require.Equal(t, connectedPeer, peer1Status) + // we ban a node by setting its reputation under the threshold. rpc := newReputationChange(BannedThresholdValue-1, "") + // we need one for the message to be processed. + // ReportPeer will disconnect the peer and set the `lastConnected` to time.Now handler.ReportPeer(rpc, peer1) - time.Sleep(time.Millisecond * 100) checkMessageStatus(t, <-ps.resultMsgCh, Drop) + checkPeerStateSetNumIn(t, ps.peerState, testSetID, 0) + peer1Status = ps.peerState.peerStatus(testSetID, peer1) + require.Equal(t, notConnectedPeer, peer1Status) + lastDisconnectedAt := ps.peerState.nodes[peer1].lastConnected[testSetID] + // check that an incoming connection from that node gets refused. + // incoming should update the lastConnected time handler.Incoming(0, peer1) checkMessageStatus(t, <-ps.resultMsgCh, Reject) + triedToConnectAt := ps.peerState.nodes[peer1].lastConnected[testSetID] + require.True(t, lastDisconnectedAt.Before(triedToConnectAt)) + // wait a bit for the node's reputation to go above the threshold. time.Sleep(time.Millisecond * 1200) // try again. This time the node should be accepted.
handler.Incoming(0, peer1) - require.NoError(t, err) checkMessageStatus(t, <-ps.resultMsgCh, Accept) } func TestAddReservedPeers(t *testing.T) { - t.Parallel() + const testSetID = 0 + t.Parallel() handler := newTestPeerSet(t, 0, 2, []peer.ID{bootNode}, []peer.ID{}, false) ps := handler.peerSet - handler.AddReservedPeer(0, reservedPeer) - handler.AddReservedPeer(0, reservedPeer2) + checkNodePeerExists(t, ps.peerState, bootNode) + + require.Equal(t, connectedPeer, ps.peerState.peerStatus(testSetID, bootNode)) + checkPeerStateSetNumIn(t, ps.peerState, testSetID, 0) + checkPeerStateSetNumOut(t, ps.peerState, testSetID, 1) + + reservedPeers := peer.IDSlice{reservedPeer, reservedPeer2} - time.Sleep(time.Millisecond * 200) + for _, peerID := range reservedPeers { + handler.AddReservedPeer(testSetID, peerID) + time.Sleep(time.Millisecond * 100) + + checkReservedNodePeerExists(t, ps, peerID) + checkPeerIsInNoSlotsNode(t, ps.peerState, peerID, testSetID) + + require.Equal(t, connectedPeer, ps.peerState.peerStatus(testSetID, peerID)) + checkNodePeerMembershipState(t, ps.peerState, peerID, testSetID, outgoing) + + // peers in noSlotNodes maps should not increase the + // numIn and numOut count + checkPeerStateSetNumIn(t, ps.peerState, testSetID, 0) + checkPeerStateSetNumOut(t, ps.peerState, testSetID, 1) + } expectedMsgs := []Message{ {Status: Connect, setID: 0, PeerID: bootNode}, @@ -61,7 +100,6 @@ func TestAddReservedPeers(t *testing.T) { {Status: Connect, setID: 0, PeerID: reservedPeer2}, } - require.Equal(t, uint32(1), ps.peerState.sets[0].numOut) require.Equal(t, 3, len(ps.resultMsgCh)) for i := 0; ; i++ { @@ -74,37 +112,95 @@ func TestAddReservedPeers(t *testing.T) { } func TestPeerSetIncoming(t *testing.T) { - t.Parallel() + const testSetID = 0 + t.Parallel() handler := newTestPeerSet(t, 2, 1, []peer.ID{bootNode}, []peer.ID{}, false) - ps := handler.peerSet - // connect message will be added ingoing queue for bootnode. 
+ ps := handler.peerSet checkMessageStatus(t, <-ps.resultMsgCh, Connect) - handler.Incoming(0, incomingPeer) - checkMessageStatus(t, <-ps.resultMsgCh, Accept) + require.Equal(t, connectedPeer, ps.peerState.peerStatus(testSetID, bootNode)) + checkPeerStateSetNumIn(t, ps.peerState, testSetID, 0) + checkPeerStateSetNumOut(t, ps.peerState, testSetID, 1) + + incomingPeers := []struct { + pid peer.ID + expectedStatus Status + expectedNumIn uint32 + // hasFreeIncomingSlot indicates whether the next slots + // are only available to noSlotNodes peers + hasFreeIncomingSlot bool + }{ + { + pid: incomingPeer, + expectedStatus: Accept, + expectedNumIn: 1, + hasFreeIncomingSlot: false, + }, + { + pid: incoming2, + expectedStatus: Accept, + expectedNumIn: 2, + hasFreeIncomingSlot: true, + }, + { + pid: incoming3, + expectedStatus: Reject, + expectedNumIn: 2, + hasFreeIncomingSlot: true, + }, + } - handler.Incoming(0, incoming2) - checkMessageStatus(t, <-ps.resultMsgCh, Accept) + for _, tt := range incomingPeers { - handler.Incoming(0, incoming3) - checkMessageStatus(t, <-ps.resultMsgCh, Reject) + // all the incoming peers are unknown before calling the Incoming method + status := ps.peerState.peerStatus(testSetID, tt.pid) + require.Equal(t, unknownPeer, status) + + handler.Incoming(testSetID, tt.pid) + time.Sleep(time.Millisecond * 100) + + checkNodePeerExists(t, ps.peerState, tt.pid) + + freeSlots := ps.peerState.hasFreeIncomingSlot(testSetID) + require.Equal(t, tt.hasFreeIncomingSlot, freeSlots) + + checkPeerStateSetNumIn(t, ps.peerState, testSetID, tt.expectedNumIn) + // incoming peers should not change the numOut count + checkPeerStateSetNumOut(t, ps.peerState, testSetID, 1) + + checkMessageStatus(t, <-ps.resultMsgCh, tt.expectedStatus) + } } func TestPeerSetDiscovered(t *testing.T) { - t.Parallel() + const testSetID = 0 + t.Parallel() handler := newTestPeerSet(t, 0, 2, []peer.ID{}, []peer.ID{reservedPeer}, false) ps := handler.peerSet + checkReservedNodePeerExists(t, ps, reservedPeer) + + _, isNoSlotNode := ps.peerState.sets[testSetID].noSlotNodes[reservedPeer] + require.True(t, isNoSlotNode) + + // reserved nodes should not increase the numOut count + checkPeerStateSetNumOut(t, ps.peerState, testSetID, 0) + handler.AddPeer(0, discovered1) handler.AddPeer(0, discovered1) handler.AddPeer(0, discovered2) time.Sleep(200 * time.Millisecond) + checkNodePeerExists(t, ps.peerState, discovered1) + checkNodePeerExists(t, ps.peerState, discovered2) + // AddPeer called twice with the same peer ID should not increase the numOut count + checkPeerStateSetNumOut(t, ps.peerState, testSetID, 2) + require.Equal(t, 3, len(ps.resultMsgCh)) for len(ps.resultMsgCh) == 0 { checkMessageStatus(t, <-ps.resultMsgCh, Connect) @@ -112,74 +208,201 @@ } func TestReAllocAfterBanned(t *testing.T) { - t.Parallel() + const testSetID = 0 + t.Parallel() handler := newTestPeerSet(t, 25, 25, []peer.ID{}, []peer.ID{}, false) ps := handler.peerSet - // adding peer1 with incoming slot.
- if ps.peerState.peerStatus(0, peer1) == unknownPeer { - ps.peerState.discover(0, peer1) - err := ps.peerState.tryAcceptIncoming(0, peer1) - require.NoError(t, err) - } + peer1Status := ps.peerState.peerStatus(testSetID, peer1) + require.Equal(t, unknownPeer, peer1Status) + + ps.peerState.discover(testSetID, peer1) + err := ps.peerState.tryAcceptIncoming(testSetID, peer1) + require.NoError(t, err) + + // accepting the incoming peer, which is not in the reserved peers, + // should increase the numIn count by 1 + checkPeerStateSetNumIn(t, ps.peerState, testSetID, 1) // We ban a node by setting its reputation under the threshold. rep := newReputationChange(BannedThresholdValue-1, "") - // we need one for the message to be processed. handler.ReportPeer(rep, peer1) - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 100) checkMessageStatus(t, <-ps.resultMsgCh, Drop) - // Check that an incoming connection from that node gets refused. + // banning an incoming peer should decrease the numIn count by 1 + checkPeerStateSetNumIn(t, ps.peerState, testSetID, 0) + checkNodePeerMembershipState(t, ps.peerState, peer1, testSetID, notConnected) - handler.Incoming(0, peer1) + n, exists := getNodePeer(ps.peerState, peer1) + require.True(t, exists) + + // when peer1 was banned, we updated its lastConnected field to time.Now() + lastTimeConnected := n.lastConnected[testSetID] + + // Check that an incoming connection from that node gets refused. + handler.Incoming(testSetID, peer1) checkMessageStatus(t, <-ps.resultMsgCh, Reject) - time.Sleep(time.Millisecond * 100) + // when calling the Incoming method, peer1 has status notConnectedPeer, + // so we update its lastConnected field to time.Now() again + n, exists = getNodePeer(ps.peerState, peer1) + require.True(t, exists) + + currentLastTimeConnected := n.lastConnected[testSetID] + require.True(t, lastTimeConnected.Before(currentLastTimeConnected)) + + // wait a bit for the node's reputation to go above the threshold.
+ time.Sleep(allocTimeDuration + time.Second) + + checkPeerStateSetNumOut(t, ps.peerState, testSetID, 1) checkMessageStatus(t, <-ps.resultMsgCh, Connect) } func TestRemovePeer(t *testing.T) { - t.Parallel() + const testSetID = 0 + t.Parallel() handler := newTestPeerSet(t, 0, 2, []peer.ID{discovered1, discovered2}, nil, false) - ps := handler.peerSet - require.Equal(t, 2, len(ps.resultMsgCh)) + ps := handler.peerSet + require.Len(t, ps.resultMsgCh, 2) for len(ps.resultMsgCh) != 0 { checkMessageStatus(t, <-ps.resultMsgCh, Connect) } - handler.RemovePeer(0, discovered1, discovered2) + require.Len(t, ps.peerState.nodes, 2) + checkPeerStateSetNumOut(t, ps.peerState, testSetID, 2) + + handler.RemovePeer(testSetID, discovered1, discovered2) time.Sleep(200 * time.Millisecond) - require.Equal(t, 2, len(ps.resultMsgCh)) + require.Len(t, ps.resultMsgCh, 2) for len(ps.resultMsgCh) != 0 { checkMessageStatus(t, <-ps.resultMsgCh, Drop) } - require.Equal(t, 0, len(ps.peerState.nodes)) + checkPeerStateNodeCount(t, ps.peerState, 0) + checkPeerStateSetNumOut(t, ps.peerState, testSetID, 0) } func TestSetReservePeer(t *testing.T) { - t.Parallel() + const testSetID = 0 + t.Parallel() handler := newTestPeerSet(t, 0, 2, nil, []peer.ID{reservedPeer, reservedPeer2}, true) + ps := handler.peerSet + require.Len(t, ps.resultMsgCh, 2) - require.Equal(t, 2, len(ps.resultMsgCh)) for len(ps.resultMsgCh) != 0 { checkMessageStatus(t, <-ps.resultMsgCh, Connect) } + require.Len(t, ps.reservedNode, 2) + newRsrPeerSet := peer.IDSlice{reservedPeer, peer.ID("newRsrPeer")} - handler.SetReservedPeer(0, newRsrPeerSet...) + // add newRsrPeer but remove reservedPeer2 + handler.SetReservedPeer(testSetID, newRsrPeerSet...) time.Sleep(200 * time.Millisecond) - require.Equal(t, len(newRsrPeerSet), len(ps.reservedNode)) + checkPeerSetReservedNodeCount(t, ps, 2) for _, p := range newRsrPeerSet { - require.Contains(t, ps.reservedNode, p) + checkReservedNodePeerExists(t, ps, p) } } + +func getNodePeer(ps *PeersState, pid peer.ID) (node, bool) { + ps.RLock() + defer ps.RUnlock() + + n, exists := ps.nodes[pid] + if !exists { + return node{}, false + } + + return *n, exists +} + +func checkNodePeerMembershipState(t *testing.T, ps *PeersState, pid peer.ID, + setID int, ms MembershipState) { + t.Helper() + + ps.RLock() + defer ps.RUnlock() + + node, exists := ps.nodes[pid] + + require.True(t, exists) + require.Equal(t, ms, node.state[setID]) +} + +func checkNodePeerExists(t *testing.T, ps *PeersState, pid peer.ID) { + t.Helper() + + ps.RLock() + defer ps.RUnlock() + + _, exists := ps.nodes[pid] + require.True(t, exists) +} + +func checkReservedNodePeerExists(t *testing.T, ps *PeerSet, pid peer.ID) { + t.Helper() + + ps.Lock() + defer ps.Unlock() + + _, exists := ps.reservedNode[pid] + require.True(t, exists) +} + +func checkPeerIsInNoSlotsNode(t *testing.T, ps *PeersState, pid peer.ID, setID int) { + t.Helper() + + ps.RLock() + defer ps.RUnlock() + + _, exists := ps.sets[setID].noSlotNodes[pid] + require.True(t, exists) +} + +func checkPeerStateSetNumOut(t *testing.T, ps *PeersState, setID int, expectedNumOut uint32) { + t.Helper() + + ps.RLock() + defer ps.RUnlock() + + gotNumOut := ps.sets[setID].numOut + require.Equal(t, expectedNumOut, gotNumOut) +} + +func checkPeerStateSetNumIn(t *testing.T, ps *PeersState, setID int, expectedNumIn uint32) { + t.Helper() + + ps.RLock() + defer ps.RUnlock() + + gotNumIn := ps.sets[setID].numIn + require.Equal(t, expectedNumIn, gotNumIn) +} + +func checkPeerStateNodeCount(t *testing.T, ps *PeersState, 
expectedCount int) { + t.Helper() + + ps.RLock() + defer ps.RUnlock() + + require.Equal(t, expectedCount, len(ps.nodes)) +} + +func checkPeerSetReservedNodeCount(t *testing.T, ps *PeerSet, expectedCount int) { + t.Helper() + + ps.reservedLock.RLock() + defer ps.reservedLock.RUnlock() + + require.Equal(t, expectedCount, len(ps.reservedNode)) +} diff --git a/dot/peerset/peerstate.go b/dot/peerset/peerstate.go index 90b5f59040..a8f627d801 100644 --- a/dot/peerset/peerstate.go +++ b/dot/peerset/peerstate.go @@ -69,7 +69,7 @@ type node struct { lastConnected []time.Time // Reputation of the node, between int32 MIN and int32 MAX. - rep Reputation + reputation Reputation } // newNode creates a node with n number of sets and 0 reputation. @@ -88,17 +88,9 @@ func newNode(n int) *node { } } -func (n *node) getReputation() Reputation { - return n.rep -} - func (n *node) addReputation(modifier Reputation) Reputation { - n.rep = n.rep.add(modifier) - return n.rep -} - -func (n *node) setReputation(modifier Reputation) { - n.rep = modifier + n.reputation = n.reputation.add(modifier) + return n.reputation } // PeersState struct contains a list of nodes, where each node @@ -110,15 +102,17 @@ type PeersState struct { // since, single Info can also manage the flow. sets []Info - mu sync.Mutex + sync.RWMutex } func (ps *PeersState) getNode(p peer.ID) (*node, error) { + ps.RLock() + defer ps.RUnlock() if n, ok := ps.nodes[p]; ok { return n, nil } - return nil, ErrPeerDoesNotExist + return nil, fmt.Errorf("%w: for peer id %s", ErrPeerDoesNotExist, p) } // NewPeerState initiates a new PeersState @@ -126,6 +120,7 @@ func NewPeerState(cfgs []*config) (*PeersState, error) { if len(cfgs) == 0 { return nil, ErrConfigSetIsEmpty } + infoSet := make([]Info, 0, len(cfgs)) for _, cfg := range cfgs { info := Info{ @@ -154,12 +149,15 @@ func (ps *PeersState) getSetLength() int { // peerStatus returns the status of peer based on its connection state // i.e. connectedPeer, notConnectedPeer or unknownPeer. func (ps *PeersState) peerStatus(set int, peerID peer.ID) string { - n, err := ps.getNode(peerID) - if err != nil { + ps.RLock() + defer ps.RUnlock() + + node, has := ps.nodes[peerID] + if !has { return unknownPeer } - switch n.state[set] { + switch node.state[set] { case ingoing, outgoing: return connectedPeer case notConnected: @@ -171,6 +169,9 @@ func (ps *PeersState) peerStatus(set int, peerID peer.ID) string { // peers return the list of all the peers we know of. func (ps *PeersState) peers() []peer.ID { + ps.RLock() + defer ps.RUnlock() + peerIDs := make([]peer.ID, 0, len(ps.nodes)) for k := range ps.nodes { peerIDs = append(peerIDs, k) @@ -180,69 +181,97 @@ func (ps *PeersState) peers() []peer.ID { // sortedPeers returns the list of peers we are connected to of a specific set. 
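The rewritten `sortedPeers` below snapshots only the `(peerID, reputation)` pairs it needs under the read lock instead of sorting a slice of live `*node` pointers. A reduced sketch of that copy-then-sort shape (types simplified):

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

type peersState struct {
	sync.RWMutex
	reputations map[string]int32 // stands in for nodes[peerID].reputation
}

// sortedPeers copies the connected peers' reputations while holding the
// read lock and sorts the copy, so the comparison function never touches
// the shared node structures.
func (ps *peersState) sortedPeers() []string {
	ps.RLock()
	defer ps.RUnlock()

	type peerReputation struct {
		id         string
		reputation int32
	}

	pairs := make([]peerReputation, 0, len(ps.reputations))
	for id, rep := range ps.reputations {
		pairs = append(pairs, peerReputation{id, rep})
	}

	sort.Slice(pairs, func(i, j int) bool {
		return pairs[i].reputation > pairs[j].reputation
	})

	ids := make([]string, len(pairs))
	for i, p := range pairs {
		ids[i] = p.id
	}
	return ids
}

func main() {
	ps := &peersState{reputations: map[string]int32{"a": 10, "b": 30, "c": 20}}
	fmt.Println(ps.sortedPeers()) // [b c a]
}
```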
func (ps *PeersState) sortedPeers(idx int) peer.IDSlice { - if len(ps.sets) < idx { + ps.RLock() + defer ps.RUnlock() + + if len(ps.sets) == 0 || len(ps.sets) < idx { logger.Debug("peer state doesn't have info for the provided index") return nil } - type kv struct { - peerID peer.ID - Node *node + type connectedPeerReputation struct { + peerID peer.ID + reputation Reputation } - var ss []kv - for k, v := range ps.nodes { - state := v.state[idx] + connectedPeersReps := make([]connectedPeerReputation, 0, len(ps.nodes)) + + for peerID, node := range ps.nodes { + state := node.state[idx] + if isPeerConnected(state) { - ss = append(ss, kv{k, v}) + connectedPeersReps = append(connectedPeersReps, connectedPeerReputation{ + peerID: peerID, + reputation: node.reputation, + }) } } - sort.Slice(ss, func(i, j int) bool { - return ss[i].Node.rep > ss[j].Node.rep + sort.Slice(connectedPeersReps, func(i, j int) bool { + return connectedPeersReps[i].reputation > connectedPeersReps[j].reputation }) - peerIDs := make(peer.IDSlice, len(ss)) - for i, kv := range ss { + peerIDs := make(peer.IDSlice, len(connectedPeersReps)) + for i, kv := range connectedPeersReps { peerIDs[i] = kv.peerID } return peerIDs } -func (ps *PeersState) addReputation(pid peer.ID, change ReputationChange) ( +func (ps *PeersState) updateReputationByTick(peerID peer.ID) (newReputation Reputation, err error) { + ps.Lock() + defer ps.Unlock() + + node, has := ps.nodes[peerID] + if !has { + return 0, fmt.Errorf("%w: for peer id %s", ErrPeerDoesNotExist, peerID) + } + + newReputation = reputationTick(node.reputation) + + node.reputation = newReputation + ps.nodes[peerID] = node + + return newReputation, nil +} + +func (ps *PeersState) addReputation(peerID peer.ID, change ReputationChange) ( newReputation Reputation, err error) { - ps.mu.Lock() - defer ps.mu.Unlock() - n, err := ps.getNode(pid) - if err != nil { - return 0, err + ps.Lock() + defer ps.Unlock() + + node, has := ps.nodes[peerID] + if !has { + return 0, fmt.Errorf("%w: for peer id %s", ErrPeerDoesNotExist, peerID) } - newReputation = n.addReputation(change.Value) - ps.nodes[pid] = n + newReputation = node.addReputation(change.Value) + ps.nodes[peerID] = node return newReputation, nil } // highestNotConnectedPeer returns the peer with the highest Reputation and that we are not connected to. -func (ps *PeersState) highestNotConnectedPeer(set int) peer.ID { - var maxRep = math.MinInt32 - var peerID peer.ID - for id, n := range ps.nodes { - if n.state[set] != notConnected { +func (ps *PeersState) highestNotConnectedPeer(set int) (highestPeerID peer.ID) { + ps.RLock() + defer ps.RUnlock() + + maxRep := math.MinInt32 + for peerID, node := range ps.nodes { + if node.state[set] != notConnected { continue } - val := int(n.rep) + val := int(node.reputation) if val >= maxRep { maxRep = val - peerID = id + highestPeerID = peerID } } - return peerID + return highestPeerID } func (ps *PeersState) hasFreeOutgoingSlot(set int) bool { @@ -258,6 +287,9 @@ func (ps *PeersState) hasFreeIncomingSlot(set int) bool { // addNoSlotNode adds a node to the list of nodes that don't occupy slots. // has no effect if the node was already in the group. 
func (ps *PeersState) addNoSlotNode(idx int, peerID peer.ID) error { + ps.Lock() + defer ps.Unlock() + if _, ok := ps.sets[idx].noSlotNodes[peerID]; ok { logger.Debugf("peer %s already exists in no slot node", peerID) return nil @@ -265,54 +297,63 @@ func (ps *PeersState) addNoSlotNode(idx int, peerID peer.ID) error { // Insert peerStatus ps.sets[idx].noSlotNodes[peerID] = struct{}{} - n, err := ps.getNode(peerID) - if err != nil { - return fmt.Errorf("could not get node for peer id %s: %w", peerID, err) + + node, has := ps.nodes[peerID] + if !has { + return fmt.Errorf("%w: for peer id %s", ErrPeerDoesNotExist, peerID) } - switch n.state[idx] { + switch node.state[idx] { case ingoing: ps.sets[idx].numIn-- case outgoing: ps.sets[idx].numOut-- } - ps.nodes[peerID] = n return nil } func (ps *PeersState) removeNoSlotNode(idx int, peerID peer.ID) error { + ps.Lock() + defer ps.Unlock() + if _, ok := ps.sets[idx].noSlotNodes[peerID]; !ok { logger.Debugf("peer %s is not in no-slot node map", peerID) return nil } delete(ps.sets[idx].noSlotNodes, peerID) - n, err := ps.getNode(peerID) - if err != nil { - return fmt.Errorf("could not get node for peer id %s: %w", peerID, err) + + node, has := ps.nodes[peerID] + if !has { + return fmt.Errorf("%w: for peer id %s", ErrPeerDoesNotExist, peerID) } - switch n.state[idx] { + switch node.state[idx] { case ingoing: ps.sets[idx].numIn++ case outgoing: ps.sets[idx].numOut++ } + return nil } // disconnect updates the node status to the notConnected state. // It should be called only when the node is in connected state. func (ps *PeersState) disconnect(idx int, peerID peer.ID) error { + ps.Lock() + defer ps.Unlock() + info := ps.sets[idx] - n, err := ps.getNode(peerID) - if err != nil { - return err + node, has := ps.nodes[peerID] + if !has { + return fmt.Errorf("%w: for peer id %s", ErrPeerDoesNotExist, peerID) } - if _, ok := info.noSlotNodes[peerID]; !ok { - switch n.state[idx] { + _, has = info.noSlotNodes[peerID] + if !has { + switch node.state[idx] { case ingoing: info.numIn-- case outgoing: @@ -323,48 +364,66 @@ func (ps *PeersState) disconnect(idx int, peerID peer.ID) error { } // set node state to notConnected. - n.state[idx] = notConnected - n.lastConnected[idx] = time.Now() + node.state[idx] = notConnected + node.lastConnected[idx] = time.Now() ps.sets[idx] = info + return nil } // discover takes input for set id and create a node and insert in the list. // the initial Reputation of the peer will be 0 and ingoing notMember state. func (ps *PeersState) discover(set int, peerID peer.ID) { + ps.Lock() + defer ps.Unlock() + numSet := len(ps.sets) - if _, err := ps.getNode(peerID); err != nil { + + _, has := ps.nodes[peerID] + if !has { n := newNode(numSet) n.state[set] = notConnected ps.nodes[peerID] = n } } -func (ps *PeersState) lastConnectedAndDiscovered(set int, peerID peer.ID) time.Time { - node, err := ps.getNode(peerID) - if err != nil && node.state[set] == notConnected { - return node.lastConnected[set] +func (ps *PeersState) lastConnectedAndDiscovered(set int, peerID peer.ID) (time.Time, error) { + ps.RLock() + defer ps.RUnlock() + + node, has := ps.nodes[peerID] + if !has { + return time.Time{}, fmt.Errorf("%w: for peer id %s", ErrPeerDoesNotExist, peerID) } - return time.Now() + + if node.state[set] == notConnected { + return node.lastConnected[set], nil + } + + return time.Now(), nil } // forgetPeer removes the peer with reputation 0 from the peerSet. 
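`tryOutgoing` and `tryAcceptIncoming` in the hunks below follow the slot-accounting rule that the new test helpers (`checkPeerStateSetNumIn`/`checkPeerStateSetNumOut`) assert: no-slot (reserved) peers connect without consuming `numIn`/`numOut`. A minimal sketch of that rule for the outgoing side:

```go
package main

import (
	"errors"
	"fmt"
)

var errNoOutgoingSlot = errors.New("outgoing slots unavailable")

// info reduces the per-set bookkeeping to what the slot rule needs.
type info struct {
	maxOut, numOut uint32
	noSlotNodes    map[string]struct{}
}

// tryOutgoing follows the same rule as peerstate.go: a no-slot (reserved)
// peer always connects and never consumes a slot; any other peer needs a
// free slot and takes one.
func (s *info) tryOutgoing(peerID string) error {
	_, isNoSlotNode := s.noSlotNodes[peerID]
	if s.numOut >= s.maxOut && !isNoSlotNode {
		return errNoOutgoingSlot
	}
	if !isNoSlotNode {
		s.numOut++
	}
	return nil
}

func main() {
	s := &info{maxOut: 1, noSlotNodes: map[string]struct{}{"reserved": {}}}
	fmt.Println(s.tryOutgoing("reserved"), s.numOut) // <nil> 0
	fmt.Println(s.tryOutgoing("peerA"), s.numOut)    // <nil> 1
	fmt.Println(s.tryOutgoing("peerB"), s.numOut)    // outgoing slots unavailable 1
}
```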
func (ps *PeersState) forgetPeer(set int, peerID peer.ID) error { - n, err := ps.getNode(peerID) - if err != nil { - return err + ps.Lock() + defer ps.Unlock() + + node, has := ps.nodes[peerID] + if !has { + return fmt.Errorf("%w: for peer id %s", ErrPeerDoesNotExist, peerID) } - if n.state[set] != notMember { - n.state[set] = notMember + if node.state[set] != notMember { + node.state[set] = notMember } - if n.getReputation() != 0 { + if node.reputation != 0 { return nil } + // remove the peer from peerSet nodes entirely if it isn't a member of any set. remove := true - for _, state := range n.state { + for _, state := range node.state { if state != notMember { remove = false break @@ -383,18 +442,22 @@ func (ps *PeersState) forgetPeer(set int, peerID peer.ID) error { // If the slots are full, the node stays "not connected" and we return the error ErrOutgoingSlotsUnavailable. // non slot occupying nodes don't count towards the number of slots. func (ps *PeersState) tryOutgoing(setID int, peerID peer.ID) error { + ps.Lock() + defer ps.Unlock() + _, isNoSlotNode := ps.sets[setID].noSlotNodes[peerID] if !ps.hasFreeOutgoingSlot(setID) && !isNoSlotNode { return ErrOutgoingSlotsUnavailable } - n, err := ps.getNode(peerID) - if err != nil { - return err + node, has := ps.nodes[peerID] + if !has { + return fmt.Errorf("%w: for peer id %s", ErrPeerDoesNotExist, peerID) } - n.state[setID] = outgoing + node.state[setID] = outgoing + if !isNoSlotNode { ps.sets[setID].numOut++ } @@ -407,23 +470,23 @@ func (ps *PeersState) tryOutgoing(setID int, peerID peer.ID) error { // If the slots are full, the node stays "not connected" and we return Err. // non slot occupying nodes don't count towards the number of slots. func (ps *PeersState) tryAcceptIncoming(setID int, peerID peer.ID) error { - var isNoSlotOccupied bool - if _, ok := ps.sets[setID].noSlotNodes[peerID]; ok { - isNoSlotOccupied = true - } + ps.Lock() + defer ps.Unlock() + + _, isNoSlotOccupied := ps.sets[setID].noSlotNodes[peerID] // if slot is not available and the node is not a reserved node then error if ps.hasFreeIncomingSlot(setID) && !isNoSlotOccupied { return ErrIncomingSlotsUnavailable } - n, err := ps.getNode(peerID) - if err != nil { + node, has := ps.nodes[peerID] + if !has { // state inconsistency tryOutgoing on an unknown node - return err + return fmt.Errorf("%w: for peer id %s", ErrPeerDoesNotExist, peerID) } - n.state[setID] = ingoing + node.state[setID] = ingoing if !isNoSlotOccupied { // this need to be added as incoming connection allocate slot. 
ps.sets[setID].numIn++ diff --git a/dot/peerset/peerstate_test.go b/dot/peerset/peerstate_test.go index 3b58544f38..21405024b4 100644 --- a/dot/peerset/peerstate_test.go +++ b/dot/peerset/peerstate_test.go @@ -150,33 +150,34 @@ func TestHighestNotConnectedPeer(t *testing.T) { state.discover(0, peer1) n, err := state.getNode(peer1) require.NoError(t, err) - n.setReputation(50) + + n.reputation = 50 state.nodes[peer1] = n - require.Equal(t, Reputation(50), state.nodes[peer1].getReputation()) + require.Equal(t, Reputation(50), state.nodes[peer1].reputation) require.Equal(t, unknownPeer, state.peerStatus(0, peer2)) state.discover(0, peer2) n, err = state.getNode(peer2) require.NoError(t, err) - n.setReputation(25) + n.reputation = 25 state.nodes[peer2] = n // peer1 still has the highest reputation require.Equal(t, peer1, state.highestNotConnectedPeer(0)) - require.Equal(t, Reputation(25), state.nodes[peer2].getReputation()) + require.Equal(t, Reputation(25), state.nodes[peer2].reputation) require.Equal(t, notConnectedPeer, state.peerStatus(0, peer2)) n, err = state.getNode(peer2) require.NoError(t, err) - n.setReputation(75) + n.reputation = 75 state.nodes[peer2] = n require.Equal(t, peer2, state.highestNotConnectedPeer(0)) - require.Equal(t, Reputation(75), state.nodes[peer2].getReputation()) + require.Equal(t, Reputation(75), state.nodes[peer2].reputation) require.Equal(t, notConnectedPeer, state.peerStatus(0, peer2)) err = state.tryAcceptIncoming(0, peer2) @@ -191,38 +192,8 @@ func TestHighestNotConnectedPeer(t *testing.T) { require.Equal(t, notConnectedPeer, state.peerStatus(0, peer1)) n, err = state.getNode(peer1) require.NoError(t, err) - n.setReputation(100) + n.reputation = 100 state.nodes[peer1] = n require.Equal(t, peer1, state.highestNotConnectedPeer(0)) } - -func TestSortedPeers(t *testing.T) { - t.Parallel() - - const msgChanSize = 1 - state := newTestPeerState(t, 2, 1) - state.nodes[peer1] = newNode(1) - - err := state.addNoSlotNode(0, peer1) - require.NoError(t, err) - - state.discover(0, peer1) - err = state.tryAcceptIncoming(0, peer1) - require.NoError(t, err) - - require.Equal(t, connectedPeer, state.peerStatus(0, peer1)) - - // discover peer2 - state.discover(0, peer2) - // try to make peer2 as an incoming connection. - err = state.tryAcceptIncoming(0, peer2) - require.NoError(t, err) - - require.Equal(t, connectedPeer, state.peerStatus(0, peer1)) - - peerCh := make(chan peer.IDSlice, msgChanSize) - peerCh <- state.sortedPeers(0) - peers := <-peerCh - require.Equal(t, 2, len(peers)) -} diff --git a/dot/peerset/test_helpers.go b/dot/peerset/test_helpers.go index 3c1c88d2a0..1100b00ecb 100644 --- a/dot/peerset/test_helpers.go +++ b/dot/peerset/test_helpers.go @@ -25,15 +25,21 @@ const ( peer2 = peer.ID("testPeer2") ) -func newTestPeerSet(t *testing.T, in, out uint32, bootNodes, reservedPeers []peer.ID, reservedOnly bool) *Handler { +const allocTimeDuration = 2 * time.Second + +//go:generate mockgen -destination=mock_message_processor_test.go -package $GOPACKAGE . MessageProcessor + +func newTestPeerSet(t *testing.T, maxIn, maxOut uint32, bootNodes, + reservedPeers []peer.ID, reservedOnly bool) *Handler { t.Helper() + con := &ConfigSet{ Set: []*config{ { - maxInPeers: in, - maxOutPeers: out, + maxInPeers: maxIn, + maxOutPeers: maxOut, reservedOnly: reservedOnly, - periodicAllocTime: time.Second * 2, + periodicAllocTime: allocTimeDuration, }, }, }
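The `//go:generate mockgen` directive added above produces the `MockMessageProcessor` checked in earlier in this diff. A minimal sketch of how the generated mock is wired into a test; the test body is illustrative only, not part of this change:

```go
package peerset

import (
	"testing"

	"github.com/golang/mock/gomock"
)

func TestMessageProcessorSketch(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	processor := NewMockMessageProcessor(ctrl)

	msg := Message{Status: Connect, setID: 0, PeerID: peer1}
	// the expectation fails the test unless Process is called exactly once
	// with this exact Message.
	processor.EXPECT().Process(msg)

	// whatever consumes peerset output would normally call Process; it is
	// invoked directly here only to show the expectation being satisfied.
	processor.Process(msg)
}
```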