Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/peerset): remove race conditions from peerset package #2267

Merged
merged 58 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from 52 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
6a7d933
chore: adding read/write mutexes to peerstate
EclesioMeloJunior Feb 3, 2022
b0eb77f
chore: remove channels and use the executor as parameter
EclesioMeloJunior Feb 3, 2022
98cb4bc
chore: remove unneeded stop and ctx
EclesioMeloJunior Feb 3, 2022
a6e97fd
chore: remove unused cancel ctx
EclesioMeloJunior Feb 3, 2022
d8f6700
chore: keep the same log informations
EclesioMeloJunior Feb 4, 2022
a93fafe
chore: resolve lint
EclesioMeloJunior Feb 4, 2022
858a72b
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior Feb 7, 2022
55dca9c
chore: define struct inside function
EclesioMeloJunior Feb 7, 2022
f0c0e10
chore: use sync.Mutex composition
EclesioMeloJunior Feb 7, 2022
1f39a56
chore: adding license
EclesioMeloJunior Feb 7, 2022
5366804
chore: keep `Reputation` at `peerset`
EclesioMeloJunior Feb 7, 2022
3695853
chore: replace fun signature to interface implementation
EclesioMeloJunior Mar 4, 2022
36ab22e
chore: move comment
EclesioMeloJunior Mar 4, 2022
fc0cf71
chore: improve test description
EclesioMeloJunior Mar 4, 2022
147575c
chore: improve test exepect function calls
EclesioMeloJunior Mar 4, 2022
8e693b5
chore: add comment to network.Process exported function
EclesioMeloJunior Mar 8, 2022
a340adc
chore: fix TestPeerSetDiscovered unit test
EclesioMeloJunior Mar 11, 2022
a1d5391
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior Mar 11, 2022
39e1974
chore: fix typo at comment
EclesioMeloJunior Mar 11, 2022
3773559
chore: wrap errors and add String to ReputationChange
EclesioMeloJunior Mar 11, 2022
71c5ea4
Merge branch 'eclesio/refactor-peerstate' of github.com:ChainSafe/gos…
EclesioMeloJunior Mar 11, 2022
78a5468
chore: remove unneeded comment
EclesioMeloJunior Mar 11, 2022
50ef6cb
chore: implement MessageProcessor interface at network/host
EclesioMeloJunior Mar 14, 2022
4f285ee
chore: rename `doWork` to `periodicallyAllocateSlots`
EclesioMeloJunior Mar 14, 2022
237cfe9
chore: set processor at testing
EclesioMeloJunior Mar 14, 2022
d02fafc
chore: address comments
EclesioMeloJunior Mar 14, 2022
a4adf71
chore: address comments
EclesioMeloJunior Mar 15, 2022
1b49a64
chore: wrap `ErrPeerDoesNotExist`
EclesioMeloJunior Mar 15, 2022
9d4c81a
chore: addressing comments
EclesioMeloJunior Mar 15, 2022
af4380e
chore: fix PeerStateHandler interface
EclesioMeloJunior Mar 16, 2022
c184ccf
chore: improve `Test_Ban_Reject_Accept_Peer` assertions
EclesioMeloJunior Mar 16, 2022
5fe401c
chore: improve the peerset test assertions
EclesioMeloJunior Mar 17, 2022
ca7f676
Update dot/network/state.go
EclesioMeloJunior Mar 17, 2022
60791f1
chore: fix lll lint issue
EclesioMeloJunior Mar 17, 2022
4856eb2
Update dot/peerset/handler.go
EclesioMeloJunior Mar 17, 2022
ad060fe
chore: implementing the reject case in the incoming peer connections
EclesioMeloJunior Mar 17, 2022
2c870d2
chore: fix race conditions in test assertion
EclesioMeloJunior Mar 18, 2022
249eecf
Merge branch 'eclesio/refactor-peerstate' of github.com:ChainSafe/gos…
EclesioMeloJunior Mar 18, 2022
9dcb348
chore: exec `go fmt ./...`
EclesioMeloJunior Mar 18, 2022
c726949
chore: add `//nolint:unparam`
EclesioMeloJunior Mar 18, 2022
9946ead
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior Mar 21, 2022
aab7e3a
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior Mar 30, 2022
7d2f7c7
chore: use channels with protected maps
EclesioMeloJunior Mar 30, 2022
266d7a2
chore: remove nolintlint warns
EclesioMeloJunior Mar 30, 2022
8a52a8a
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior Apr 25, 2022
0dbad30
chore: fixing the newtork service Stop method
EclesioMeloJunior Apr 25, 2022
2207aad
chore: reintroduce the `msgChanSize` comment
EclesioMeloJunior Apr 25, 2022
04221fa
chore: close the `resultMsgCh` once ctx ends
EclesioMeloJunior Apr 25, 2022
0dffe8a
chore: use variables on eclesio/refactor-peerstate
EclesioMeloJunior Apr 25, 2022
77784e2
chore: adding comments
EclesioMeloJunior Apr 25, 2022
d563386
chore: logging ctx error as debug
EclesioMeloJunior Apr 25, 2022
df15a5d
chore: fixing the tests helpers
EclesioMeloJunior Apr 25, 2022
a3b6ea8
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior May 5, 2022
3480177
chore: address comment, asserting log format types
EclesioMeloJunior May 5, 2022
233d2eb
chore: removing diffs
EclesioMeloJunior May 5, 2022
9315906
chore: removing diffs, fix logger format
EclesioMeloJunior May 5, 2022
cb763ab
chore: remove unneeded time.Sleep
EclesioMeloJunior May 5, 2022
53987c1
chore: address error wraping
EclesioMeloJunior May 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions dot/network/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ func TestMaxPeers(t *testing.T) {
continue
}

n.host.h.Peerstore().AddAddrs(ainfo.ID, ainfo.Addrs, peerstore.PermanentAddrTTL)
n.host.p2pHost.Peerstore().AddAddrs(ainfo.ID, ainfo.Addrs, peerstore.PermanentAddrTTL)
n.host.cm.peerSetHandler.AddPeer(0, ainfo.ID)
}

time.Sleep(200 * time.Millisecond)
p := nodes[0].host.h.Peerstore().Peers()
p := nodes[0].host.p2pHost.Peerstore().Peers()
require.LessOrEqual(t, max, len(p))
}

Expand Down Expand Up @@ -152,15 +152,15 @@ func TestPersistentPeers(t *testing.T) {
time.Sleep(time.Millisecond * 600)

// B should have connected to A during bootstrap
conns := nodeB.host.h.Network().ConnsToPeer(nodeA.host.id())
conns := nodeB.host.p2pHost.Network().ConnsToPeer(nodeA.host.id())
require.NotEqual(t, 0, len(conns))

// if A disconnects from B, B should reconnect
nodeA.host.cm.peerSetHandler.DisconnectPeer(0, nodeB.host.id())

time.Sleep(time.Millisecond * 500)

conns = nodeB.host.h.Network().ConnsToPeer(nodeA.host.id())
conns = nodeB.host.p2pHost.Network().ConnsToPeer(nodeA.host.id())
require.NotEqual(t, 0, len(conns))
}

Expand Down Expand Up @@ -239,7 +239,7 @@ func TestSetReservedPeer(t *testing.T) {

require.Equal(t, 2, node3.host.peerCount())

node3.host.h.Peerstore().AddAddrs(addrC.ID, addrC.Addrs, peerstore.PermanentAddrTTL)
node3.host.p2pHost.Peerstore().AddAddrs(addrC.ID, addrC.Addrs, peerstore.PermanentAddrTTL)
node3.host.cm.peerSetHandler.SetReservedPeer(0, addrC.ID)
time.Sleep(200 * time.Millisecond)

Expand Down
4 changes: 2 additions & 2 deletions dot/network/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newTestDiscovery(t *testing.T, num int) []*discovery {
require.NoError(t, err)
disc := &discovery{
ctx: srvc.ctx,
h: srvc.host.h,
h: srvc.host.p2pHost,
ds: ds,
}

Expand Down Expand Up @@ -200,7 +200,7 @@ func TestBeginDiscovery_ThreeNodes(t *testing.T) {
time.Sleep(time.Millisecond * 500)

// assert B and C can discover each other
addrs := nodeB.host.h.Peerstore().Addrs(nodeC.host.id())
addrs := nodeB.host.p2pHost.Peerstore().Addrs(nodeC.host.id())
require.NotEqual(t, 0, len(addrs))

}
44 changes: 22 additions & 22 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ const (
// host wraps libp2p host with network host configuration and services
type host struct {
ctx context.Context
h libp2phost.Host
p2pHost libp2phost.Host
discovery *discovery
bootnodes []peer.AddrInfo
persistentPeers []peer.AddrInfo
Expand Down Expand Up @@ -211,7 +211,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {

host := &host{
ctx: ctx,
h: h,
p2pHost: h,
discovery: discovery,
bootnodes: bns,
protocolID: pid,
Expand All @@ -236,14 +236,14 @@ func (h *host) close() error {
}

// close libp2p host
err = h.h.Close()
err = h.p2pHost.Close()
if err != nil {
logger.Errorf("Failed to close libp2p host: %s", err)
return err
}

h.closeSync.Do(func() {
err = h.h.Peerstore().Close()
err = h.p2pHost.Peerstore().Close()
if err != nil {
logger.Errorf("Failed to close libp2p peerstore: %s", err)
return
Expand All @@ -260,28 +260,28 @@ func (h *host) close() error {

// registerStreamHandler registers the stream handler for the given protocol id.
func (h *host) registerStreamHandler(pid protocol.ID, handler func(libp2pnetwork.Stream)) {
h.h.SetStreamHandler(pid, handler)
h.p2pHost.SetStreamHandler(pid, handler)
}

// connect connects the host to a specific peer address
func (h *host) connect(p peer.AddrInfo) (err error) {
h.h.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
h.p2pHost.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
ctx, cancel := context.WithTimeout(h.ctx, connectTimeout)
defer cancel()
err = h.h.Connect(ctx, p)
err = h.p2pHost.Connect(ctx, p)
return err
}

// bootstrap connects the host to the configured bootnodes
func (h *host) bootstrap() {
for _, info := range h.persistentPeers {
h.h.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
h.p2pHost.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
h.cm.peerSetHandler.AddReservedPeer(0, info.ID)
}

for _, addrInfo := range h.bootnodes {
logger.Debugf("bootstrapping to peer %s", addrInfo.ID)
h.h.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, peerstore.PermanentAddrTTL)
h.p2pHost.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, peerstore.PermanentAddrTTL)
h.cm.peerSetHandler.AddPeer(0, addrInfo.ID)
}
}
Expand All @@ -290,7 +290,7 @@ func (h *host) bootstrap() {
// the newly created stream.
func (h *host) send(p peer.ID, pid protocol.ID, msg Message) (libp2pnetwork.Stream, error) {
// open outbound stream with host protocol id
stream, err := h.h.NewStream(h.ctx, p, pid)
stream, err := h.p2pHost.NewStream(h.ctx, p, pid)
if err != nil {
logger.Tracef("failed to open new stream with peer %s using protocol %s: %s", p, pid, err)
return nil, err
Expand Down Expand Up @@ -334,12 +334,12 @@ func (h *host) writeToStream(s libp2pnetwork.Stream, msg Message) error {

// id returns the host id
func (h *host) id() peer.ID {
return h.h.ID()
return h.p2pHost.ID()
}

// Peers returns connected peers
func (h *host) peers() []peer.ID {
return h.h.Network().Peers()
return h.p2pHost.Network().Peers()
}

// addReservedPeers adds the peers `addrs` to the protected peers list and connects to them
Expand All @@ -354,7 +354,7 @@ func (h *host) addReservedPeers(addrs ...string) error {
if err != nil {
return err
}
h.h.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, peerstore.PermanentAddrTTL)
h.p2pHost.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, peerstore.PermanentAddrTTL)
h.cm.peerSetHandler.AddReservedPeer(0, addrInfo.ID)
}

Expand All @@ -369,7 +369,7 @@ func (h *host) removeReservedPeers(ids ...string) error {
return err
}
h.cm.peerSetHandler.RemoveReservedPeer(0, peerID)
h.h.ConnManager().Unprotect(peerID, "")
h.p2pHost.ConnManager().Unprotect(peerID, "")
}

return nil
Expand All @@ -378,7 +378,7 @@ func (h *host) removeReservedPeers(ids ...string) error {
// supportsProtocol checks if the protocol is supported by peerID
// returns an error if could not get peer protocols
func (h *host) supportsProtocol(peerID peer.ID, protocol protocol.ID) (bool, error) {
peerProtocols, err := h.h.Peerstore().SupportsProtocols(peerID, string(protocol))
peerProtocols, err := h.p2pHost.Peerstore().SupportsProtocols(peerID, string(protocol))
if err != nil {
return false, err
}
Expand All @@ -388,21 +388,21 @@ func (h *host) supportsProtocol(peerID peer.ID, protocol protocol.ID) (bool, err

// peerCount returns the number of connected peers
func (h *host) peerCount() int {
peers := h.h.Network().Peers()
peers := h.p2pHost.Network().Peers()
return len(peers)
}

// addrInfo returns the libp2p peer.AddrInfo of the host
func (h *host) addrInfo() peer.AddrInfo {
return peer.AddrInfo{
ID: h.h.ID(),
Addrs: h.h.Addrs(),
ID: h.p2pHost.ID(),
Addrs: h.p2pHost.Addrs(),
}
}

// multiaddrs returns the multiaddresses of the host
func (h *host) multiaddrs() (multiaddrs []ma.Multiaddr) {
addrs := h.h.Addrs()
addrs := h.p2pHost.Addrs()
for _, addr := range addrs {
multiaddr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", addr, h.id()))
if err != nil {
Expand All @@ -415,16 +415,16 @@ func (h *host) multiaddrs() (multiaddrs []ma.Multiaddr) {

// protocols returns all protocols currently supported by the node
func (h *host) protocols() []string {
return h.h.Mux().Protocols()
return h.p2pHost.Mux().Protocols()
}

// closePeer closes connection with peer.
func (h *host) closePeer(peer peer.ID) error {
return h.h.Network().ClosePeer(peer)
return h.p2pHost.Network().ClosePeer(peer)
}

func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) {
connToPeer := h.h.Network().ConnsToPeer(p)
connToPeer := h.p2pHost.Network().ConnsToPeer(p)
for _, c := range connToPeer {
for _, st := range c.GetStreams() {
if st.Protocol() != pID {
Expand Down
12 changes: 6 additions & 6 deletions dot/network/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,13 @@ func TestBootstrap(t *testing.T) {

peerCountA := nodeA.host.peerCount()
if peerCountA == 0 {
peerCountA := len(nodeA.host.h.Peerstore().Peers())
peerCountA := len(nodeA.host.p2pHost.Peerstore().Peers())
require.NotZero(t, peerCountA)
}

peerCountB := nodeB.host.peerCount()
if peerCountB == 0 {
peerCountB := len(nodeB.host.h.Peerstore().Peers())
peerCountB := len(nodeB.host.p2pHost.Peerstore().Peers())
require.NotZero(t, peerCountB)
}
}
Expand Down Expand Up @@ -498,7 +498,7 @@ func Test_RemoveReservedPeers(t *testing.T) {
time.Sleep(100 * time.Millisecond)

require.Equal(t, 1, nodeA.host.peerCount())
isProtected := nodeA.host.h.ConnManager().IsProtected(nodeB.host.addrInfo().ID, "")
isProtected := nodeA.host.p2pHost.ConnManager().IsProtected(nodeB.host.addrInfo().ID, "")
require.False(t, isProtected)

err = nodeA.host.removeReservedPeers("unknown_perr_id")
Expand Down Expand Up @@ -583,7 +583,7 @@ func TestPeerConnect(t *testing.T) {
nodeB.noGossip = true

addrInfoB := nodeB.host.addrInfo()
nodeA.host.h.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL)
nodeA.host.p2pHost.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL)
nodeA.host.cm.peerSetHandler.AddPeer(0, addrInfoB.ID)

time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -621,7 +621,7 @@ func TestBannedPeer(t *testing.T) {
nodeB.noGossip = true

addrInfoB := nodeB.host.addrInfo()
nodeA.host.h.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL)
nodeA.host.p2pHost.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL)
nodeA.host.cm.peerSetHandler.AddPeer(0, addrInfoB.ID)

time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -674,7 +674,7 @@ func TestPeerReputation(t *testing.T) {
nodeB.noGossip = true

addrInfoB := nodeB.host.addrInfo()
nodeA.host.h.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL)
nodeA.host.p2pHost.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL)
nodeA.host.cm.peerSetHandler.AddPeer(0, addrInfoB.ID)

time.Sleep(100 * time.Millisecond)
Expand Down
2 changes: 1 addition & 1 deletion dot/network/light_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestHandleLightMessage_Response(t *testing.T) {
}
require.NoError(t, err)

stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+lightID)
stream, err := s.host.p2pHost.NewStream(s.ctx, b.host.id(), s.host.protocolID+lightID)
require.NoError(t, err)

// Testing empty request
Expand Down
4 changes: 2 additions & 2 deletions dot/network/mdns.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (m *mdns) start() {
// create and start service
mdns, err := libp2pdiscovery.NewMdnsService(
m.host.ctx,
m.host.h,
m.host.p2pHost,
MDNSPeriod,
string(m.host.protocolID),
)
Expand Down Expand Up @@ -89,7 +89,7 @@ func (n Notifee) HandlePeerFound(p peer.AddrInfo) {
"Peer %s found using mDNS discovery, with host %s",
p.ID, n.host.id())

n.host.h.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
n.host.p2pHost.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
// connect to found peer
n.host.cm.peerSetHandler.AddPeer(0, p.ID)
}
4 changes: 2 additions & 2 deletions dot/network/mdns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ func TestMDNS(t *testing.T) {

if peerCountA == 0 {
// check peerstore for disconnected peers
peerCountA := len(nodeA.host.h.Peerstore().Peers())
peerCountA := len(nodeA.host.p2pHost.Peerstore().Peers())
require.NotZero(t, peerCountA)
}

if peerCountB == 0 {
// check peerstore for disconnected peers
peerCountB := len(nodeB.host.h.Peerstore().Peers())
peerCountB := len(nodeB.host.p2pHost.Peerstore().Peers())
require.NotZero(t, peerCountB)
}
}
14 changes: 7 additions & 7 deletions dot/network/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestCreateNotificationsMessageHandler_BlockAnnounce(t *testing.T) {
}
require.NoError(t, err)

stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+blockAnnounceID)
stream, err := s.host.p2pHost.NewStream(s.ctx, b.host.id(), s.host.protocolID+blockAnnounceID)
require.NoError(t, err)

// create info and handler
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestCreateNotificationsMessageHandler_BlockAnnounceHandshake(t *testing.T)
}
require.NoError(t, err)

stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+blockAnnounceID)
stream, err := s.host.p2pHost.NewStream(s.ctx, b.host.id(), s.host.protocolID+blockAnnounceID)
require.NoError(t, err)

// try invalid handshake
Expand Down Expand Up @@ -250,7 +250,7 @@ func Test_HandshakeTimeout(t *testing.T) {
info := newNotificationsProtocol(nodeA.host.protocolID+blockAnnounceID,
nodeA.getBlockAnnounceHandshake, testHandshakeDecoder, nodeA.validateBlockAnnounceHandshake)

nodeB.host.h.SetStreamHandler(info.protocolID, func(stream libp2pnetwork.Stream) {
nodeB.host.p2pHost.SetStreamHandler(info.protocolID, func(stream libp2pnetwork.Stream) {
// should not respond to a handshake message
})

Expand All @@ -267,7 +267,7 @@ func Test_HandshakeTimeout(t *testing.T) {
// clear handshake data from connection handler
time.Sleep(time.Millisecond * 100)
info.peersData.deleteOutboundHandshakeData(nodeB.host.id())
connAToB := nodeA.host.h.Network().ConnsToPeer(nodeB.host.id())
connAToB := nodeA.host.p2pHost.Network().ConnsToPeer(nodeB.host.id())
for _, stream := range connAToB[0].GetStreams() {
_ = stream.Close()
}
Expand All @@ -289,7 +289,7 @@ func Test_HandshakeTimeout(t *testing.T) {
require.Nil(t, data)

// a stream should be open until timeout
connAToB = nodeA.host.h.Network().ConnsToPeer(nodeB.host.id())
connAToB = nodeA.host.p2pHost.Network().ConnsToPeer(nodeB.host.id())
require.Len(t, connAToB, 1)
require.Len(t, connAToB[0].GetStreams(), 1)

Expand All @@ -301,7 +301,7 @@ func Test_HandshakeTimeout(t *testing.T) {
require.Nil(t, data)

// stream should be closed
connAToB = nodeA.host.h.Network().ConnsToPeer(nodeB.host.id())
connAToB = nodeA.host.p2pHost.Network().ConnsToPeer(nodeB.host.id())
require.Len(t, connAToB, 1)
require.Len(t, connAToB[0].GetStreams(), 0)
}
Expand Down Expand Up @@ -343,7 +343,7 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
require.NoError(t, err)

txnProtocolID := srvc1.host.protocolID + transactionsID
stream, err := srvc1.host.h.NewStream(srvc1.ctx, srvc2.host.id(), txnProtocolID)
stream, err := srvc1.host.p2pHost.NewStream(srvc1.ctx, srvc2.host.id(), txnProtocolID)
require.NoError(t, err)

// create info and handler
Expand Down
Loading