From 310402b9ed0a0baa3967b4d3db0ab4249401ae24 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Fri, 25 Oct 2024 12:04:00 +0300 Subject: [PATCH 01/13] implementing PeerCount and DropPeer --- wakuv2/nwaku.go | 39 ++++++++++++++++----------------------- wakuv2/nwaku_test.go | 2 +- 2 files changed, 17 insertions(+), 24 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index c86148dd5ce..445e1b7b884 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -1905,10 +1905,9 @@ func (w *Waku) ClearEnvelopesCache() { w.envelopeCache = newTTLCache() } -// TODO-nwaku func (w *Waku) PeerCount() int { - return 0 - // return w.node.PeerCount() + peerCount, _ := w.GetNumConnectedPeers() + return peerCount } // TODO-nwaku @@ -2176,10 +2175,20 @@ func (w *Waku) DialPeerByID(peerID peer.ID) error { return w.WakuDialPeerById(peerID, string(relay.WakuRelayID_v200), 1000) } -func (w *Waku) DropPeer(peerID peer.ID) error { - // TODO-nwaku - // return w.node.ClosePeerById(peerID) - return nil +func (self *Waku) DropPeer(peerId peer.ID) error { + var resp = C.allocResp() + var cPeerId = C.CString(peerId.String()) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerId)) + + C.cGoWakuDisconnectPeerById(self.wakuCtx, cPeerId, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error DisconnectPeerById: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) } func (w *Waku) ProcessingP2PMessages() bool { @@ -2842,22 +2851,6 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { return nil, errors.New(errMsg) } -func (self *Waku) DisconnectPeerById(peerId peer.ID) error { - var resp = C.allocResp() - var cPeerId = C.CString(peerId.String()) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPeerId)) - - C.cGoWakuDisconnectPeerById(self.wakuCtx, cPeerId, resp) - - if C.getRet(resp) == C.RET_OK { - return nil - } - errMsg := "error DisconnectPeerById: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) -} - // func main() { // config := WakuConfig{ diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 417348e029c..48d11f7aa82 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -219,7 +219,7 @@ func TestBasicWakuV2(t *testing.T) { require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") // Disconnect from the store node - err = w.DisconnectPeerById(storeNode.ID) + err = w.DropPeer(storeNode.ID) require.NoError(t, err) // Check that we are indeed disconnected From 5c56806367601ca2a8e784c46b3b4abba43b1d42 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Fri, 25 Oct 2024 13:33:57 +0300 Subject: [PATCH 02/13] initial dialPeer implementation --- wakuv2/nwaku.go | 39 ++++++++++++++++++---- wakuv2/nwaku_test.go | 78 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+), 6 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 445e1b7b884..951af5db031 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -179,6 +179,20 @@ package wakuv2 resp) ); } + static void cGoWakuDialPeer(void* wakuCtx, + char* peerMultiAddr, + char* protocol, + int timeoutMs, + void* resp) { + + WAKU_CALL( waku_dial_peer(wakuCtx, + peerMultiAddr, + protocol, + timeoutMs, + (WakuCallBack) callback, + resp) ); + } + static void cGoWakuDialPeerById(void* wakuCtx, char* peerId, char* protocol, @@ -2163,12 +2177,7 @@ func (w *Waku) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) { } func (w *Waku) DialPeer(address multiaddr.Multiaddr) error { - // TODO-nwaku - /* - ctx, cancel := context.WithTimeout(w.ctx, requestTimeout) - defer cancel() - return w.node.DialPeerWithMultiAddress(ctx, address) */ - return nil + return w.WakuDialPeer(address, string(relay.WakuRelayID_v200), 1000) } func (w *Waku) DialPeerByID(peerID peer.ID) error { @@ -2677,6 +2686,24 @@ func (self *Waku) WakuConnect(peerMultiAddr string, timeoutMs int) error { return errors.New(errMsg) } +func (self *Waku) WakuDialPeer(peerMultiAddr multiaddr.Multiaddr, protocol string, timeoutMs int) error { + var resp = C.allocResp() + var cPeerMultiAddr = C.CString(peerMultiAddr.String()) + var cProtocol = C.CString(protocol) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerMultiAddr)) + defer C.free(unsafe.Pointer(cProtocol)) + + C.cGoWakuDialPeer(self.wakuCtx, cPeerMultiAddr, cProtocol, C.int(timeoutMs), resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error DialPeer: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + func (self *Waku) WakuDialPeerById(peerId peer.ID, protocol string, timeoutMs int) error { var resp = C.allocResp() var cPeerId = C.CString(peerId.String()) diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 48d11f7aa82..85ff0a78683 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -545,6 +545,84 @@ func TestPeerExchange(t *testing.T) { require.NoError(t, discV5Node.Stop()) */ } +func TestDial(t *testing.T) { + logger, err := zap.NewDevelopment() + require.NoError(t, err) + + dialerNodeConfig := Config{ + UseThrottledPublish: true, + ClusterID: 16, + } + + // start node that will initiate the dial + dialerNodeWakuConfig := WakuConfig{ + EnableRelay: true, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: 16, + Shards: []uint16{64}, + Discv5UdpPort: 9020, + TcpPort: 60020, + } + + dialerNode, err := New(nil, "", &dialerNodeConfig, &dialerNodeWakuConfig, logger.Named("dialerNode"), nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, dialerNode.Start()) + + time.Sleep(1 * time.Second) + + receiverNodeConfig := Config{ + UseThrottledPublish: true, + ClusterID: 16, + } + + // start node that will receive the dial + receiverNodeWakuConfig := WakuConfig{ + EnableRelay: true, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: 16, + Shards: []uint16{64}, + Discv5UdpPort: 9021, + TcpPort: 60021, + } + + receiverNode, err := New(nil, "", &receiverNodeConfig, &receiverNodeWakuConfig, logger.Named("receiverNode"), nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, receiverNode.Start()) + + time.Sleep(1 * time.Second) + + receiverMultiaddr, err := receiverNode.ListenAddresses() + require.NoError(t, err) + require.NotNil(t, receiverMultiaddr) + + // Check that both nodes start with no connected peers + dialerPeerCount := dialerNode.PeerCount() + require.True(t, dialerPeerCount == 0, "Dialer node should have no connected peers") + + receiverPeerCount := receiverNode.PeerCount() + require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers") + + // Dial + err = dialerNode.DialPeer(receiverMultiaddr[0]) + require.NoError(t, err) + + time.Sleep(1 * time.Second) + + // Check that both nodes now have one connected peer + dialerPeerCount = dialerNode.PeerCount() + require.True(t, dialerPeerCount == 1, "Dialer node should have 1 peer") + + receiverPeerCount = receiverNode.PeerCount() + require.True(t, receiverPeerCount == 0, "Receiver node should have 1 peer") + + // Stop nodes + require.NoError(t, dialerNode.Stop()) + require.NoError(t, receiverNode.Stop()) + +} + /* func TestWakuV2Filter(t *testing.T) { From 5d330306d385dc6fa2d542ba75975b713cf46f1c Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 28 Oct 2024 12:21:56 +0200 Subject: [PATCH 03/13] changing function name to specify that only connected relay peers are returned --- wakuv2/nwaku.go | 15 +++++++-------- wakuv2/nwaku_test.go | 5 ++++- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 951af5db031..7b0ddc84832 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -234,7 +234,7 @@ package wakuv2 WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); } - static void cGoWakuGetNumConnectedPeers(void* ctx, char* pubSubTopic, void* resp) { + static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) { WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); } @@ -342,7 +342,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/store" storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/utils" @@ -2177,11 +2176,11 @@ func (w *Waku) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) { } func (w *Waku) DialPeer(address multiaddr.Multiaddr) error { - return w.WakuDialPeer(address, string(relay.WakuRelayID_v200), 1000) + return w.WakuDialPeer(address, "", 1000) } func (w *Waku) DialPeerByID(peerID peer.ID) error { - return w.WakuDialPeerById(peerID, string(relay.WakuRelayID_v200), 1000) + return w.WakuDialPeerById(peerID, "", 1000) } func (self *Waku) DropPeer(peerId peer.ID) error { @@ -2789,7 +2788,7 @@ func (self *Waku) ListPeersInMesh(pubsubTopic string) (int, error) { return 0, errors.New(errMsg) } -func (self *Waku) GetNumConnectedPeers(paramPubsubTopic ...string) (int, error) { +func (self *Waku) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, error) { var pubsubTopic string if len(paramPubsubTopic) == 0 { pubsubTopic = "" @@ -2802,18 +2801,18 @@ func (self *Waku) GetNumConnectedPeers(paramPubsubTopic ...string) (int, error) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPubsubTopic)) - C.cGoWakuGetNumConnectedPeers(self.wakuCtx, cPubsubTopic, resp) + C.cGoWakuGetNumConnectedRelayPeers(self.wakuCtx, cPubsubTopic, resp) if C.getRet(resp) == C.RET_OK { numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numPeers, err := strconv.Atoi(numPeersStr) if err != nil { - errMsg := "GetNumConnectedPeers - error converting string to int: " + err.Error() + errMsg := "GetNumConnectedRelayPeers - error converting string to int: " + err.Error() return 0, errors.New(errMsg) } return numPeers, nil } - errMsg := "error GetNumConnectedPeers: " + + errMsg := "error GetNumConnectedRelayPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) return 0, errors.New(errMsg) } diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 85ff0a78683..266720a6d6d 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -6,6 +6,7 @@ package wakuv2 import ( "context" "errors" + "fmt" "slices" "testing" "time" @@ -196,7 +197,7 @@ func TestBasicWakuV2(t *testing.T) { // Sanity check, not great, but it's probably helpful err = tt.RetryWithBackOff(func() error { - numConnected, err := w.GetNumConnectedPeers() + numConnected, err := w.GetNumConnectedRelayPeers() if err != nil { return err } @@ -612,6 +613,8 @@ func TestDial(t *testing.T) { // Check that both nodes now have one connected peer dialerPeerCount = dialerNode.PeerCount() + peerIds, _ := dialerNode.GetPeerIdsFromPeerStore() + fmt.Println("------------- dialerNode.GetPeerIdsFromPeerStore(): ", peerIds) require.True(t, dialerPeerCount == 1, "Dialer node should have 1 peer") receiverPeerCount = receiverNode.PeerCount() From 2beb1f46c79a42394a7ba0cfdcfd8c87517c51e3 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 28 Oct 2024 13:43:05 +0200 Subject: [PATCH 04/13] fix flaky test --- wakuv2/nwaku_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 266720a6d6d..af8428eac6e 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -233,6 +233,8 @@ func TestBasicWakuV2(t *testing.T) { err = w.DialPeerByID(storeNode.ID) require.NoError(t, err) + time.Sleep(1 * time.Second) + // Check that we are connected again connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) require.NoError(t, err) From 3982b8df5c63130cb03f73c0768ded0ef2f02de4 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 28 Oct 2024 17:49:40 +0200 Subject: [PATCH 05/13] implementing feedback --- wakuv2/nwaku.go | 14 +++++++++----- wakuv2/nwaku_test.go | 17 +++++++++-------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 7b0ddc84832..796a943c40e 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -1918,9 +1918,13 @@ func (w *Waku) ClearEnvelopesCache() { w.envelopeCache = newTTLCache() } -func (w *Waku) PeerCount() int { - peerCount, _ := w.GetNumConnectedPeers() - return peerCount +func (w *Waku) PeerCount() (int, error) { + peerCount, err := w.GetNumConnectedPeers() + if err != nil { + return 0, err + } + + return peerCount, nil } // TODO-nwaku @@ -2176,11 +2180,11 @@ func (w *Waku) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) { } func (w *Waku) DialPeer(address multiaddr.Multiaddr) error { - return w.WakuDialPeer(address, "", 1000) + return w.WakuDialPeer(address, "", int(requestTimeout/time.Millisecond)) } func (w *Waku) DialPeerByID(peerID peer.ID) error { - return w.WakuDialPeerById(peerID, "", 1000) + return w.WakuDialPeerById(peerID, "", int(requestTimeout/time.Millisecond)) } func (self *Waku) DropPeer(peerId peer.ID) error { diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index af8428eac6e..8c2d1952ff1 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -6,7 +6,6 @@ package wakuv2 import ( "context" "errors" - "fmt" "slices" "testing" "time" @@ -601,10 +600,12 @@ func TestDial(t *testing.T) { require.NotNil(t, receiverMultiaddr) // Check that both nodes start with no connected peers - dialerPeerCount := dialerNode.PeerCount() + dialerPeerCount, err := dialerNode.PeerCount() + require.NoError(t, err) require.True(t, dialerPeerCount == 0, "Dialer node should have no connected peers") - receiverPeerCount := receiverNode.PeerCount() + receiverPeerCount, err := receiverNode.PeerCount() + require.NoError(t, err) require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers") // Dial @@ -614,13 +615,13 @@ func TestDial(t *testing.T) { time.Sleep(1 * time.Second) // Check that both nodes now have one connected peer - dialerPeerCount = dialerNode.PeerCount() - peerIds, _ := dialerNode.GetPeerIdsFromPeerStore() - fmt.Println("------------- dialerNode.GetPeerIdsFromPeerStore(): ", peerIds) + dialerPeerCount, err = dialerNode.PeerCount() + require.NoError(t, err) require.True(t, dialerPeerCount == 1, "Dialer node should have 1 peer") - receiverPeerCount = receiverNode.PeerCount() - require.True(t, receiverPeerCount == 0, "Receiver node should have 1 peer") + receiverPeerCount, err = receiverNode.PeerCount() + require.NoError(t, err) + require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer") // Stop nodes require.NoError(t, dialerNode.Stop()) From c1924c65f23e7ca999ee154a5f9c0b3b05d197ae Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 30 Oct 2024 14:14:23 +0200 Subject: [PATCH 06/13] requiring protocol for dial --- wakuv2/nwaku.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 796a943c40e..58f04bb4b97 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -2179,12 +2179,12 @@ func (w *Waku) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) { return "", nil } -func (w *Waku) DialPeer(address multiaddr.Multiaddr) error { - return w.WakuDialPeer(address, "", int(requestTimeout/time.Millisecond)) +func (w *Waku) DialPeer(address multiaddr.Multiaddr, protocol string) error { + return w.WakuDialPeer(address, protocol, int(requestTimeout/time.Millisecond)) } -func (w *Waku) DialPeerByID(peerID peer.ID) error { - return w.WakuDialPeerById(peerID, "", int(requestTimeout/time.Millisecond)) +func (w *Waku) DialPeerByID(peerID peer.ID, protocol string) error { + return w.WakuDialPeerById(peerID, protocol, int(requestTimeout/time.Millisecond)) } func (self *Waku) DropPeer(peerId peer.ID) error { From 1a77ee1c67b09d7b4ec6619a5cd7d3624db6f357 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 30 Oct 2024 14:31:46 +0200 Subject: [PATCH 07/13] fixing test and adding missing procs --- wakuv2/nwaku.go | 40 ++++++++++++++++++++++++++++++++++++++++ wakuv2/nwaku_test.go | 5 +++-- 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 58f04bb4b97..da92e1ea770 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -238,6 +238,10 @@ package wakuv2 WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); } + static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) { + WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) callback, resp) ); + } + static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) { WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) callback, resp) ); } @@ -2821,6 +2825,42 @@ func (self *Waku) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, er return 0, errors.New(errMsg) } +func (self *Waku) GetConnectedPeers() (peer.IDSlice, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + C.cGoWakuGetConnectedPeers(self.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if peersStr == "" { + return peer.IDSlice{}, nil + } + // peersStr contains a comma-separated list of peer ids + itemsPeerIds := strings.Split(peersStr, ",") + + var peers peer.IDSlice + for _, peerId := range itemsPeerIds { + id, err := peer.Decode(peerId) + if err != nil { + return nil, fmt.Errorf("GetConnectedPeers - decoding peerId: %w", err) + } + peers = append(peers, id) + } + + return peers, nil + } + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, fmt.Errorf("GetConnectedPeers: %s", errMsg) +} + +func (self *Waku) GetNumConnectedPeers() (int, error) { + connecterPeers, err := self.GetConnectedPeers() + if err != nil { + return 0, err + } + return len(connecterPeers), nil +} + func (self *Waku) GetPeerIdsFromPeerStore() (peer.IDSlice, error) { var resp = C.allocResp() defer C.freeResp(resp) diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 8c2d1952ff1..f1853ed8063 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -12,6 +12,7 @@ import ( "github.com/cenkalti/backoff/v3" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/waku-org/go-waku/waku/v2/protocol/store" "go.uber.org/zap" @@ -229,7 +230,7 @@ func TestBasicWakuV2(t *testing.T) { require.True(t, isDisconnected, "nwaku should be disconnected from the store node") // Re-connect - err = w.DialPeerByID(storeNode.ID) + err = w.DialPeerByID(storeNode.ID, ping.ID) require.NoError(t, err) time.Sleep(1 * time.Second) @@ -609,7 +610,7 @@ func TestDial(t *testing.T) { require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers") // Dial - err = dialerNode.DialPeer(receiverMultiaddr[0]) + err = dialerNode.DialPeer(receiverMultiaddr[0], ping.ID) require.NoError(t, err) time.Sleep(1 * time.Second) From 8fa01802a893a8cdbe0bdded567438d2b7e38a0f Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 30 Oct 2024 14:41:33 +0200 Subject: [PATCH 08/13] adding tests for connect --- wakuv2/nwaku.go | 4 +++ wakuv2/nwaku_test.go | 82 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index da92e1ea770..941136b001b 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -2187,6 +2187,10 @@ func (w *Waku) DialPeer(address multiaddr.Multiaddr, protocol string) error { return w.WakuDialPeer(address, protocol, int(requestTimeout/time.Millisecond)) } +func (w *Waku) ConnectPeer(address multiaddr.Multiaddr) error { + return w.WakuConnect(address.String(), int(requestTimeout/time.Millisecond)) +} + func (w *Waku) DialPeerByID(peerID peer.ID, protocol string) error { return w.WakuDialPeerById(peerID, protocol, int(requestTimeout/time.Millisecond)) } diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index f1853ed8063..5feacfebd4c 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -630,6 +630,88 @@ func TestDial(t *testing.T) { } +func TestConnect(t *testing.T) { + logger, err := zap.NewDevelopment() + require.NoError(t, err) + + connectingNodeConfig := Config{ + UseThrottledPublish: true, + ClusterID: 16, + } + + // start node that will initiate the connection + connectingNodeWakuConfig := WakuConfig{ + EnableRelay: true, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: 16, + Shards: []uint16{64}, + Discv5UdpPort: 9030, + TcpPort: 60030, + } + + connectingNode, err := New(nil, "", &connectingNodeConfig, &connectingNodeWakuConfig, logger.Named("connectingNode"), nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, connectingNode.Start()) + + time.Sleep(1 * time.Second) + + receiverNodeConfig := Config{ + UseThrottledPublish: true, + ClusterID: 16, + } + + // start node that will receive the connection + receiverNodeWakuConfig := WakuConfig{ + EnableRelay: true, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: 16, + Shards: []uint16{64}, + Discv5UdpPort: 9031, + TcpPort: 60031, + } + + receiverNode, err := New(nil, "", &receiverNodeConfig, &receiverNodeWakuConfig, logger.Named("receiverNode"), nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, receiverNode.Start()) + + time.Sleep(1 * time.Second) + + receiverMultiaddr, err := receiverNode.ListenAddresses() + require.NoError(t, err) + require.NotNil(t, receiverMultiaddr) + + // Check that both nodes start with no connected peers + connectingPeerCount, err := connectingNode.PeerCount() + require.NoError(t, err) + require.True(t, connectingPeerCount == 0, "Connecting node should have no connected peers") + + receiverPeerCount, err := receiverNode.PeerCount() + require.NoError(t, err) + require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers") + + // Connect + err = connectingNode.ConnectPeer(receiverMultiaddr[0]) + require.NoError(t, err) + + time.Sleep(1 * time.Second) + + // Check that both nodes now have one connected peer + connectingPeerCount, err = connectingNode.PeerCount() + require.NoError(t, err) + require.True(t, connectingPeerCount == 1, "Connecting node should have 1 peer") + + receiverPeerCount, err = receiverNode.PeerCount() + require.NoError(t, err) + require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer") + + // Stop nodes + require.NoError(t, connectingNode.Stop()) + require.NoError(t, receiverNode.Stop()) + +} + /* func TestWakuV2Filter(t *testing.T) { From 41c1c38fac08993b3261e75d5b7347af622a7489 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 30 Oct 2024 16:28:12 +0200 Subject: [PATCH 09/13] updating nwaku version --- third_party/nwaku | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/third_party/nwaku b/third_party/nwaku index 3665991a655..507b1fc4d97 160000 --- a/third_party/nwaku +++ b/third_party/nwaku @@ -1 +1 @@ -Subproject commit 3665991a655495a4e47c92596e7d9e156f4ed693 +Subproject commit 507b1fc4d97a01ee5695a205f7f981bd4accc694 From 2fa127c2d89af5c807e783bdc93e5c1075cafbd7 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 30 Oct 2024 17:26:40 +0200 Subject: [PATCH 10/13] don't require protocol in dialpeer for status-go --- wakuv2/nwaku.go | 6 +----- wakuv2/nwaku_test.go | 7 +++---- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 941136b001b..c616d84caa3 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -2183,11 +2183,7 @@ func (w *Waku) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) { return "", nil } -func (w *Waku) DialPeer(address multiaddr.Multiaddr, protocol string) error { - return w.WakuDialPeer(address, protocol, int(requestTimeout/time.Millisecond)) -} - -func (w *Waku) ConnectPeer(address multiaddr.Multiaddr) error { +func (w *Waku) DialPeer(address multiaddr.Multiaddr) error { return w.WakuConnect(address.String(), int(requestTimeout/time.Millisecond)) } diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 5feacfebd4c..41f3407ccdf 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -12,7 +12,6 @@ import ( "github.com/cenkalti/backoff/v3" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/waku-org/go-waku/waku/v2/protocol/store" "go.uber.org/zap" @@ -230,7 +229,7 @@ func TestBasicWakuV2(t *testing.T) { require.True(t, isDisconnected, "nwaku should be disconnected from the store node") // Re-connect - err = w.DialPeerByID(storeNode.ID, ping.ID) + err = w.DialPeer(storeNode.Addrs[0]) require.NoError(t, err) time.Sleep(1 * time.Second) @@ -610,7 +609,7 @@ func TestDial(t *testing.T) { require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers") // Dial - err = dialerNode.DialPeer(receiverMultiaddr[0], ping.ID) + err = dialerNode.DialPeer(receiverMultiaddr[0]) require.NoError(t, err) time.Sleep(1 * time.Second) @@ -692,7 +691,7 @@ func TestConnect(t *testing.T) { require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers") // Connect - err = connectingNode.ConnectPeer(receiverMultiaddr[0]) + err = connectingNode.DialPeer(receiverMultiaddr[0]) require.NoError(t, err) time.Sleep(1 * time.Second) From e859f42026ce1662cdb296b58bf198b78dad4136 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 30 Oct 2024 17:27:46 +0200 Subject: [PATCH 11/13] remove unnecessary test --- wakuv2/nwaku_test.go | 82 -------------------------------------------- 1 file changed, 82 deletions(-) diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 41f3407ccdf..75e6a0c2b38 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -629,88 +629,6 @@ func TestDial(t *testing.T) { } -func TestConnect(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) - - connectingNodeConfig := Config{ - UseThrottledPublish: true, - ClusterID: 16, - } - - // start node that will initiate the connection - connectingNodeWakuConfig := WakuConfig{ - EnableRelay: true, - LogLevel: "DEBUG", - Discv5Discovery: false, - ClusterID: 16, - Shards: []uint16{64}, - Discv5UdpPort: 9030, - TcpPort: 60030, - } - - connectingNode, err := New(nil, "", &connectingNodeConfig, &connectingNodeWakuConfig, logger.Named("connectingNode"), nil, nil, nil, nil) - require.NoError(t, err) - require.NoError(t, connectingNode.Start()) - - time.Sleep(1 * time.Second) - - receiverNodeConfig := Config{ - UseThrottledPublish: true, - ClusterID: 16, - } - - // start node that will receive the connection - receiverNodeWakuConfig := WakuConfig{ - EnableRelay: true, - LogLevel: "DEBUG", - Discv5Discovery: false, - ClusterID: 16, - Shards: []uint16{64}, - Discv5UdpPort: 9031, - TcpPort: 60031, - } - - receiverNode, err := New(nil, "", &receiverNodeConfig, &receiverNodeWakuConfig, logger.Named("receiverNode"), nil, nil, nil, nil) - require.NoError(t, err) - require.NoError(t, receiverNode.Start()) - - time.Sleep(1 * time.Second) - - receiverMultiaddr, err := receiverNode.ListenAddresses() - require.NoError(t, err) - require.NotNil(t, receiverMultiaddr) - - // Check that both nodes start with no connected peers - connectingPeerCount, err := connectingNode.PeerCount() - require.NoError(t, err) - require.True(t, connectingPeerCount == 0, "Connecting node should have no connected peers") - - receiverPeerCount, err := receiverNode.PeerCount() - require.NoError(t, err) - require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers") - - // Connect - err = connectingNode.DialPeer(receiverMultiaddr[0]) - require.NoError(t, err) - - time.Sleep(1 * time.Second) - - // Check that both nodes now have one connected peer - connectingPeerCount, err = connectingNode.PeerCount() - require.NoError(t, err) - require.True(t, connectingPeerCount == 1, "Connecting node should have 1 peer") - - receiverPeerCount, err = receiverNode.PeerCount() - require.NoError(t, err) - require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer") - - // Stop nodes - require.NoError(t, connectingNode.Stop()) - require.NoError(t, receiverNode.Stop()) - -} - /* func TestWakuV2Filter(t *testing.T) { From c75a3d5e3915c6b1e330375a51700ab31d669f33 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 30 Oct 2024 17:30:59 +0200 Subject: [PATCH 12/13] comment and deleting unnecessary function --- wakuv2/nwaku.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index c616d84caa3..ce078c1e86f 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -2184,13 +2184,10 @@ func (w *Waku) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) { } func (w *Waku) DialPeer(address multiaddr.Multiaddr) error { + // Using WakuConnect so it matches the go-waku's behavior and terminology return w.WakuConnect(address.String(), int(requestTimeout/time.Millisecond)) } -func (w *Waku) DialPeerByID(peerID peer.ID, protocol string) error { - return w.WakuDialPeerById(peerID, protocol, int(requestTimeout/time.Millisecond)) -} - func (self *Waku) DropPeer(peerId peer.ID) error { var resp = C.allocResp() var cPeerId = C.CString(peerId.String()) From 101cc2b080e586a7b6d36b4127d7e5a324c7cca8 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 30 Oct 2024 17:43:07 +0200 Subject: [PATCH 13/13] fixing test --- wakuv2/nwaku_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 75e6a0c2b38..0a58054ecb6 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -12,6 +12,7 @@ import ( "github.com/cenkalti/backoff/v3" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/protocol/store" "go.uber.org/zap" @@ -228,8 +229,11 @@ func TestBasicWakuV2(t *testing.T) { isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID) require.True(t, isDisconnected, "nwaku should be disconnected from the store node") + storeNodeMultiadd, err := multiaddr.NewMultiaddr(storeNodeInfo.ListenAddresses[0]) + require.NoError(t, err) + // Re-connect - err = w.DialPeer(storeNode.Addrs[0]) + err = w.DialPeer(storeNodeMultiadd) require.NoError(t, err) time.Sleep(1 * time.Second)